Connector Contribution Guide
A connector implementation is a CDI-managed bean, typically an @ApplicationScoped
bean,
which is identified by the @Connector
identifier.
In order to provide inbound and outbound channels, the connector implements two interfaces InboundConnector
and OutboundConnector
respectively.
In addition to that, the connector bean is annotated with @ConnectorAttribute
, which describes attributes to configure channels.
Maven Archetype
Smallrye Reactive Messaging provides a Maven archetype to bootstrap a new connector.
You can generate a new connector project with the code described in this guide using:
| mvn -N archetype:generate \
-DarchetypeGroupId=io.smallrye.reactive \
-DarchetypeArtifactId=smallrye-reactive-messaging-connector-archetype \
-DarchetypeVersion=4.18.1 \
-DgroupId=io.smallrye.reactive \
-Dpackage=io.smallrye.reactive.messaging.my \
-Dversion=4.18.1 \
-DartifactId=smallrye-reactive-messaging-my \
-DconnectorName=my
|
The following is an example of a connector skeleton :
| package connectors;
import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.INCOMING;
import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.INCOMING_AND_OUTGOING;
import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.OUTGOING;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;
import connectors.api.BrokerClient;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.connector.InboundConnector;
import io.smallrye.reactive.messaging.connector.OutboundConnector;
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import io.vertx.mutiny.core.Vertx;
@ApplicationScoped
@Connector(MyConnector.CONNECTOR_NAME)
@ConnectorAttribute(name = "client-id", type = "string", direction = INCOMING_AND_OUTGOING, description = "The client id ", mandatory = true)
@ConnectorAttribute(name = "buffer-size", type = "int", direction = INCOMING, description = "The size buffer of incoming messages waiting to be processed", defaultValue = "128")
@ConnectorAttribute(name = "topic", type = "string", direction = OUTGOING, description = "The default topic to send the messages, defaults to channel name if not set")
@ConnectorAttribute(name = "maxPendingMessages", type = "int", direction = OUTGOING, description = "The maximum size of a queue holding pending messages", defaultValue = "1000")
@ConnectorAttribute(name = "waitForWriteCompletion", type = "boolean", direction = OUTGOING, description = "Whether the outgoing channel waits for the write completion", defaultValue = "true")
public class MyConnector implements InboundConnector, OutboundConnector {
public static final String CONNECTOR_NAME = "smallrye-my-connector";
@Inject
ExecutionHolder executionHolder;
Vertx vertx;
List<MyIncomingChannel> incomingChannels = new CopyOnWriteArrayList<>();
List<MyOutgoingChannel> outgoingChannels = new CopyOnWriteArrayList<>();
@PostConstruct
void init() {
this.vertx = executionHolder.vertx();
}
@Override
public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {
MyConnectorIncomingConfiguration ic = new MyConnectorIncomingConfiguration(config);
String channelName = ic.getChannel();
String clientId = ic.getClientId();
int bufferSize = ic.getBufferSize();
// ...
BrokerClient client = BrokerClient.create(clientId);
MyIncomingChannel channel = new MyIncomingChannel(vertx, ic, client);
incomingChannels.add(channel);
return channel.getStream();
}
@Override
public Flow.Subscriber<? extends Message<?>> getSubscriber(Config config) {
MyConnectorOutgoingConfiguration oc = new MyConnectorOutgoingConfiguration(config);
String channelName = oc.getChannel();
String clientId = oc.getClientId();
int pendingMessages = oc.getMaxPendingMessages();
// ...
BrokerClient client = BrokerClient.create(clientId);
MyOutgoingChannel channel = new MyOutgoingChannel(vertx, oc, client);
outgoingChannels.add(channel);
return channel.getSubscriber();
}
}
|
Note that the getPublisher
and getSubscriber
methods receive MicroProfile Config Config
instance and wrap it with
MyConnectorIncomingConfiguration
and MyConnectorOutgoingConfiguration
objects.
These custom channel configuration types ease getting channel configuration, including the optional or default values.
They are generated by the smallrye-connector-attribute-processor
annotation processor, and can be configured in project pom like the following:
| <dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>smallrye-reactive-messaging-provider</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-connector-attribute-processor</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<generatedSourcesDirectory>${project.build.directory}/generated-sources/</generatedSourcesDirectory>
<annotationProcessors>
<annotationProcessor>
io.smallrye.reactive.messaging.connector.ConnectorAttributeProcessor
</annotationProcessor>
<annotationProcessor>
org.jboss.logging.processor.apt.LoggingToolsProcessor
</annotationProcessor>
</annotationProcessors>
</configuration>
</plugin>
</plugins>
</build>
|
The smallrye-reactive-messaging-provider
is the minimum required dependency for a connector implementation.
You'll also note that the LoggingToolsProcessor
annotation processor is also configured.
This enables generating internationalized log statements and exceptions.
Typically, you would create following interfaces in i18n
sub-package: [Connector]Exceptions
, [Connector]Logging
and [Connector]Messages
.
More information can be found in JBoss Logging Tools documentation.
Implementing Inbound Channels
The InboundConnector
implementation returns, for a given channel configuration, a reactive stream of Message
s.
The returned reactive stream is an instance of Flow.Publisher
and typically can be implemented using Mutiny Multi
type.
IncomingConnectorFactory
The inbound channels can also implement the IncomingConnectorFactory
from the MicroProfile Reactive Messaging specification.
However, the PublisherBuilder
type can be more challenging to work with and
Smallrye Reactive Messaging converts the provided stream to Mutiny types to do the wiring anyway.
The returned Flow.Publisher
would allow controlling the flow of ingestion using backpressure.
It would be preferable to use pull-based APIs of the underlying messaging library to receive messages from the message broker.
You can refer to the Mutiny
"How to use polling?" guide
to construct a Multi
using Mutiny APIs, or implement the Flow.Subscription
from scratch and wrap it in an AbstractMulti
.
Here is an example channel implementation which constructs the reactive stream using the polling API:
| package connectors;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.microprofile.reactive.messaging.Message;
import connectors.api.BrokerClient;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.vertx.core.impl.VertxInternal;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.Vertx;
public class MyIncomingChannel {
private final String channel;
private final BrokerClient client;
private final Context context;
private final MyAckHandler ackHandler;
private final MyFailureHandler failureHandler;
private final AtomicBoolean closed = new AtomicBoolean(false);
private Flow.Publisher<? extends Message<?>> stream;
public MyIncomingChannel(Vertx vertx, MyConnectorIncomingConfiguration cfg, BrokerClient client) {
// create and configure the client with MyConnectorIncomingConfiguration
this.channel = cfg.getChannel();
this.client = client;
this.context = Context.newInstance(((VertxInternal) vertx.getDelegate()).createEventLoopContext());
this.ackHandler = MyAckHandler.create(this.client);
this.failureHandler = MyFailureHandler.create(this.client);
this.stream = Multi.createBy().repeating()
.uni(() -> Uni.createFrom().completionStage(this.client.poll()))
.until(__ -> closed.get())
.emitOn(context::runOnContext)
.map(consumed -> new MyMessage<>(consumed, ackHandler, failureHandler));
}
public String getChannel() {
return channel;
}
public Flow.Publisher<? extends Message<?>> getStream() {
return this.stream;
}
public void close() {
closed.compareAndSet(false, true);
client.close();
}
void isReady(HealthReport.HealthReportBuilder healthReportBuilder) {
}
}
|
Connector Threading and Vert.x
Whether the external API call is blocking or non-blocking, managing the thread on which the message processing will be dispatched can be challenging.
Smallrye Reactive Messaging depends on Eclipse Vert.x to consistently dispatch messages in event-loop or worker threads,
propagating the message context along different processing stages.
You can read more on Message Context and Vert.x Context.
Some connectors already use Vert.x clients, such as RabbitMQ, AMQP 1.0 or MQTT.
Other connectors such as Kafka or Pulsar directly use the client library of the messaging technology,
therefore they create a Vert.x Context per channel to dispatch messages on that context.
Connectors can access the Vertx
instance by injecting the ExecutionHolder
bean.
Mutiny operators runSubscribtionOn
and emitOn
can be used to switch threads the events are dispatched on.
Reactive messaging Message
type is a thin wrapper around a payload and some metadata, which lets implementing acknowledgment and negative acknowledgment of that message.
Messages received from the underlying library very often return with a custom type,
wrapping the payload and some properties such as key, timestamp, schema information, or any other metadata.
Tip
The client may propose different strategies for consuming messages individually or in batches.
If a batch consuming is available the incoming channel may receive wrap and dispatch message as a batch or individually.
While it is possible to use Message.of
builder methods to wrap the incoming message type,
a custom type implementing Message
interface helps to deal with different aspects we'll cover later,
such as deserialization, message acknowledgment or tracing.
An example message implementation would be like the following:
| package connectors;
import java.util.concurrent.CompletionStage;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Metadata;
import connectors.api.ConsumedMessage;
import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage;
public class MyMessage<T> implements Message<T>, ContextAwareMessage<T> {
private final T payload;
private final Metadata metadata;
private final MyAckHandler ackHandler;
private final MyFailureHandler nackHandler;
private ConsumedMessage<?> consumed;
public MyMessage(ConsumedMessage<T> message, MyAckHandler ackHandler, MyFailureHandler nackHandler) {
this.consumed = message;
this.payload = message.body();
this.ackHandler = ackHandler;
this.nackHandler = nackHandler;
this.metadata = ContextAwareMessage.captureContextMetadata(new MyIncomingMetadata<>(message));
}
public ConsumedMessage<?> getConsumedMessage() {
return consumed;
}
@Override
public T getPayload() {
return payload;
}
@Override
public Metadata getMetadata() {
return metadata;
}
@Override
public CompletionStage<Void> ack() {
return ackHandler.handle(this);
}
@Override
public CompletionStage<Void> nack(Throwable reason, Metadata nackMetadata) {
return nackHandler.handle(this, reason, nackMetadata);
}
}
|
Note that MyMessage
implements the ContextAwareMessage
.
In the constructor captureContextMetadata
helper method is used to capture the Vert.x context which created the object and capturing it into the LocalContextMetadata
.
This metadata allows running each message in its own Vert.x context, supporting context propagation.
The MyMessage
type implements the accessors for the metadata
and payload
from the Message
interface.
If the messaging technology doesn't have a built-in unmarshalling mechanism, the message can deserialize the raw payload to a primitive or a complex object.
Warning
The custom message implementation is usually not the type consumed by the application injecting channels.
Applications usually inject in the payload, the raw consumed type (in the above example the ConsumedMessage
),
or some other type provided by the MessageConverter
s.
Handling of Message
types by the application is restricted only to advanced use cases, because handling of message acknowledgment is manual
Even then the message may be intercepted before and changed, conserving the metadata
, ack
and nack
handlers but not the original type created by the connector.
The MyIncomingMetadata
gives access to the underlying consumed message attributes, and applications can inject this object for accessing message details:
| package connectors;
import java.util.Map;
import connectors.api.ConsumedMessage;
public class MyIncomingMetadata<T> {
private final ConsumedMessage<T> msg;
public MyIncomingMetadata(ConsumedMessage<T> msg) {
this.msg = msg;
}
public ConsumedMessage<T> getCustomMessage() {
return msg;
}
public T getBody() {
return msg.body();
}
public String getKey() {
return msg.key();
}
public long getTimestamp() {
return msg.timestamp();
}
public Map<String, String> getProperties() {
return msg.properties();
}
}
|
Also note that ack
and nack
method implementations are delegated to handler objects.
This allows configuring different strategies at channel level.
Acknowledgment strategies
The acknowledgement is the way for message consumers to inform the broker that the message has been successfully received and processed.
Depending on the messaging technology the broker then can decide to remove the message from the queue, flag as consumed or purge it completely.
In Reactive Messaging there are different policies to trigger the acknowledgement but the canonical one is to acknowledge a message when the processing (potentially asynchronous) has completed (POST_PROCESSING
).
The Reactive Messaging defines Message#ack
method as non-blocking asynchronous, returning a CompletionStage<Void>
,
because potentially the acknowledgement action sends a network call to the broker.
The following example simply calls the client ack
method using the Mutiny Uni
and switch the emitter to the Message context.
Returning back to the message context is essential for chaining asynchronous actions without losing the context and for keeping the consistency on message consumption flow.
| package connectors;
import java.util.concurrent.CompletionStage;
import connectors.api.BrokerClient;
import io.smallrye.mutiny.Uni;
public class MyAckHandler {
private final BrokerClient client;
static MyAckHandler create(BrokerClient client) {
return new MyAckHandler(client);
}
public MyAckHandler(BrokerClient client) {
this.client = client;
}
public CompletionStage<Void> handle(MyMessage<?> msg) {
return Uni.createFrom().completionStage(client.ack(msg.getConsumedMessage()))
.emitOn(msg::runOnMessageContext)
.subscribeAsCompletionStage();
}
}
|
While this ack handler strategy acknowledges each message to the broker,
the messaging technology can allow employing different strategies for acknowledging messages.
For example an ack strategy can track processed messages and acknowledge them altogether or call a different client side endpoint to acknowledge the message batch.
Failure handling strategies
The failure handling, or the negative acknowledgment allows indicating that a message was not processed correctly.
Similar to the acknowledgment the Reactive Messaging defines Message#nack(Throwable reason, Metadata metadata)
method as non-blocking asynchronous, returning a CompletionStage<Void>
.
| package connectors;
import java.util.concurrent.CompletionStage;
import org.eclipse.microprofile.reactive.messaging.Metadata;
import connectors.api.BrokerClient;
import io.smallrye.mutiny.Uni;
public class MyFailureHandler {
private final BrokerClient client;
static MyFailureHandler create(BrokerClient client) {
return new MyFailureHandler(client);
}
public MyFailureHandler(BrokerClient client) {
this.client = client;
}
public CompletionStage<Void> handle(MyMessage<?> msg, Throwable reason, Metadata metadata) {
return Uni.createFrom().completionStage(() -> client.reject(msg.getConsumedMessage(), reason.getMessage()))
.emitOn(msg::runOnMessageContext)
.subscribeAsCompletionStage();
}
}
|
Different failure handling strategies can, for example,
- Ignore the failure, log and call the ack
instead
- Send the message to a dead letter queue and call the ack
- Employ a different strategy depending on the Metadata
associated with the nack
method call.
Message Converters
The connector can propose default MessageConverter
implementations for converting the payload to a custom type.
As an example the following converter extracts the CustomMessage
and puts it in the payload:
| package connectors;
import java.lang.reflect.Type;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Message;
import connectors.api.ConsumedMessage;
import io.smallrye.reactive.messaging.MessageConverter;
import io.smallrye.reactive.messaging.providers.helpers.TypeUtils;
@ApplicationScoped
public class MyMessageConverter implements MessageConverter {
@Override
public boolean canConvert(Message<?> in, Type target) {
return TypeUtils.isAssignable(target, ConsumedMessage.class)
&& in.getMetadata(MyIncomingMetadata.class).isPresent();
}
@Override
public Message<?> convert(Message<?> in, Type target) {
return in.withPayload(in.getMetadata(MyIncomingMetadata.class)
.map(MyIncomingMetadata::getCustomMessage)
.orElse(null));
}
}
|
Implementing Outbound Channels
The OutboundConnector
implementation returns, for a given channel configuration, a Flow.Subscriber
of messages.
This is typically implemented by a custom Flow.Processor
and using the MultiUtils.via
helper methods to apply message transformations.
OutgoingConnectorFactory
The outbound channels can also implement the OutgoingConnectorFactory
from the MicroProfile Reactive Messaging specification.
However, it is usually more friendly to work with the MultiUtils.via
methods to construct and transform outgoing messages.
Here is an example outgoing channel implementation:
| package connectors;
import java.util.concurrent.Flow;
import org.eclipse.microprofile.reactive.messaging.Message;
import connectors.api.BrokerClient;
import connectors.api.SendMessage;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import io.vertx.mutiny.core.Vertx;
public class MyOutgoingChannel {
private final String channel;
private Flow.Subscriber<? extends Message<?>> subscriber;
private final BrokerClient client;
private final String topic;
public MyOutgoingChannel(Vertx vertx, MyConnectorOutgoingConfiguration oc, BrokerClient client) {
this.channel = oc.getChannel();
this.client = client;
this.topic = oc.getTopic().orElse(oc.getChannel());
this.subscriber = MultiUtils.via(multi -> multi.call(m -> publishMessage(this.client, m)));
}
private Uni<Void> publishMessage(BrokerClient client, Message<?> m) {
// construct the outgoing message
SendMessage sendMessage = new SendMessage();
Object payload = m.getPayload();
sendMessage.setPayload(payload);
sendMessage.setTopic(topic);
m.getMetadata(MyOutgoingMetadata.class).ifPresent(out -> {
sendMessage.setTopic(out.getTopic());
sendMessage.setKey(out.getKey());
//...
});
return Uni.createFrom().completionStage(() -> client.send(sendMessage))
.onItem().transformToUni(receipt -> Uni.createFrom().completionStage(m.ack()))
.onFailure().recoverWithUni(t -> Uni.createFrom().completionStage(m.nack(t)));
}
public Flow.Subscriber<? extends Message<?>> getSubscriber() {
return this.subscriber;
}
public String getChannel() {
return this.channel;
}
}
|
The MultiUtils.via
helper method allows using the Multi
chaining methods and in the same time provides a Flow.Subscriber
.
However, this implementation allows sending messages one at a time:
one only after the previous outgoing message send is completed.
Some messaging technologies provide publish receipt,
a message back from the broker to the sender that asynchronously acknowledges the sent operation.
In this case the connector can only be sure of the send operation when it receives the publish receipt of that message.
Some technologies may provide blocking sending calls,
in that case the connector needs to delegate the sending call to a worker thread.
Depending on whether the client supports multiple in-flight outgoing messages, you can also use a SenderProcessor
,
which allows receiving configuration for the maximum number of in-flight messages and whether it waits for completion (publish receipt from the broker):
| long requests = oc.getMaxPendingMessages();
boolean waitForWriteCompletion = oc.getWaitForWriteCompletion();
if (requests <= 0) {
requests = Long.MAX_VALUE;
}
this.processor = new SenderProcessor(requests, waitForWriteCompletion, m -> publishMessage(client, m));
this.subscriber = MultiUtils.via(processor, m -> m.onFailure().invoke(f -> {
// log the failure
}));
|
Other more advanced scenarios can be implemented to retry the transmission in case of a retryable failure,
or batch multiple outgoing messages to a single send operation.
Outgoing Message
Builder
In order to convey all attributes of the outgoing message to the client library
connectors provide outgoing Message
implementation and a corresponding outgoing message metadata.
These allow the application developer to build the message attributes that will be sent to the broker.
| package connectors;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.function.Supplier;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Metadata;
import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage;
public class MyOutgoingMessage<T> implements Message<T>, ContextAwareMessage<T> {
private final T payload;
private final Metadata metadata;
private final Supplier<CompletionStage<Void>> ack;
private final Function<Throwable, CompletionStage<Void>> nack;
public static <T> MyOutgoingMessage<T> from(Message<T> message) {
return new MyOutgoingMessage<>(message.getPayload(), message.getMetadata(), message.getAck(), message.getNack());
}
public MyOutgoingMessage(T payload, Metadata metadata,
Supplier<CompletionStage<Void>> ack,
Function<Throwable, CompletionStage<Void>> nack) {
this.payload = payload;
this.metadata = metadata;
this.ack = ack;
this.nack = nack;
}
public MyOutgoingMessage(T payload, String key, String topic,
Supplier<CompletionStage<Void>> ack,
Function<Throwable, CompletionStage<Void>> nack) {
this(payload, Metadata.of(new MyOutgoingMetadata(topic, key)), ack, nack);
}
@Override
public T getPayload() {
return payload;
}
@Override
public Metadata getMetadata() {
return metadata;
}
@Override
public Supplier<CompletionStage<Void>> getAck() {
return this.ack;
}
@Override
public Function<Throwable, CompletionStage<Void>> getNack() {
return this.nack;
}
public MyOutgoingMessage<T> withKey(String key) {
this.metadata.with(this.metadata.get(MyOutgoingMetadata.class)
.map(m -> MyOutgoingMetadata.builder(m).withKey(key).build()));
return this;
}
public MyOutgoingMessage<T> withTopic(String topic) {
this.metadata.with(this.metadata.get(MyOutgoingMetadata.class)
.map(m -> MyOutgoingMetadata.builder(m).withTopic(topic).build()));
return this;
}
}
|
| package connectors;
public class MyOutgoingMetadata {
private String topic;
private String key;
public static MyOutgoingMetadataBuilder builder() {
return new MyOutgoingMetadataBuilder();
}
public static MyOutgoingMetadataBuilder builder(MyOutgoingMetadata metadata) {
return new MyOutgoingMetadataBuilder(metadata);
}
public MyOutgoingMetadata(String topic, String key) {
this.topic = topic;
this.key = key;
}
public String getTopic() {
return topic;
}
public String getKey() {
return key;
}
public static class MyOutgoingMetadataBuilder {
private String topic;
private String key;
public MyOutgoingMetadataBuilder() {
}
public MyOutgoingMetadataBuilder(MyOutgoingMetadata metadata) {
this.key = metadata.getKey();
this.topic = metadata.getTopic();
}
public MyOutgoingMetadataBuilder withTopic(String topic) {
this.topic = topic;
return this;
}
public MyOutgoingMetadataBuilder withKey(String key) {
this.key = key;
return this;
}
public MyOutgoingMetadata build() {
return new MyOutgoingMetadata(topic, key);
}
}
}
|
The outgoing channel implementation then will construct the client library object that represents the outbound message, SendMessage
in this example:
| private Uni<Void> publishMessage(BrokerClient client, Message<?> message) {
// construct the outgoing message
SendMessage sendMessage;
Object payload = message.getPayload();
if (payload instanceof SendMessage) {
sendMessage = (SendMessage) message.getPayload();
} else {
sendMessage = new SendMessage();
sendMessage.setPayload(payload);
sendMessage.setTopic(topic);
message.getMetadata(MyOutgoingMetadata.class).ifPresent(out -> {
sendMessage.setTopic(out.getTopic());
sendMessage.setKey(out.getKey());
//...
});
}
return Uni.createFrom().completionStage(() -> client.send(sendMessage))
.onItem().transformToUni(receipt -> Uni.createFrom().completionStage(message.ack()))
.onFailure().recoverWithUni(t -> Uni.createFrom().completionStage(message.nack(t)));
}
|
It is a best practice to also allow the application to return a payload of the client outbound library object (SendMessage
).
Outgoing message acknowledgement
Because the Reactive Messaging chains acknowledgements from incoming message until the outgoing message,
it is crucial for the outgoing channel to correctly ack and nack the message.
Smallrye Health Integration
Smallrye Reactive Messaging allows connectors to integrate with Smallrye Health to contribute channel state to the health reports.
Connectors need to implement the HealthReporter
interface and implement some or all of the getReadiness
, getLiveness
and getStartup
methods:
| @ApplicationScoped
@Connector(MyConnectorWithPartials.CONNECTOR_NAME)
public class MyConnectorWithPartials implements InboundConnector, OutboundConnector, HealthReporter {
public static final String CONNECTOR_NAME = "smallrye-my-connector";
List<MyIncomingChannel> incomingChannels = new CopyOnWriteArrayList<>();
List<MyOutgoingChannel> outgoingChannels = new CopyOnWriteArrayList<>();
@Override
public HealthReport getReadiness() {
HealthReport.HealthReportBuilder builder = HealthReport.builder();
for (MyIncomingChannel channel : incomingChannels) {
builder.add(channel.getChannel(), true);
}
for (MyOutgoingChannel channel : outgoingChannels) {
builder.add(channel.getChannel(), true);
}
return builder.build();
}
@Override
public HealthReport getLiveness() {
HealthReport.HealthReportBuilder builder = HealthReport.builder();
for (MyIncomingChannel channel : incomingChannels) {
builder.add(channel.getChannel(), true);
}
for (MyOutgoingChannel channel : outgoingChannels) {
builder.add(channel.getChannel(), true);
}
return builder.build();
}
@Override
public HealthReport getStartup() {
HealthReport.HealthReportBuilder builder = HealthReport.builder();
for (MyIncomingChannel channel : incomingChannels) {
builder.add(channel.getChannel(), true);
}
for (MyOutgoingChannel channel : outgoingChannels) {
builder.add(channel.getChannel(), true);
}
return builder.build();
}
|
Implementing health reports per channel depends on what information is available to the connector.
For more information on different health check probes you can check out Configure Liveness, Readiness and Startup Probes
You may want to add a connector attribute to enable/disable the health reporting per channel:
| import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
@ConnectorAttribute(name = "health-enabled", type = "boolean", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Whether health reporting is enabled (default) or disabled", defaultValue = "true")
|
OpenTelemetry Tracing Integration
Smallrye Reactive Messaging allows connectors to easily integrate with OpenTelemetry tracing.
It propagates the tracing context from inbound messages and to outbound messages.
The smallrye-reactive-messaging-otel
module provides necessary dependencies to the OpenTelemetry artifacts and also provides TracingUtils
helper class for setting up the tracing.
| <dependency>
<groupId>io.smallrye.reactive.messaging</groupId>
<artifactId>smallrye-reactive-messaging-otel</artifactId>
</dependency>
|
For integrating tracing you'd need to create a couple of classes:
- Holder class for tracing information
- Implementation of
io.opentelemetry.context.propagation.TextMapGetter
, which retrieves for a given key the value of a tracing attributed from the holder object
- Implementation of
io.opentelemetry.context.propagation.TextMapSetter
, which sets on the holder object the key and value of tracing attributes
- Implementations of
io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor
and io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter
Then you'd need to configure instrumenters per incoming and outgoing channel:
| public static Instrumenter<MyTrace, Void> createInstrumenter(boolean incoming) {
MessageOperation messageOperation = incoming ? MessageOperation.RECEIVE : MessageOperation.PUBLISH;
MyAttributesExtractor myExtractor = new MyAttributesExtractor();
MessagingAttributesGetter<MyTrace, Void> attributesGetter = myExtractor.getMessagingAttributesGetter();
var spanNameExtractor = MessagingSpanNameExtractor.create(attributesGetter, messageOperation);
InstrumenterBuilder<MyTrace, Void> builder = Instrumenter.builder(GlobalOpenTelemetry.get(),
"io.smallrye.reactive.messaging", spanNameExtractor);
var attributesExtractor = MessagingAttributesExtractor.create(attributesGetter, messageOperation);
builder
.addAttributesExtractor(attributesExtractor)
.addAttributesExtractor(myExtractor);
if (incoming) {
return builder.buildConsumerInstrumenter(MyTraceTextMapGetter.INSTANCE);
} else {
return builder.buildProducerInstrumenter(MyTraceTextMapSetter.INSTANCE);
}
}
// </create-instrumener>
public Message<?> traceIncoming(Message<?> message, MyTrace myTrace, boolean makeCurrent) {
return TracingUtils.traceIncoming(instrumenter, message, myTrace, makeCurrent);
}
public void traceOutgoing(Message<?> message, MyTrace myTrace) {
TracingUtils.traceOutgoing(instrumenter, message, myTrace);
}
}
|
Finally, you'd need to configure instrumenters per incoming and outgoing channels and wire the call to instrumenter using TracingUtils
.
For an incoming channel, you'd need to call the instrumenter on an inbound message:
| Multi<? extends Message<?>> receiveMulti = Multi.createBy().repeating()
.uni(() -> Uni.createFrom().completionStage(this.client.poll()))
.until(__ -> closed.get())
.emitOn(context::runOnContext)
.map(consumed -> new MyMessage<>(consumed, ackHandler, failureHandler));
Instrumenter<MyTrace, Void> instrumenter = MyOpenTelemetryInstrumenter.createInstrumenter(true);
if (tracingEnabled) {
receiveMulti = receiveMulti.map(message -> {
ConsumedMessage<?> consumedMessage = message.getMetadata(MyIncomingMetadata.class).get().getCustomMessage();
return TracingUtils.traceIncoming(instrumenter, message, new MyTrace.Builder()
.withClientId(consumedMessage.clientId())
.withTopic(consumedMessage.topic())
.withProperties(consumedMessage.properties())
.build());
});
}
|
For an outgoing channel, you'd need to call the instrumenter on constructing the outbound message:
| private Uni<Void> publishMessageWithTracing(BrokerClient client, Message<?> message) {
// construct the outgoing message
SendMessage sendMessage;
Object payload = message.getPayload();
if (payload instanceof SendMessage) {
sendMessage = (SendMessage) message.getPayload();
} else {
sendMessage = new SendMessage();
sendMessage.setPayload(payload);
sendMessage.setTopic(topic);
message.getMetadata(MyOutgoingMetadata.class).ifPresent(out -> {
sendMessage.setTopic(out.getTopic());
sendMessage.setKey(out.getKey());
//...
});
}
if (tracingEnabled) {
Map<String, String> properties = new HashMap<>();
TracingUtils.traceOutgoing(instrumenter, message, new MyTrace.Builder()
.withProperties(properties)
.withTopic(sendMessage.getTopic())
.build());
sendMessage.setProperties(properties);
}
return Uni.createFrom().completionStage(() -> client.send(sendMessage))
.onItem().transformToUni(receipt -> Uni.createFrom().completionStage(message.ack()))
.onFailure().recoverWithUni(t -> Uni.createFrom().completionStage(message.nack(t)));
}
|
You may want to add a connector attribute to enable/disable the tracing per channel:
| import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
@ConnectorAttribute(name = "tracing-enabled", type = "boolean", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Whether tracing is enabled (default) or disabled", defaultValue = "true")
|
Testing the connector
While unit tests are highly encouraged for validating ad-hoc logic in connector code,
by nature connector tests are mostly integration tests validating the correct configuration and functioning of channels.
Most of the time tests need to run against a broker instance.
This instance can be mocked or embedded in the test JVM,
or provisioned in a container runtime using Testcontainers.
The testcontainers approach is encouraged as it'll provide a testing environment closest to reality.
It may take too much time and resources to start a broker per test method or per test class,
so may want to share the same broker instance between all test classes.
In that case you can checkout how to write a JUnit 5 Extension
and start only one container instance in the beginning of tests and stop it at the end of all the tests.
There are essentially two ways of creating the connector behavior to test against:
- Instantiating channels directly by passing the custom configuration.
With this you can get the Reactive stream directly from the channel implementation and send/receive messages.
You can use AssertSubscriber from Mutiny to regulate demand and write assertions.
- CDI-based tests which write configuration and instantiate application beans.
You can use Weld, the reference implementation of CDI specification with configured set of beans and extensions:
| package connectors.test;
import jakarta.enterprise.inject.spi.BeanManager;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.reactive.messaging.spi.ConnectorLiteral;
import org.jboss.weld.environment.se.Weld;
import org.jboss.weld.environment.se.WeldContainer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import connectors.MyConnector;
import connectors.MyMessageConverter;
import io.smallrye.config.SmallRyeConfigProviderResolver;
import io.smallrye.config.inject.ConfigExtension;
import io.smallrye.reactive.messaging.providers.MediatorFactory;
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import io.smallrye.reactive.messaging.providers.connectors.WorkerPoolRegistry;
import io.smallrye.reactive.messaging.providers.extension.ChannelProducer;
import io.smallrye.reactive.messaging.providers.extension.EmitterFactoryImpl;
import io.smallrye.reactive.messaging.providers.extension.HealthCenter;
import io.smallrye.reactive.messaging.providers.extension.LegacyEmitterFactoryImpl;
import io.smallrye.reactive.messaging.providers.extension.MediatorManager;
import io.smallrye.reactive.messaging.providers.extension.MutinyEmitterFactoryImpl;
import io.smallrye.reactive.messaging.providers.extension.ReactiveMessagingExtension;
import io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory;
import io.smallrye.reactive.messaging.providers.impl.ConnectorFactories;
import io.smallrye.reactive.messaging.providers.impl.InternalChannelRegistry;
import io.smallrye.reactive.messaging.providers.metrics.MetricDecorator;
import io.smallrye.reactive.messaging.providers.metrics.MicrometerDecorator;
import io.smallrye.reactive.messaging.providers.wiring.Wiring;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
public class WeldTestBase {
protected Weld weld;
protected WeldContainer container;
@BeforeEach
public void initWeld() {
weld = new Weld();
// SmallRye config
ConfigExtension extension = new ConfigExtension();
weld.addExtension(extension);
weld.addBeanClass(MediatorFactory.class);
weld.addBeanClass(MediatorManager.class);
weld.addBeanClass(InternalChannelRegistry.class);
weld.addBeanClass(ConnectorFactories.class);
weld.addBeanClass(ConfiguredChannelFactory.class);
weld.addBeanClass(ChannelProducer.class);
weld.addBeanClass(ExecutionHolder.class);
weld.addBeanClass(WorkerPoolRegistry.class);
weld.addBeanClass(HealthCenter.class);
weld.addBeanClass(Wiring.class);
weld.addExtension(new ReactiveMessagingExtension());
weld.addBeanClass(EmitterFactoryImpl.class);
weld.addBeanClass(MutinyEmitterFactoryImpl.class);
weld.addBeanClass(LegacyEmitterFactoryImpl.class);
weld.addBeanClass(MyConnector.class);
weld.addBeanClass(MyMessageConverter.class);
weld.addBeanClass(MetricDecorator.class);
weld.addBeanClass(MicrometerDecorator.class);
weld.disableDiscovery();
}
@AfterEach
public void stopContainer() {
if (container != null) {
// TODO Explicitly close the connector
getBeanManager().createInstance()
.select(MyConnector.class, ConnectorLiteral.of(MyConnector.CONNECTOR_NAME)).get();
container.close();
}
// Release the config objects
SmallRyeConfigProviderResolver.instance().releaseConfig(ConfigProvider.getConfig());
}
public BeanManager getBeanManager() {
if (container == null) {
runApplication(new MapBasedConfig());
}
return container.getBeanManager();
}
public void addBeans(Class<?>... clazzes) {
weld.addBeanClasses(clazzes);
}
public <T> T get(Class<T> clazz) {
return getBeanManager().createInstance().select(clazz).get();
}
public <T> T runApplication(MapBasedConfig config, Class<T> clazz) {
weld.addBeanClass(clazz);
runApplication(config);
return get(clazz);
}
public void runApplication(MapBasedConfig config) {
if (config != null) {
config.write();
} else {
MapBasedConfig.cleanup();
}
container = weld.initialize();
}
public static void addConfig(MapBasedConfig config) {
if (config != null) {
config.write();
} else {
MapBasedConfig.cleanup();
}
}
public HealthCenter getHealth() {
if (container == null) {
throw new IllegalStateException("Application not started");
}
return container.getBeanManager().createInstance().select(HealthCenter.class).get();
}
public boolean isStarted() {
return getHealth().getStartup().isOk();
}
public boolean isReady() {
return getHealth().getReadiness().isOk();
}
public boolean isAlive() {
return getHealth().getLiveness().isOk();
}
}
|
You would need following test dependencies for enabling Weld in tests:
| <dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>test-common</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jboss.weld.se</groupId>
<artifactId>weld-se-shaded</artifactId>
<version>${weld.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jboss.weld</groupId>
<artifactId>weld-core-impl</artifactId>
<version>${weld.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
|
Your test classes can therefore extend the WeldTestBase
and provide configuration and application beans:
| package connectors.test;
import static org.awaitility.Awaitility.await;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.junit.jupiter.api.Test;
import connectors.MyConnector;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
public class MyConnectorTest extends WeldTestBase {
@Test
void incomingChannel() {
String host = "";
int port = 0;
String myTopic = UUID.randomUUID().toString();
MapBasedConfig config = new MapBasedConfig()
.with("mp.messaging.incoming.data.topic", myTopic)
.with("mp.messaging.incoming.data.host", host)
.with("mp.messaging.incoming.data.port", port)
.with("mp.messaging.incoming.data.connector", MyConnector.CONNECTOR_NAME);
MyApp app = runApplication(config, MyApp.class);
int expected = 10;
// produce expected number of messages to myTopic
// wait until app received
await().until(() -> app.received().size() == expected);
}
@ApplicationScoped
public static class MyApp {
List<String> received = new CopyOnWriteArrayList<>();
@Incoming("data")
void consume(String msg) {
received.add(msg);
}
public List<String> received() {
return received;
}
}
}
|
Awaitility
Because connector tests are usually asynchronous, awaitility
provides a DSL to await on expressed assertions.
Common tests for validating the connector
- Message consumption through incoming channels
- Message producing through outgoing channels
- Ack and failure handler strategies test
- Message Context propagation test
LocalPropagationTest
HealthCheckTest
MessageConverterTest
TracingPropagationTest
- Configuration test
- Authentication test
- Tests for