Reactive Messaging can handle messages generated from within the application, but it can also interact with remote brokers. Reactive Messaging Connectors interact with these remote brokers to retrieve and send messages using various protocols and technologies.
Each connector is dedicated to a specific technology. For example, a Kafka Connector is responsible for interacting with Kafka, while an MQTT Connector is responsible for MQTT interactions.
Each connector has a name. This name is referenced by the application to indicate that a specific channel is managed by this connector.
For example, the SmallRye Kafka Connector is named smallrye-kafka.
A connector can:
retrieve messages from a remote broker (inbound)
send messages to a remote broker (outbound)
A connector can, of course, implement both directions.
Inbound connectors are responsible for:
Getting messages from the remote broker,
Creating a Reactive Messaging Message associated with the retrieved message.
Potentially associating technical metadata with the message. This includes unmarshalling the payload.
Associating an acknowledgement callback to acknowledge the incoming message when the Reactive Messaging message is acknowledged.
The first step should follow the reactive streams principles: use non-blocking technology and respect downstream requests.
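As a rough illustration of the inbound responsibilities listed above, the following sketch models them with plain JDK types only. BrokerRecord, SketchMessage, and InboundSketch are hypothetical names invented for this example; they are not part of Reactive Messaging or of any connector, and a real connector would expose a non-blocking stream rather than convert records one at a time.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical raw record as delivered by a broker client (not a real API).
record BrokerRecord(String topic, byte[] body, AtomicBoolean acked) {}

// Hypothetical stand-in for a Reactive Messaging Message:
// a payload, some metadata, and an acknowledgement callback.
record SketchMessage<T>(T payload,
                        Map<String, Object> metadata,
                        Supplier<CompletionStage<Void>> ackCallback) {
    CompletionStage<Void> ack() {
        return ackCallback.get();
    }
}

class InboundSketch {

    // Turns one broker record into a message.
    static SketchMessage<String> toMessage(BrokerRecord record) {
        // Unmarshal the payload (assumed here to be UTF-8 text).
        String payload = new String(record.body(), StandardCharsets.UTF_8);
        // Attach technical metadata describing where the message came from.
        Map<String, Object> metadata = Map.of("topic", record.topic());
        // Acknowledging the message acknowledges the underlying broker record.
        return new SketchMessage<>(payload, metadata, () -> {
            record.acked().set(true);
            return CompletableFuture.completedFuture(null);
        });
    }

    public static void main(String[] args) {
        BrokerRecord record = new BrokerRecord(
                "neo", "hello".getBytes(StandardCharsets.UTF_8), new AtomicBoolean());
        SketchMessage<String> message = toMessage(record);
        System.out.println(message.payload() + " acked=" + record.acked().get());
        message.ack();
        System.out.println("after ack: acked=" + record.acked().get());
    }
}
```

The point of the sketch is the coupling between the two acknowledgements: the broker record is only marked as acknowledged once the application acknowledges the message built from it.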
Outbound connectors are responsible for:
Receiving Reactive Messaging Messages and transforming them into a structure understood by the remote broker. This includes marshalling the payload.
If the Message contains outbound metadata (metadata set during the processing to influence the outbound structure and routing), taking it into account.
Sending the message to the remote broker.
Acknowledging the Reactive Messaging Message when the broker has accepted / acknowledged the message.
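The outbound responsibilities can be sketched in the same spirit, again with JDK types only. OutMessage, FakeBroker, and OutboundSketch are hypothetical names, the topic override is an invented example of outbound metadata, and the in-memory broker stands in for a real client whose send operation would complete asynchronously.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical stand-in for a Reactive Messaging Message carrying optional outbound metadata.
class OutMessage {
    final String payload;
    final Optional<String> topicOverride; // hypothetical outbound metadata influencing routing
    boolean acked = false;

    OutMessage(String payload, Optional<String> topicOverride) {
        this.payload = payload;
        this.topicOverride = topicOverride;
    }

    void ack() {
        acked = true;
    }
}

// Hypothetical in-memory broker client; the returned stage completes once the record is accepted.
class FakeBroker {
    final List<String> accepted = new ArrayList<>();

    CompletionStage<Void> send(String topic, byte[] body) {
        accepted.add(topic + ":" + new String(body, StandardCharsets.UTF_8));
        return CompletableFuture.completedFuture(null);
    }
}

class OutboundSketch {

    static CompletionStage<Void> dispatch(OutMessage message, String defaultTopic, FakeBroker broker) {
        // Marshal the payload into the structure the broker understands (UTF-8 bytes here).
        byte[] body = message.payload.getBytes(StandardCharsets.UTF_8);
        // Take outbound metadata into account: it overrides the configured topic.
        String topic = message.topicOverride.orElse(defaultTopic);
        // Send, and acknowledge the message only after the broker accepted it.
        return broker.send(topic, body).thenRun(message::ack);
    }

    public static void main(String[] args) {
        FakeBroker broker = new FakeBroker();
        OutMessage message = new OutMessage("hello", Optional.of("other"));
        dispatch(message, "data", broker);
        System.out.println(broker.accepted + " acked=" + message.acked);
    }
}
```

Chaining the acknowledgement onto the stage returned by send keeps the ordering explicit: a message that the broker never accepts is never acknowledged.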
Applications need to configure the connectors they use by expressing which channel is managed by which connector. Non-mapped channels are local / in-memory.
To configure connectors, you need an implementation of MicroProfile Config. If you don’t have one, add an implementation of MicroProfile Config to your classpath, such as:
<dependency>
  <groupId>io.smallrye.config</groupId>
  <artifactId>smallrye-config</artifactId>
  <version>1.8.0</version>
</dependency>
Then edit the application configuration, generally
The application configures the connector with a set of properties structured as follows:
mp.messaging.incoming.dummy-incoming-channel.connector=dummy
mp.messaging.incoming.dummy-incoming-channel.attribute=value
mp.messaging.outgoing.dummy-outgoing-channel.connector=dummy
mp.messaging.outgoing.dummy-outgoing-channel.attribute=value
Each channel (both incoming and outgoing) is configured individually.
The [incoming|outgoing] segment indicates the direction.
An incoming channel consumes data from a message broker or something producing data. It’s an inbound interaction. It can be connected to a method annotated with @Incoming using the same channel name.
An outgoing channel consumes data from the application and forwards it to a message broker or something consuming data. It’s an outbound interaction. It can be connected to a method annotated with @Outgoing using the same channel name.
[channel-name] is the name of the channel.
[attribute]=[value] sets a specific connector attribute to the given value.
Attributes depend on the connector used, so refer to the connector documentation to check the supported attributes.
The connector attribute must be set for each mapped channel. It indicates the name of the connector responsible for the channel.
Here is an example of a channel using an MQTT connector (consuming data from an MQTT broker) and a channel using a Kafka connector (writing data to Kafka):
# [Channel - health] - Consume data from MQTT
mp.messaging.incoming.health.topic=neo
mp.messaging.incoming.health.connector=smallrye-mqtt
mp.messaging.incoming.health.host=localhost
mp.messaging.incoming.health.broadcast=true
# [/Channel - health]

# [Channel - data] - Produce data to Kafka
mp.messaging.outgoing.data.connector=smallrye-kafka
mp.messaging.outgoing.data.bootstrap.servers=localhost:9092
mp.messaging.outgoing.data.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.data.value.serializer=io.vertx.kafka.client.serialization.JsonObjectSerializer
mp.messaging.outgoing.data.acks=1
# [/Channel - data]
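To make the key structure concrete, the following sketch groups such properties by direction and channel name using only java.util.Properties. ChannelConfigSketch is a hypothetical helper written for this page, not the way any MicroProfile Config or Reactive Messaging implementation actually reads its configuration, and it assumes channel names contain no dots.

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

class ChannelConfigSketch {

    static final String PREFIX = "mp.messaging.";

    // Groups properties as "direction.channel-name" -> (attribute -> value).
    static Map<String, Map<String, String>> group(Properties properties) {
        Map<String, Map<String, String>> channels = new TreeMap<>();
        for (String key : properties.stringPropertyNames()) {
            if (!key.startsWith(PREFIX)) {
                continue; // not a channel property
            }
            // Split into direction, channel name, and attribute.
            // The limit of 3 keeps dotted attributes such as bootstrap.servers intact.
            String[] parts = key.substring(PREFIX.length()).split("\\.", 3);
            if (parts.length < 3) {
                continue;
            }
            if (!parts[0].equals("incoming") && !parts[0].equals("outgoing")) {
                continue;
            }
            channels.computeIfAbsent(parts[0] + "." + parts[1], k -> new TreeMap<>())
                    .put(parts[2], properties.getProperty(key));
        }
        return channels;
    }

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("mp.messaging.incoming.health.connector", "smallrye-mqtt");
        properties.setProperty("mp.messaging.incoming.health.topic", "neo");
        properties.setProperty("mp.messaging.outgoing.data.connector", "smallrye-kafka");
        properties.setProperty("mp.messaging.outgoing.data.bootstrap.servers", "localhost:9092");
        // Prints one entry per channel, each with its connector and attributes.
        group(properties).forEach((channel, attributes) ->
                System.out.println(channel + " -> " + attributes));
    }
}
```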
To use a connector you need to add it to your CLASSPATH.
Generally, adding the dependency to your project is enough.
Then, you need to know the name of the connector and set the connector attribute for each channel managed by this connector.
In the connector documentation you will find a table listing the attributes supported by the connector. Be aware that attributes for inbound and outbound interactions may be different.
These tables contain the following entries:
The name of the attribute, and potentially an alias. The name of the attribute is used in the channel configuration key (in the attribute segment). The alias (if set) is the name of a global MicroProfile Config property that avoids having to configure the attribute for each managed channel. For example, the location of your Kafka broker can be set globally through such an alias.
The description of the attribute, including the type.
Whether or not the attribute is mandatory. If so, omitting it fails the deployment.
The default value if any.
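The way these table entries could combine at lookup time is sketched below. The lookup order (channel-specific attribute first, then the global alias, then the default value) is an assumption made for this example and is not stated by this page, so check your connector's documentation for its actual behaviour. AttributeLookupSketch, the location attribute, and the dummy.location alias are all hypothetical.

```java
import java.util.Map;
import java.util.Optional;

class AttributeLookupSketch {

    // Hypothetical model of one row of a connector attribute table.
    record Attribute(String name, String alias, boolean mandatory, String defaultValue) {}

    // Resolves an attribute for a channel whose keys start with channelPrefix,
    // for example "mp.messaging.incoming.dummy-incoming-channel".
    static Optional<String> resolve(Map<String, String> config, String channelPrefix, Attribute attribute) {
        // 1. Channel-specific value (assumed to take precedence).
        String value = config.get(channelPrefix + "." + attribute.name());
        // 2. Global alias, if the attribute declares one.
        if (value == null && attribute.alias() != null) {
            value = config.get(attribute.alias());
        }
        // 3. Default value, if any.
        if (value == null) {
            value = attribute.defaultValue();
        }
        // A mandatory attribute without any value is treated as a configuration failure.
        if (value == null && attribute.mandatory()) {
            throw new IllegalArgumentException("Missing mandatory attribute: " + attribute.name());
        }
        return Optional.ofNullable(value);
    }

    public static void main(String[] args) {
        Attribute location = new Attribute("location", "dummy.location", true, null);
        String prefix = "mp.messaging.incoming.dummy-incoming-channel";
        Map<String, String> config = Map.of("dummy.location", "localhost:1234");
        // No channel-specific value is set, so the global alias is used.
        System.out.println(resolve(config, prefix, location));
    }
}
```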