Reactive Messaging can handle messages generated from within the application but can also interact with remote brokers. Reactive Messaging Connectors interact with these remote brokers to retrieve and send messages using various protocols and technologies.
Each connector is dedicated to a specific technology. For example, a Kafka Connector is responsible for interacting with Kafka, while an MQTT Connector is responsible for MQTT interactions.
Each connector has a name. This name is referenced by the application to indicate that this connector manages a specific channel.
For example, the SmallRye Kafka Connector is named smallrye-kafka.
A connector can:
retrieve messages from a remote broker (inbound)
send messages to a remote broker (outbound)
A connector can, of course, implement both directions.
Inbound connectors are responsible for:
Getting messages from the remote broker.
Creating a Reactive Messaging Message associated with the retrieved message.
Potentially associating technical metadata with the message. This includes unmarshalling the payload.
Associating an acknowledgment callback to acknowledge the incoming message when the Reactive Messaging message is processed/acknowledged.
The first step should follow the Reactive Streams principles: use non-blocking technology and respect downstream requests.
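The inbound responsibilities listed above can be sketched in plain Java. This is a simplified model, not the Reactive Messaging API: `BrokerRecord`, `SketchMessage`, `InboundSketch`, and the `commitOffset` callback are hypothetical names introduced for illustration, and the payload is assumed to be UTF-8 text. The streaming part (non-blocking retrieval, honoring downstream requests) is left out.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.LongConsumer;
import java.util.function.Supplier;

// Hypothetical stand-in for a record fetched from a remote broker.
record BrokerRecord(String topic, long offset, byte[] body) {}

// Simplified stand-in for a Reactive Messaging Message: payload, metadata, ack callback.
record SketchMessage<T>(T payload, Map<String, Object> metadata,
                        Supplier<CompletionStage<Void>> ack) {}

final class InboundSketch {
    // Turns one broker record into a message.
    // commitOffset is only invoked when the message is acknowledged.
    static SketchMessage<String> toMessage(BrokerRecord rec, LongConsumer commitOffset) {
        // Unmarshal the payload (assumed here to be UTF-8 text).
        String payload = new String(rec.body(), StandardCharsets.UTF_8);
        // Attach technical metadata describing where the record came from.
        Map<String, Object> metadata = Map.of("topic", rec.topic(), "offset", rec.offset());
        // Acknowledgment callback: tell the broker the record was processed.
        Supplier<CompletionStage<Void>> ack = () -> {
            commitOffset.accept(rec.offset());
            return CompletableFuture.completedFuture(null);
        };
        return new SketchMessage<>(payload, metadata, ack);
    }
}
```

In this sketch nothing is reported back to the broker until the application calls `ack().get()` on the message, which mirrors the idea that the incoming message is acknowledged only once the Reactive Messaging message has been processed.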
Outbound connectors are responsible for:
Receiving a Reactive Messaging Message and transforming it into a structure understood by the remote broker. This includes marshalling the payload.
If the Message contains outbound metadata (metadata set during the processing to influence the outbound structure and routing), taking it into account.
Sending the message to the remote broker.
Acknowledging the Reactive Messaging Message when the broker has accepted/acknowledged the message.
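The outbound steps above can be sketched the same way. Again, this is a plain-Java model under stated assumptions rather than the actual connector API: `OutgoingSketchMessage`, `OutboundSketch`, the `"topic"` metadata key, and the `send` function (standing in for a broker client) are all hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;

// Simplified stand-in for an outgoing Reactive Messaging Message.
record OutgoingSketchMessage(String payload, Map<String, String> metadata, Runnable ack) {}

final class OutboundSketch {
    // send is a hypothetical broker client call: (topic, bytes) -> a stage
    // that completes once the broker has accepted the write.
    static CompletionStage<Void> forward(OutgoingSketchMessage msg, String defaultTopic,
            BiFunction<String, byte[], CompletionStage<Void>> send) {
        // Marshal the payload into the wire format (UTF-8 bytes in this sketch).
        byte[] body = msg.payload().getBytes(StandardCharsets.UTF_8);
        // Outbound metadata, when present, overrides the configured routing.
        String topic = msg.metadata().getOrDefault("topic", defaultTopic);
        // Send, then acknowledge the message only after the broker accepted it.
        return send.apply(topic, body).thenRun(msg.ack());
    }
}
```

Chaining the acknowledgment with `thenRun` keeps the ordering explicit: the message is acknowledged only after the stage returned by the broker call completes successfully.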
Applications need to configure the connector used by expressing which channel is managed by which connector. Non-mapped channels are local / in-memory.
To configure connectors, you need to have an implementation of MicroProfile Config. If you don’t have one, add an implementation of MicroProfile Config in your classpath, such as:
<dependency>
  <groupId>io.smallrye.config</groupId>
  <artifactId>smallrye-config</artifactId>
  <version>2.3.0</version>
</dependency>
Then edit the application configuration, generally
The application configures the connector with a set of properties structured as follows:
mp.messaging.incoming.dummy-incoming-channel.connector=dummy
mp.messaging.incoming.dummy-incoming-channel.attribute=value
mp.messaging.outgoing.dummy-outgoing-channel.connector=dummy
mp.messaging.outgoing.dummy-outgoing-channel.attribute=value
You configure each channel (both incoming and outgoing) individually.
The [incoming|outgoing] segment indicates the direction.
An incoming channel consumes data from a message broker or something producing data. It’s an inbound interaction. It relates to methods annotated with @Incoming using the same channel name.
An outgoing channel consumes data from the application and forwards it to a message broker or something consuming data. It’s an outbound interaction. It relates to methods annotated with @Outgoing using the same channel name.
The [channel-name] segment is the name of the channel.
If the channel name contains a . (dot), you need to enclose it in " (double quotes).
For example, to configure the dummy.incoming.channel channel, you would need:
mp.messaging.incoming."dummy.incoming.channel".connector=dummy
The [attribute]=[value] segment sets a specific connector attribute to the given value.
Attributes depend on the connector used, so refer to the connector documentation to check the supported attributes.
The connector attribute must be set for each mapped channel. It indicates the name of the connector responsible for the channel.
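To make the key structure concrete, here is a minimal sketch that splits a property key into its direction, channel name, and attribute, including the quoted form for channel names containing dots. It only mirrors the structure described above; it is not how any particular implementation parses its configuration, and `ChannelKey` and `ChannelKeyParser` are hypothetical names.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical value type describing one parsed property key.
record ChannelKey(String direction, String channel, String attribute) {}

final class ChannelKeyParser {
    // mp.messaging.<direction>.<channel or "quoted.channel">.<attribute>
    private static final Pattern KEY = Pattern.compile(
            "^mp\\.messaging\\.(incoming|outgoing)\\.(?:\"([^\"]+)\"|([^.\"]+))\\.(.+)$");

    static Optional<ChannelKey> parse(String key) {
        Matcher m = KEY.matcher(key);
        if (!m.matches()) {
            return Optional.empty(); // not a channel property
        }
        // Group 2 holds a quoted channel name, group 3 an unquoted one.
        String channel = m.group(2) != null ? m.group(2) : m.group(3);
        return Optional.of(new ChannelKey(m.group(1), channel, m.group(4)));
    }
}
```

With this sketch, `mp.messaging.outgoing.data.bootstrap.servers` yields the channel `data` and the attribute `bootstrap.servers`, while `mp.messaging.incoming."dummy.incoming.channel".connector` yields the channel `dummy.incoming.channel` and the attribute `connector`. Without the quotes, the second key would be read as channel `dummy` with attribute `incoming.channel.connector`, which is why dotted channel names must be quoted.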
Here is an example of a channel using an MQTT connector (consuming data from an MQTT broker) and a channel using a Kafka connector (writing data to Kafka):
# [Channel - health] - Consume data from MQTT
mp.messaging.incoming.health.topic=neo
mp.messaging.incoming.health.connector=smallrye-mqtt
mp.messaging.incoming.health.host=localhost
mp.messaging.incoming.health.broadcast=true
# [/Channel - health]

# [Channel - data] - Produce data to Kafka
mp.messaging.outgoing.data.connector=smallrye-kafka
mp.messaging.outgoing.data.bootstrap.servers=localhost:9092
mp.messaging.outgoing.data.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.data.value.serializer=io.vertx.kafka.client.serialization.JsonObjectSerializer
mp.messaging.outgoing.data.acks=1
# [/Channel - data]
To use a connector, you need to add it to your CLASSPATH.
Generally, adding the dependency to your project is enough.
Then, you need to know the connector’s name and set the connector attribute for each channel managed by this connector.
In the connector documentation, you will find a table listing the attributes supported by the connector. Be aware that attributes for inbound and outbound interactions may be different.
These tables contain the following entries:
The name of the attribute, and potentially an alias. The name of the attribute is used as the [attribute] segment of the channel configuration. The alias (if set) is the name of a global MicroProfile Config property that avoids having to configure the attribute for each managed channel. For example, to set the location of your Kafka broker globally, you can use the
The description of the attribute, including the type.
Whether the attribute is mandatory. If a mandatory attribute is not set, the deployment fails.
The default value, if any.
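The table entries above suggest how an attribute value could be resolved for a given channel. The following is a minimal sketch, not the connector's actual lookup code: `AttributeSpec` and `AttributeResolver` are hypothetical, the `dummy.host` alias used in the usage note is made up, and the precedence (channel-level key, then global alias, then default) is an assumption of this sketch.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical description of one row of a connector attribute table.
record AttributeSpec(String name, Optional<String> alias, boolean mandatory,
                     Optional<String> defaultValue) {}

final class AttributeResolver {
    // Assumed lookup order: channel-level key, then global alias, then default.
    static Optional<String> resolve(Map<String, String> config, String direction,
                                    String channel, AttributeSpec spec) {
        String channelKey = "mp.messaging." + direction + "." + channel + "." + spec.name();
        Optional<String> value = Optional.ofNullable(config.get(channelKey))
                .or(() -> spec.alias().map(config::get)) // empty if the alias is absent or unset
                .or(spec::defaultValue);
        if (value.isEmpty() && spec.mandatory()) {
            // Mirrors "the deployment fails" for a missing mandatory attribute.
            throw new IllegalArgumentException("Missing mandatory attribute: " + channelKey);
        }
        return value;
    }
}
```

For instance, with a mandatory `host` attribute aliased to `dummy.host`, a configuration containing only `dummy.host=global` would resolve `host` to `global` for every channel, while a channel that also sets `mp.messaging.incoming.a.host=local` would get `local`.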