Connector Contribution Guide

A connector implementation is a CDI-managed bean, typically an @ApplicationScoped bean, identified by the @Connector qualifier. To provide inbound and outbound channels, the connector implements the InboundConnector and OutboundConnector interfaces, respectively. In addition, the connector bean is annotated with @ConnectorAttribute, which describes the attributes used to configure channels.

Maven Archetype

SmallRye Reactive Messaging provides a Maven archetype to bootstrap a new connector. You can generate a new connector project containing the code described in this guide using:

mvn -N archetype:generate \
-DarchetypeGroupId=io.smallrye.reactive \
-DarchetypeArtifactId=smallrye-reactive-messaging-connector-archetype \
-DarchetypeVersion=4.15.0 \
-DgroupId=io.smallrye.reactive \
-Dpackage=io.smallrye.reactive.messaging.my \
-Dversion=4.15.0 \
-DartifactId=smallrye-reactive-messaging-my \
-DconnectorName=my

The following is an example of a connector skeleton:

package connectors;

import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.INCOMING;
import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.INCOMING_AND_OUTGOING;
import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.OUTGOING;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;

import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;

import connectors.api.BrokerClient;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.connector.InboundConnector;
import io.smallrye.reactive.messaging.connector.OutboundConnector;
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import io.vertx.mutiny.core.Vertx;

@ApplicationScoped
@Connector(MyConnector.CONNECTOR_NAME)
@ConnectorAttribute(name = "client-id", type = "string", direction = INCOMING_AND_OUTGOING, description = "The client id ", mandatory = true)
@ConnectorAttribute(name = "buffer-size", type = "int", direction = INCOMING, description = "The size buffer of incoming messages waiting to be processed", defaultValue = "128")
@ConnectorAttribute(name = "topic", type = "string", direction = OUTGOING, description = "The default topic to send the messages, defaults to channel name if not set")
@ConnectorAttribute(name = "maxPendingMessages", type = "int", direction = OUTGOING, description = "The maximum size of a queue holding pending messages", defaultValue = "1000")
@ConnectorAttribute(name = "waitForWriteCompletion", type = "boolean", direction = OUTGOING, description = "Whether the outgoing channel waits for the write completion", defaultValue = "true")
public class MyConnector implements InboundConnector, OutboundConnector {

    public static final String CONNECTOR_NAME = "smallrye-my-connector";

    @Inject
    ExecutionHolder executionHolder;

    Vertx vertx;

    List<MyIncomingChannel> incomingChannels = new CopyOnWriteArrayList<>();
    List<MyOutgoingChannel> outgoingChannels = new CopyOnWriteArrayList<>();

    @PostConstruct
    void init() {
        this.vertx = executionHolder.vertx();
    }

    @Override
    public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {
        MyConnectorIncomingConfiguration ic = new MyConnectorIncomingConfiguration(config);
        String channelName = ic.getChannel();
        String clientId = ic.getClientId();
        int bufferSize = ic.getBufferSize();
        // ...
        BrokerClient client = BrokerClient.create(clientId);
        MyIncomingChannel channel = new MyIncomingChannel(vertx, ic, client);
        incomingChannels.add(channel);
        return channel.getStream();
    }

    @Override
    public Flow.Subscriber<? extends Message<?>> getSubscriber(Config config) {
        MyConnectorOutgoingConfiguration oc = new MyConnectorOutgoingConfiguration(config);
        String channelName = oc.getChannel();
        String clientId = oc.getClientId();
        int pendingMessages = oc.getMaxPendingMessages();
        // ...
        BrokerClient client = BrokerClient.create(clientId);
        MyOutgoingChannel channel = new MyOutgoingChannel(vertx, oc, client);
        outgoingChannels.add(channel);
        return channel.getSubscriber();
    }
}

Note that the getPublisher and getSubscriber methods receive a MicroProfile Config instance and wrap it in MyConnectorIncomingConfiguration and MyConnectorOutgoingConfiguration objects.

These custom channel configuration types make it easier to read the channel configuration, including optional and default values. They are generated by the smallrye-connector-attribute-processor annotation processor, which can be configured in the project pom.xml as follows:

    <dependencies>
      <dependency>
        <groupId>${project.groupId}</groupId>
        <artifactId>smallrye-reactive-messaging-provider</artifactId>
        <version>${project.version}</version>
      </dependency>
      <dependency>
        <groupId>io.smallrye.reactive</groupId>
        <artifactId>smallrye-connector-attribute-processor</artifactId>
        <version>${project.version}</version>
      </dependency>
    </dependencies>
    <build>
      <plugins>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-compiler-plugin</artifactId>
          <configuration>
            <generatedSourcesDirectory>${project.build.directory}/generated-sources/</generatedSourcesDirectory>
            <annotationProcessors>
              <annotationProcessor>
                io.smallrye.reactive.messaging.connector.ConnectorAttributeProcessor
              </annotationProcessor>
              <annotationProcessor>
                org.jboss.logging.processor.apt.LoggingToolsProcessor
              </annotationProcessor>
            </annotationProcessors>
          </configuration>
        </plugin>
      </plugins>
    </build>

The smallrye-reactive-messaging-provider artifact is the minimum required dependency for a connector implementation. Note that the LoggingToolsProcessor annotation processor is also configured. It enables generating internationalized log statements and exceptions. Typically, you would create the following interfaces in an i18n sub-package: [Connector]Exceptions, [Connector]Logging and [Connector]Messages. More information can be found in the JBoss Logging Tools documentation.
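To make the role of the generated configuration types more concrete, here is a hand-written sketch of roughly what such a class provides: typed accessors, default values, and a check for mandatory attributes. It is not the output of the annotation processor. The class name ChannelConfigSketch is hypothetical, and a plain Map stands in for the MicroProfile Config instance so that the example only needs the JDK.

```java
import java.util.Map;
import java.util.Optional;

/**
 * Hand-written stand-in for a generated channel configuration class.
 * Assumption: a Map replaces the MicroProfile Config instance.
 */
final class ChannelConfigSketch {

    private final Map<String, String> config;

    ChannelConfigSketch(Map<String, String> config) {
        this.config = config;
        // In this sketch, mandatory attributes are validated eagerly
        getClientId();
    }

    /** "client-id" is mandatory: fail when it is missing. */
    String getClientId() {
        return Optional.ofNullable(config.get("client-id"))
                .orElseThrow(() -> new IllegalArgumentException("The attribute `client-id` must be set"));
    }

    /** "buffer-size" is optional and falls back to its declared default value. */
    int getBufferSize() {
        return Optional.ofNullable(config.get("buffer-size")).map(Integer::parseInt).orElse(128);
    }

    /** "topic" has no default value, so it is exposed as an Optional. */
    Optional<String> getTopic() {
        return Optional.ofNullable(config.get("topic"));
    }
}
```

The accessor shapes mirror how the connector code in this guide uses the configuration objects: getBufferSize() returns an int, while getTopic() returns an Optional that the outgoing channel resolves with orElse.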

Implementing Inbound Channels

The InboundConnector implementation returns, for a given channel configuration, a reactive stream of Messages. The returned reactive stream is an instance of Flow.Publisher and is typically implemented using the Mutiny Multi type.

IncomingConnectorFactory

Inbound connectors can also implement the IncomingConnectorFactory from the MicroProfile Reactive Messaging specification. However, the PublisherBuilder type can be more challenging to work with, and SmallRye Reactive Messaging converts the provided stream to Mutiny types to do the wiring anyway.

The returned Flow.Publisher allows controlling the flow of ingestion using backpressure. It is preferable to use the pull-based APIs of the underlying messaging library to receive messages from the message broker. You can refer to the Mutiny "How to use polling?" guide to construct a Multi using Mutiny APIs, or implement the Flow.Subscription from scratch and wrap it in an AbstractMulti.
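As an illustration of the from-scratch option, the following is a simplified sketch of a pull-based Flow.Publisher in which every requested item triggers exactly one poll, so the subscriber's demand drives ingestion. It uses only the JDK. The PollingPublisher name and the synchronous Supplier standing in for the client's poll call are assumptions made for the example, and returning null is used here as the signal that the source is exhausted. A production implementation would also need to handle asynchronous polls and the full Reactive Streams rule set.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

/** Simplified pull-based publisher: one poll per requested item. */
final class PollingPublisher<T> implements Flow.Publisher<T> {

    // Hypothetical stand-in for a synchronous client poll; null means "no more items"
    private final Supplier<T> poll;

    PollingPublisher(Supplier<T> poll) {
        this.poll = poll;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private final AtomicLong demand = new AtomicLong();
            private final AtomicBoolean done = new AtomicBoolean();
            private final AtomicInteger wip = new AtomicInteger();

            @Override
            public void request(long n) {
                if (n <= 0) {
                    if (done.compareAndSet(false, true)) {
                        subscriber.onError(new IllegalArgumentException("request must be positive"));
                    }
                    return;
                }
                // Accumulate the demand, capping at Long.MAX_VALUE on overflow
                demand.accumulateAndGet(n, (a, b) -> a + b < 0 ? Long.MAX_VALUE : a + b);
                drain();
            }

            private void drain() {
                if (wip.getAndIncrement() != 0) {
                    return; // a drain loop is already running and will observe the new demand
                }
                int missed = 1;
                do {
                    while (!done.get() && demand.get() > 0) {
                        T item;
                        try {
                            item = poll.get();
                        } catch (RuntimeException e) {
                            if (done.compareAndSet(false, true)) {
                                subscriber.onError(e);
                            }
                            return;
                        }
                        if (item == null) {
                            if (done.compareAndSet(false, true)) {
                                subscriber.onComplete();
                            }
                            return;
                        }
                        demand.decrementAndGet();
                        subscriber.onNext(item);
                    }
                    missed = wip.addAndGet(-missed);
                } while (missed != 0);
            }

            @Override
            public void cancel() {
                done.set(true);
            }
        });
    }
}
```

Nothing is polled until the subscriber requests items, which is what lets downstream processing speed limit how fast messages are fetched from the broker.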

Here is an example channel implementation which constructs the reactive stream using the polling API:

package connectors;

import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

import org.eclipse.microprofile.reactive.messaging.Message;

import connectors.api.BrokerClient;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.vertx.core.impl.VertxInternal;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.Vertx;

public class MyIncomingChannel {

    private final String channel;
    private final BrokerClient client;
    private final Context context;
    private final MyAckHandler ackHandler;
    private final MyFailureHandler failureHandler;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private Flow.Publisher<? extends Message<?>> stream;

    public MyIncomingChannel(Vertx vertx, MyConnectorIncomingConfiguration cfg, BrokerClient client) {
        // create and configure the client with MyConnectorIncomingConfiguration
        this.channel = cfg.getChannel();
        this.client = client;
        this.context = Context.newInstance(((VertxInternal) vertx.getDelegate()).createEventLoopContext());
        this.ackHandler = MyAckHandler.create(this.client);
        this.failureHandler = MyFailureHandler.create(this.client);
        this.stream = Multi.createBy().repeating()
                .uni(() -> Uni.createFrom().completionStage(this.client.poll()))
                .until(__ -> closed.get())
                .emitOn(context::runOnContext)
                .map(consumed -> new MyMessage<>(consumed, ackHandler, failureHandler));
    }

    public String getChannel() {
        return channel;
    }

    public Flow.Publisher<? extends Message<?>> getStream() {
        return this.stream;
    }

    public void close() {
        closed.compareAndSet(false, true);
        client.close();
    }

    void isReady(HealthReport.HealthReportBuilder healthReportBuilder) {

    }

}

Connector Threading and Vert.x

Whether the external API call is blocking or non-blocking, managing the thread on which message processing is dispatched can be challenging. SmallRye Reactive Messaging depends on Eclipse Vert.x to consistently dispatch messages on event-loop or worker threads, propagating the message context along the different processing stages. You can read more on Message Context and Vert.x Context.

Some connectors already use Vert.x clients, such as RabbitMQ, AMQP 1.0 or MQTT. Other connectors, such as Kafka or Pulsar, directly use the client library of the messaging technology and therefore create a Vert.x Context per channel to dispatch messages on that context. Connectors can access the Vertx instance by injecting the ExecutionHolder bean. The Mutiny operators runSubscriptionOn and emitOn can be used to switch the threads on which events are dispatched.
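The idea of switching threads can be sketched with plain JDK types: a blocking client call runs on a worker executor, and its result is then processed on a dedicated single thread that plays the role of the channel context. This is only an analogy and does not use Vert.x or Mutiny. The ThreadSwitchSketch class and its method are hypothetical names. In Mutiny terms, runSubscriptionOn plays the role of the first executor and emitOn the role of the second.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.Supplier;

/** JDK-only analogy for offloading a blocking call and resuming on another thread. */
final class ThreadSwitchSketch {

    private ThreadSwitchSketch() {
    }

    /**
     * Runs blockingCall on the worker executor, then applies processing
     * to its result on the eventLoop executor.
     */
    static <T, R> CompletableFuture<R> callThenProcess(Supplier<T> blockingCall, Executor worker,
            Function<T, R> processing, Executor eventLoop) {
        return CompletableFuture.supplyAsync(blockingCall, worker) // blocking call leaves the caller thread
                .thenApplyAsync(processing, eventLoop); // result is handled on the designated thread
    }
}
```

Keeping the processing stage on a single, well-known thread per channel is what makes the ordering and context propagation described above predictable.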

Custom Message and Metadata implementations

The Reactive Messaging Message type is a thin wrapper around a payload and some metadata, which also allows implementing acknowledgment and negative acknowledgment of that message.

The underlying library very often delivers messages as a custom type that wraps the payload and some properties such as key, timestamp, schema information, or any other metadata.

Tip

The client may offer different strategies for consuming messages individually or in batches. If batch consumption is available, the incoming channel may receive a batch, then wrap and dispatch the messages either as a batch or individually.

While it is possible to use the Message.of builder methods to wrap the incoming message type, a custom type implementing the Message interface helps to deal with different aspects covered later, such as deserialization, message acknowledgment or tracing.

An example message implementation looks like the following:

package connectors;

import java.util.concurrent.CompletionStage;

import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Metadata;

import connectors.api.ConsumedMessage;
import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage;

public class MyMessage<T> implements Message<T>, ContextAwareMessage<T> {

    private final T payload;
    private final Metadata metadata;
    private final MyAckHandler ackHandler;

    private final MyFailureHandler nackHandler;
    private ConsumedMessage<?> consumed;

    public MyMessage(ConsumedMessage<T> message, MyAckHandler ackHandler, MyFailureHandler nackHandler) {
        this.consumed = message;
        this.payload = message.body();
        this.ackHandler = ackHandler;
        this.nackHandler = nackHandler;
        this.metadata = ContextAwareMessage.captureContextMetadata(new MyIncomingMetadata<>(message));
    }

    public ConsumedMessage<?> getConsumedMessage() {
        return consumed;
    }

    @Override
    public T getPayload() {
        return payload;
    }

    @Override
    public Metadata getMetadata() {
        return metadata;
    }

    @Override
    public CompletionStage<Void> ack() {
        return ackHandler.handle(this);
    }

    @Override
    public CompletionStage<Void> nack(Throwable reason, Metadata nackMetadata) {
        return nackHandler.handle(this, reason, nackMetadata);
    }
}

Note that MyMessage implements ContextAwareMessage. In the constructor, the captureContextMetadata helper method captures the Vert.x context that created the object and stores it in the LocalContextMetadata. This metadata allows running each message in its own Vert.x context, supporting context propagation.

The MyMessage type implements the accessors for the metadata and payload from the Message interface. If the messaging technology doesn't have a built-in unmarshalling mechanism, the message can deserialize the raw payload to a primitive or a complex object.

Warning

The custom message implementation is usually not the type consumed by the application injecting channels. Applications usually inject the payload, the raw consumed type (in the above example the ConsumedMessage), or some other type provided by the MessageConverters. Handling of Message types by the application is restricted to advanced use cases, because message acknowledgment must then be handled manually. Even then, the message may be intercepted and changed beforehand, conserving the metadata, ack and nack handlers but not the original type created by the connector.

The MyIncomingMetadata gives access to the underlying consumed message attributes, and applications can inject this object for accessing message details:

package connectors;

import java.util.Map;

import connectors.api.ConsumedMessage;

public class MyIncomingMetadata<T> {

    private final ConsumedMessage<T> msg;

    public MyIncomingMetadata(ConsumedMessage<T> msg) {
        this.msg = msg;
    }

    public ConsumedMessage<T> getCustomMessage() {
        return msg;
    }

    public T getBody() {
        return msg.body();
    }

    public String getKey() {
        return msg.key();
    }

    public long getTimestamp() {
        return msg.timestamp();
    }

    public Map<String, String> getProperties() {
        return msg.properties();
    }
}

Also note that ack and nack method implementations are delegated to handler objects. This allows configuring different strategies at channel level.

Acknowledgment strategies

Acknowledgement is the way for message consumers to inform the broker that a message has been successfully received and processed. Depending on the messaging technology, the broker can then decide to remove the message from the queue, flag it as consumed or purge it completely. In Reactive Messaging there are different policies to trigger the acknowledgement, but the canonical one is to acknowledge a message when the processing (potentially asynchronous) has completed (POST_PROCESSING).

Reactive Messaging defines the Message#ack method as non-blocking and asynchronous, returning a CompletionStage<Void>, because the acknowledgement action potentially sends a network call to the broker.

The following example simply calls the client ack method using a Mutiny Uni and switches the emission to the Message context. Returning to the message context is essential for chaining asynchronous actions without losing the context and for keeping the message consumption flow consistent.

package connectors;

import java.util.concurrent.CompletionStage;

import connectors.api.BrokerClient;
import io.smallrye.mutiny.Uni;

public class MyAckHandler {

    private final BrokerClient client;

    static MyAckHandler create(BrokerClient client) {
        return new MyAckHandler(client);
    }

    public MyAckHandler(BrokerClient client) {
        this.client = client;
    }

    public CompletionStage<Void> handle(MyMessage<?> msg) {
        return Uni.createFrom().completionStage(client.ack(msg.getConsumedMessage()))
                .emitOn(msg::runOnMessageContext)
                .subscribeAsCompletionStage();
    }
}

While this ack handler strategy acknowledges each message to the broker, the messaging technology may allow different strategies for acknowledging messages. For example, an ack strategy can track processed messages and acknowledge them all together, or call a different client-side endpoint to acknowledge the message batch.
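A batching strategy of that kind could be sketched as follows, using only the JDK. The handler tracks the ids of processed messages and acknowledges them in one call once a threshold is reached. BatchingAckHandler, the String message ids, and the ackBatch function (standing in for a client call that acknowledges several messages at once) are all assumptions made for this example and are not part of the BrokerClient API used elsewhere in this guide.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

/** Sketch of an ack strategy that acknowledges processed messages in batches. */
final class BatchingAckHandler {

    private final int batchSize;
    // Hypothetical client call acknowledging several message ids at once
    private final Function<List<String>, CompletionStage<Void>> ackBatch;
    private List<String> pendingIds = new ArrayList<>();
    private CompletableFuture<Void> pendingAck = new CompletableFuture<>();

    BatchingAckHandler(int batchSize, Function<List<String>, CompletionStage<Void>> ackBatch) {
        this.batchSize = batchSize;
        this.ackBatch = ackBatch;
    }

    /** Tracks a processed message; the returned stage completes once its batch is acknowledged. */
    synchronized CompletionStage<Void> handle(String messageId) {
        pendingIds.add(messageId);
        CompletableFuture<Void> result = pendingAck;
        if (pendingIds.size() >= batchSize) {
            flush();
        }
        return result;
    }

    /** Acknowledges everything tracked so far, for example from a timer or on shutdown. */
    synchronized void flush() {
        if (pendingIds.isEmpty()) {
            return;
        }
        List<String> ids = pendingIds;
        CompletableFuture<Void> ack = pendingAck;
        // Start a fresh batch before calling the broker
        pendingIds = new ArrayList<>();
        pendingAck = new CompletableFuture<>();
        ackBatch.apply(ids).whenComplete((ignored, failure) -> {
            if (failure != null) {
                ack.completeExceptionally(failure);
            } else {
                ack.complete(null);
            }
        });
    }
}
```

Because the stage returned for each message only completes when the whole batch is acknowledged, a real connector would typically pair the size threshold with a periodic flush so that a partially filled batch does not delay acknowledgment indefinitely.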

Failure handling strategies

Failure handling, or negative acknowledgment, allows indicating that a message was not processed correctly. Similar to acknowledgment, Reactive Messaging defines the Message#nack(Throwable reason, Metadata metadata) method as non-blocking and asynchronous, returning a CompletionStage<Void>.

package connectors;

import java.util.concurrent.CompletionStage;

import org.eclipse.microprofile.reactive.messaging.Metadata;

import connectors.api.BrokerClient;
import io.smallrye.mutiny.Uni;

public class MyFailureHandler {

    private final BrokerClient client;

    static MyFailureHandler create(BrokerClient client) {
        return new MyFailureHandler(client);
    }

    public MyFailureHandler(BrokerClient client) {
        this.client = client;
    }

    public CompletionStage<Void> handle(MyMessage<?> msg, Throwable reason, Metadata metadata) {
        return Uni.createFrom().completionStage(() -> client.reject(msg.getConsumedMessage(), reason.getMessage()))
                .emitOn(msg::runOnMessageContext)
                .subscribeAsCompletionStage();
    }
}

Different failure handling strategies can, for example:

  • Ignore the failure, log and call the ack instead
  • Send the message to a dead letter queue and call the ack
  • Employ a different strategy depending on the Metadata associated with the nack method call
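The first two strategies mentioned above can be sketched as interchangeable implementations of one small interface, which is what makes it possible to select a strategy per channel. The example below uses only the JDK. FailureStrategy, its factory methods, and the ack and sendToDlq functions are hypothetical names standing in for the connector's own handlers and client calls.

```java
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
import java.util.function.Function;

/** Sketch of pluggable failure handling strategies for a message type M. */
interface FailureStrategy<M> {

    CompletionStage<Void> handle(M message, Throwable reason);

    /** Ignore: log the failure and acknowledge the message anyway. */
    static <M> FailureStrategy<M> ignore(Function<M, CompletionStage<Void>> ack) {
        return (message, reason) -> {
            System.err.println("Ignoring processing failure: " + reason.getMessage());
            return ack.apply(message);
        };
    }

    /** Dead letter queue: forward the message first, then acknowledge the original. */
    static <M> FailureStrategy<M> deadLetterQueue(
            BiFunction<M, Throwable, CompletionStage<Void>> sendToDlq,
            Function<M, CompletionStage<Void>> ack) {
        return (message, reason) -> sendToDlq.apply(message, reason)
                .thenCompose(sent -> ack.apply(message));
    }
}
```

In the dead letter queue variant the original message is acknowledged only after the forwarding stage completes, so a failed forward surfaces as a failed nack instead of silently dropping the message.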

Message Converters

The connector can provide default MessageConverter implementations for converting the payload to a custom type. As an example, the following converter extracts the ConsumedMessage from the incoming metadata and puts it in the payload:

package connectors;

import java.lang.reflect.Type;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Message;

import connectors.api.ConsumedMessage;
import io.smallrye.reactive.messaging.MessageConverter;
import io.smallrye.reactive.messaging.providers.helpers.TypeUtils;

@ApplicationScoped
public class MyMessageConverter implements MessageConverter {
    @Override
    public boolean canConvert(Message<?> in, Type target) {
        return TypeUtils.isAssignable(target, ConsumedMessage.class)
                && in.getMetadata(MyIncomingMetadata.class).isPresent();
    }

    @Override
    public Message<?> convert(Message<?> in, Type target) {
        return in.withPayload(in.getMetadata(MyIncomingMetadata.class)
                .map(MyIncomingMetadata::getCustomMessage)
                .orElse(null));
    }
}

Implementing Outbound Channels

The OutboundConnector implementation returns, for a given channel configuration, a Flow.Subscriber of messages. This is typically implemented by a custom Flow.Processor and using the MultiUtils.via helper methods to apply message transformations.

OutgoingConnectorFactory

Outbound connectors can also implement the OutgoingConnectorFactory from the MicroProfile Reactive Messaging specification. However, it is usually more convenient to work with the MultiUtils.via methods to construct and transform outgoing messages.

Here is an example outgoing channel implementation:

package connectors;

import java.util.concurrent.Flow;

import org.eclipse.microprofile.reactive.messaging.Message;

import connectors.api.BrokerClient;
import connectors.api.SendMessage;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import io.vertx.mutiny.core.Vertx;

public class MyOutgoingChannel {
    private final String channel;
    private Flow.Subscriber<? extends Message<?>> subscriber;
    private final BrokerClient client;
    private final String topic;

    public MyOutgoingChannel(Vertx vertx, MyConnectorOutgoingConfiguration oc, BrokerClient client) {
        this.channel = oc.getChannel();
        this.client = client;
        this.topic = oc.getTopic().orElse(oc.getChannel());
        this.subscriber = MultiUtils.via(multi -> multi.call(m -> publishMessage(this.client, m)));
    }

    private Uni<Void> publishMessage(BrokerClient client, Message<?> m) {
        // construct the outgoing message
        SendMessage sendMessage = new SendMessage();
        Object payload = m.getPayload();
        sendMessage.setPayload(payload);
        sendMessage.setTopic(topic);
        m.getMetadata(MyOutgoingMetadata.class).ifPresent(out -> {
            sendMessage.setTopic(out.getTopic());
            sendMessage.setKey(out.getKey());
            //...
        });
        return Uni.createFrom().completionStage(() -> client.send(sendMessage))
                .onItem().transformToUni(receipt -> Uni.createFrom().completionStage(m.ack()))
                .onFailure().recoverWithUni(t -> Uni.createFrom().completionStage(m.nack(t)));
    }

    public Flow.Subscriber<? extends Message<?>> getSubscriber() {
        return this.subscriber;
    }

    public String getChannel() {
        return this.channel;
    }
}

The MultiUtils.via helper method allows using the Multi chaining methods and at the same time provides a Flow.Subscriber. However, this implementation sends messages one at a time: the next message is sent only after the send of the previous outgoing message has completed.

Some messaging technologies provide a publish receipt, a message sent back from the broker to the sender that asynchronously acknowledges the send operation. In this case, the connector can only be sure of the send operation when it receives the publish receipt of that message. Some technologies may provide blocking send calls; in that case, the connector needs to delegate the send call to a worker thread.

Depending on whether the client supports multiple in-flight outgoing messages, you can also use a SenderProcessor, which allows receiving configuration for the maximum number of in-flight messages and whether it waits for completion (publish receipt from the broker):

long requests = oc.getMaxPendingMessages();
boolean waitForWriteCompletion = oc.getWaitForWriteCompletion();
if (requests <= 0) {
    requests = Long.MAX_VALUE;
}
this.processor = new SenderProcessor(requests, waitForWriteCompletion, m -> publishMessage(client, m));
this.subscriber = MultiUtils.via(processor, m -> m.onFailure().invoke(f -> {
    // log the failure
}));

Other more advanced scenarios can be implemented to retry the transmission in case of a retryable failure, or batch multiple outgoing messages to a single send operation.
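To illustrate what limiting the number of in-flight messages means, here is a small JDK-only sketch that starts at most a fixed number of sends concurrently and starts the next one when an earlier send completes. It is not how SenderProcessor is implemented: SenderProcessor controls in-flight messages through the demand it requests from upstream, whereas this sketch uses a simple unbounded queue. BoundedSender and the send function are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

/** Sketch: at most maxInFlight sends are pending at any time. */
final class BoundedSender<T> {

    private final int maxInFlight;
    // Hypothetical asynchronous send; the stage completes on the publish receipt
    private final Function<T, CompletionStage<Void>> send;
    private final Deque<T> queue = new ArrayDeque<>();
    private int inFlight;

    BoundedSender(int maxInFlight, Function<T, CompletionStage<Void>> send) {
        this.maxInFlight = maxInFlight;
        this.send = send;
    }

    /** Enqueues a message and starts sending if capacity is available. */
    synchronized void submit(T message) {
        queue.add(message);
        drain();
    }

    // Must be called while holding the lock
    private void drain() {
        while (inFlight < maxInFlight && !queue.isEmpty()) {
            T next = queue.poll();
            inFlight++;
            // Success or failure both free the slot; failure handling is omitted here
            send.apply(next).whenComplete((receipt, failure) -> onSendCompleted());
        }
    }

    private synchronized void onSendCompleted() {
        inFlight--;
        drain();
    }
}
```

With maxInFlight set to 1 this degenerates to the one-at-a-time behaviour of the first outgoing channel example.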

Outgoing Message Builder

In order to convey all attributes of the outgoing message to the client library, connectors provide an outgoing Message implementation and corresponding outgoing message metadata. These allow the application developer to build the message attributes that will be sent to the broker.

package connectors;

import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.function.Supplier;

import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage;

public class MyOutgoingMessage<T> implements Message<T>, ContextAwareMessage<T> {

    private final T payload;
    private final Metadata metadata;

    private final Supplier<CompletionStage<Void>> ack;
    private final Function<Throwable, CompletionStage<Void>> nack;

    public static <T> MyOutgoingMessage<T> from(Message<T> message) {
        return new MyOutgoingMessage<>(message.getPayload(), message.getMetadata(), message.getAck(), message.getNack());
    }

    public MyOutgoingMessage(T payload, Metadata metadata,
            Supplier<CompletionStage<Void>> ack,
            Function<Throwable, CompletionStage<Void>> nack) {
        this.payload = payload;
        this.metadata = metadata;
        this.ack = ack;
        this.nack = nack;
    }

    public MyOutgoingMessage(T payload, String key, String topic,
            Supplier<CompletionStage<Void>> ack,
            Function<Throwable, CompletionStage<Void>> nack) {
        this(payload, Metadata.of(new MyOutgoingMetadata(topic, key)), ack, nack);
    }

    @Override
    public T getPayload() {
        return payload;
    }

    @Override
    public Metadata getMetadata() {
        return metadata;
    }

    @Override
    public Supplier<CompletionStage<Void>> getAck() {
        return this.ack;
    }

    @Override
    public Function<Throwable, CompletionStage<Void>> getNack() {
        return this.nack;
    }

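    public MyOutgoingMessage<T> withKey(String key) {
        // Metadata is immutable: with(...) returns a new instance, so a new message is returned
        MyOutgoingMetadata updated = this.metadata.get(MyOutgoingMetadata.class)
                .map(m -> MyOutgoingMetadata.builder(m).withKey(key).build())
                .orElseGet(() -> MyOutgoingMetadata.builder().withKey(key).build());
        return new MyOutgoingMessage<>(this.payload, this.metadata.with(updated), this.ack, this.nack);
    }

    public MyOutgoingMessage<T> withTopic(String topic) {
        // Same approach as withKey: copy the existing outgoing metadata, or create it if absent
        MyOutgoingMetadata updated = this.metadata.get(MyOutgoingMetadata.class)
                .map(m -> MyOutgoingMetadata.builder(m).withTopic(topic).build())
                .orElseGet(() -> MyOutgoingMetadata.builder().withTopic(topic).build());
        return new MyOutgoingMessage<>(this.payload, this.metadata.with(updated), this.ack, this.nack);
    }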
}

package connectors;

public class MyOutgoingMetadata {
    private String topic;
    private String key;

    public static MyOutgoingMetadataBuilder builder() {
        return new MyOutgoingMetadataBuilder();
    }

    public static MyOutgoingMetadataBuilder builder(MyOutgoingMetadata metadata) {
        return new MyOutgoingMetadataBuilder(metadata);
    }

    public MyOutgoingMetadata(String topic, String key) {
        this.topic = topic;
        this.key = key;
    }

    public String getTopic() {
        return topic;
    }

    public String getKey() {
        return key;
    }

    public static class MyOutgoingMetadataBuilder {
        private String topic;
        private String key;

        public MyOutgoingMetadataBuilder() {

        }

        public MyOutgoingMetadataBuilder(MyOutgoingMetadata metadata) {
            this.key = metadata.getKey();
            this.topic = metadata.getTopic();
        }

        public MyOutgoingMetadataBuilder withTopic(String topic) {
            this.topic = topic;
            return this;
        }

        public MyOutgoingMetadataBuilder withKey(String key) {
            this.key = key;
            return this;
        }

        public MyOutgoingMetadata build() {
            return new MyOutgoingMetadata(topic, key);
        }
    }

}

The outgoing channel implementation then will construct the client library object that represents the outbound message, SendMessage in this example:

private Uni<Void> publishMessage(BrokerClient client, Message<?> message) {
    // construct the outgoing message
    SendMessage sendMessage;
    Object payload = message.getPayload();
    if (payload instanceof SendMessage) {
        sendMessage = (SendMessage) message.getPayload();
    } else {
        sendMessage = new SendMessage();
        sendMessage.setPayload(payload);
        sendMessage.setTopic(topic);
        message.getMetadata(MyOutgoingMetadata.class).ifPresent(out -> {
            sendMessage.setTopic(out.getTopic());
            sendMessage.setKey(out.getKey());
            //...
        });
    }
    return Uni.createFrom().completionStage(() -> client.send(sendMessage))
            .onItem().transformToUni(receipt -> Uni.createFrom().completionStage(message.ack()))
            .onFailure().recoverWithUni(t -> Uni.createFrom().completionStage(message.nack(t)));
}

It is a best practice to also allow the application to return the client library's outbound object (SendMessage) directly as the payload, as handled by the instanceof check above.

Outgoing message acknowledgement

Because Reactive Messaging chains acknowledgements from the incoming message to the outgoing message, it is crucial for the outgoing channel to correctly ack and nack the message.
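The rule boils down to this: acknowledge the message once the send has succeeded, and negatively acknowledge it with the failure otherwise. A minimal JDK-only sketch of that chaining is shown below. OutgoingAckSketch and its method are hypothetical names, and the ack and nack parameters have the same shapes as the values returned by Message#getAck and Message#getNack in the outgoing message example above.

```java
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.function.Supplier;

/** Sketch of chaining the message ack or nack after the send operation. */
final class OutgoingAckSketch {

    private OutgoingAckSketch() {
    }

    /**
     * Returns a stage that completes after the message has been acknowledged
     * (send succeeded) or negatively acknowledged (send failed).
     */
    static <R> CompletionStage<Void> sendThenAckOrNack(CompletionStage<R> send,
            Supplier<CompletionStage<Void>> ack,
            Function<Throwable, CompletionStage<Void>> nack) {
        return send
                // Pick the ack or the nack stage depending on the outcome of the send
                .handle((receipt, failure) -> failure == null ? ack.get() : nack.apply(failure))
                // Flatten the nested stage so that callers wait for the ack or nack itself
                .thenCompose(stage -> stage);
    }
}
```

If the outgoing channel never calls ack or nack, the upstream incoming message is never acknowledged to its broker either, which typically shows up as stalled consumption or redelivery.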

Smallrye Health Integration

Smallrye Reactive Messaging allows connectors to integrate with Smallrye Health to contribute channel state to the health reports. Connectors need to implement the HealthReporter interface and implement some or all of the getReadiness, getLiveness and getStartup methods:

@ApplicationScoped
@Connector(MyConnectorWithPartials.CONNECTOR_NAME)
public class MyConnectorWithPartials implements InboundConnector, OutboundConnector, HealthReporter {

    public static final String CONNECTOR_NAME = "smallrye-my-connector";

    List<MyIncomingChannel> incomingChannels = new CopyOnWriteArrayList<>();
    List<MyOutgoingChannel> outgoingChannels = new CopyOnWriteArrayList<>();

    @Override
    public HealthReport getReadiness() {
        HealthReport.HealthReportBuilder builder = HealthReport.builder();
        for (MyIncomingChannel channel : incomingChannels) {
            builder.add(channel.getChannel(), true);
        }
        for (MyOutgoingChannel channel : outgoingChannels) {
            builder.add(channel.getChannel(), true);
        }
        return builder.build();
    }

    @Override
    public HealthReport getLiveness() {
        HealthReport.HealthReportBuilder builder = HealthReport.builder();
        for (MyIncomingChannel channel : incomingChannels) {
            builder.add(channel.getChannel(), true);
        }
        for (MyOutgoingChannel channel : outgoingChannels) {
            builder.add(channel.getChannel(), true);
        }
        return builder.build();
    }

    @Override
    public HealthReport getStartup() {
        HealthReport.HealthReportBuilder builder = HealthReport.builder();
        for (MyIncomingChannel channel : incomingChannels) {
            builder.add(channel.getChannel(), true);
        }
        for (MyOutgoingChannel channel : outgoingChannels) {
            builder.add(channel.getChannel(), true);
        }
        return builder.build();
    }
}

Implementing health reports per channel depends on what information is available to the connector. For more information on the different health check probes, see Configure Liveness, Readiness and Startup Probes.

You may want to add a connector attribute to enable/disable the health reporting per channel:

import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;

@ConnectorAttribute(name = "health-enabled", type = "boolean", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Whether health reporting is enabled (default) or disabled", defaultValue = "true")

OpenTelemetry Tracing Integration

SmallRye Reactive Messaging allows connectors to easily integrate with OpenTelemetry tracing. It propagates the tracing context from inbound messages to outbound messages. The smallrye-reactive-messaging-otel module provides the necessary dependencies on the OpenTelemetry artifacts and also provides the TracingUtils helper class for setting up the tracing.

<dependency>
    <groupId>io.smallrye.reactive</groupId>
    <artifactId>smallrye-reactive-messaging-otel</artifactId>
</dependency>

For integrating tracing you'd need to create a couple of classes:

  • Holder class for tracing information
  • Implementation of io.opentelemetry.context.propagation.TextMapGetter, which retrieves the value of a tracing attribute for a given key from the holder object
  • Implementation of io.opentelemetry.context.propagation.TextMapSetter, which sets the key and value of a tracing attribute on the holder object
  • Implementations of io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor and io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter
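The first three items can be sketched as follows. To keep the sketch free of dependencies, the getter and setter logic is shown as static methods on a hypothetical `MyTraceCarrier` class; in a real connector the same logic would live in implementations of `TextMapGetter<MyTrace>` (`keys` and `get`) and `TextMapSetter<MyTrace>` (`set`). The fields of `MyTrace` are assumptions based on the builder calls used elsewhere in this guide.

```java
import java.util.HashMap;
import java.util.Map;

// Holder for the tracing information of a single message.
final class MyTrace {
    private final String clientId;
    private final String topic;
    private final Map<String, String> properties;

    private MyTrace(String clientId, String topic, Map<String, String> properties) {
        this.clientId = clientId;
        this.topic = topic;
        this.properties = properties;
    }

    String getClientId() {
        return clientId;
    }

    String getTopic() {
        return topic;
    }

    Map<String, String> getProperties() {
        return properties;
    }

    static final class Builder {
        private String clientId;
        private String topic;
        private Map<String, String> properties = new HashMap<>();

        Builder withClientId(String clientId) {
            this.clientId = clientId;
            return this;
        }

        Builder withTopic(String topic) {
            this.topic = topic;
            return this;
        }

        // Keeps a reference to the given map (no copy), so attributes
        // written during tracing are visible to the caller afterwards.
        Builder withProperties(Map<String, String> properties) {
            this.properties = properties == null ? new HashMap<>() : properties;
            return this;
        }

        MyTrace build() {
            return new MyTrace(clientId, topic, properties);
        }
    }
}

// Hypothetical stand-in for the TextMapGetter / TextMapSetter logic.
final class MyTraceCarrier {

    private MyTraceCarrier() {
    }

    // Getter side: all keys that may carry tracing attributes.
    static Iterable<String> keys(MyTrace carrier) {
        return carrier.getProperties().keySet();
    }

    // Getter side: value for a key, or null when absent.
    static String get(MyTrace carrier, String key) {
        return carrier == null ? null : carrier.getProperties().get(key);
    }

    // Setter side: stores a tracing attribute on the holder.
    static void set(MyTrace carrier, String key, String value) {
        if (carrier != null) {
            carrier.getProperties().put(key, value);
        }
    }
}
```

Sharing the map by reference is a deliberate choice in this sketch: an outgoing channel can create the map, pass it to the holder, and later attach the same map to the message it sends.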

Then you'd need to configure instrumenters per incoming and outgoing channel:

    public static Instrumenter<MyTrace, Void> createInstrumenter(boolean incoming) {
        MessageOperation messageOperation = incoming ? MessageOperation.RECEIVE : MessageOperation.PUBLISH;

        MyAttributesExtractor myExtractor = new MyAttributesExtractor();
        MessagingAttributesGetter<MyTrace, Void> attributesGetter = myExtractor.getMessagingAttributesGetter();
        var spanNameExtractor = MessagingSpanNameExtractor.create(attributesGetter, messageOperation);
        InstrumenterBuilder<MyTrace, Void> builder = Instrumenter.builder(GlobalOpenTelemetry.get(),
                "io.smallrye.reactive.messaging", spanNameExtractor);
        var attributesExtractor = MessagingAttributesExtractor.create(attributesGetter, messageOperation);

        builder
                .addAttributesExtractor(attributesExtractor)
                .addAttributesExtractor(myExtractor);

        if (incoming) {
            return builder.buildConsumerInstrumenter(MyTraceTextMapGetter.INSTANCE);
        } else {
            return builder.buildProducerInstrumenter(MyTraceTextMapSetter.INSTANCE);
        }
    }

    public Message<?> traceIncoming(Message<?> message, MyTrace myTrace, boolean makeCurrent) {
        return TracingUtils.traceIncoming(instrumenter, message, myTrace, makeCurrent);
    }

    public void traceOutgoing(Message<?> message, MyTrace myTrace) {
        TracingUtils.traceOutgoing(instrumenter, message, myTrace);
    }
}

Finally, create an instrumenter for each incoming and outgoing channel and wire the calls to it using TracingUtils.

For an incoming channel, you'd need to call the instrumenter on an inbound message:

Multi<? extends Message<?>> receiveMulti = Multi.createBy().repeating()
        .uni(() -> Uni.createFrom().completionStage(this.client.poll()))
        .until(__ -> closed.get())
        .emitOn(context::runOnContext)
        .map(consumed -> new MyMessage<>(consumed, ackHandler, failureHandler));

Instrumenter<MyTrace, Void> instrumenter = MyOpenTelemetryInstrumenter.createInstrumenter(true);
if (tracingEnabled) {
    receiveMulti = receiveMulti.map(message -> {
        ConsumedMessage<?> consumedMessage = message.getMetadata(MyIncomingMetadata.class).get().getCustomMessage();
        return TracingUtils.traceIncoming(instrumenter, message, new MyTrace.Builder()
                .withClientId(consumedMessage.clientId())
                .withTopic(consumedMessage.topic())
                .withProperties(consumedMessage.properties())
                .build());
    });
}

For an outgoing channel, you'd need to call the instrumenter when constructing the outbound message:

private Uni<Void> publishMessageWithTracing(BrokerClient client, Message<?> message) {
    // construct the outgoing message
    SendMessage sendMessage;
    Object payload = message.getPayload();
    if (payload instanceof SendMessage) {
        sendMessage = (SendMessage) message.getPayload();
    } else {
        sendMessage = new SendMessage();
        sendMessage.setPayload(payload);
        sendMessage.setTopic(topic);
        message.getMetadata(MyOutgoingMetadata.class).ifPresent(out -> {
            sendMessage.setTopic(out.getTopic());
            sendMessage.setKey(out.getKey());
            //...
        });
    }
    if (tracingEnabled) {
        Map<String, String> properties = new HashMap<>();
        TracingUtils.traceOutgoing(instrumenter, message, new MyTrace.Builder()
                .withProperties(properties)
                .withTopic(sendMessage.getTopic())
                .build());
        sendMessage.setProperties(properties);
    }
    return Uni.createFrom().completionStage(() -> client.send(sendMessage))
            .onItem().transformToUni(receipt -> Uni.createFrom().completionStage(message.ack()))
            .onFailure().recoverWithUni(t -> Uni.createFrom().completionStage(message.nack(t)));
}

You may want to add a connector attribute to enable/disable the tracing per channel:

import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;

@ConnectorAttribute(name = "tracing-enabled", type = "boolean", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Whether tracing is enabled (default) or disabled", defaultValue = "true")

Testing the connector

While unit tests are highly encouraged for validating ad-hoc logic in connector code, connector tests are by nature mostly integration tests that validate the correct configuration and functioning of channels. Most of the time, tests need to run against a broker instance. This instance can be mocked, embedded in the test JVM, or provisioned in a container runtime using Testcontainers. The Testcontainers approach is encouraged because it provides the testing environment closest to reality.

Starting a broker per test method or per test class may take too much time and too many resources, so you may want to share the same broker instance between all test classes. In that case, you can check out how to write a JUnit 5 Extension that starts a single container instance before the first test and stops it after the last one.
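The "start once, stop at the end" idea can be sketched without any test-framework dependency as a lazily started holder. Everything here is hypothetical: `FakeBroker` stands in for a Testcontainers container, and `SharedBroker.ensureStarted()` is the method a JUnit 5 extension could call from its `beforeAll` callback. A JVM shutdown hook is used here as a simple substitute for the extension's own cleanup.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a broker container.
// It counts how often it was started, for illustration only.
final class FakeBroker implements AutoCloseable {
    static final AtomicInteger STARTS = new AtomicInteger();

    private volatile boolean running;

    void start() {
        STARTS.incrementAndGet();
        running = true;
    }

    boolean isRunning() {
        return running;
    }

    @Override
    public void close() {
        running = false;
    }
}

// Holds the single broker instance shared by all test classes.
final class SharedBroker {
    private static FakeBroker instance;

    private SharedBroker() {
    }

    // Starts the broker on the first call; later calls from other
    // test classes return the same running instance.
    static synchronized FakeBroker ensureStarted() {
        if (instance == null) {
            FakeBroker broker = new FakeBroker();
            broker.start();
            // Stop the broker when the test JVM exits.
            Runtime.getRuntime().addShutdownHook(new Thread(broker::close));
            instance = broker;
        }
        return instance;
    }
}
```

Each test class then asks for the shared instance and reads connection details such as host and port from it, rather than starting its own broker.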

There are essentially two ways of creating the connector behavior to test against:

  1. Instantiating channels directly by passing a custom configuration. With this approach you can get the reactive stream directly from the channel implementation and send/receive messages. You can use AssertSubscriber from Mutiny to regulate demand and write assertions.
  2. CDI-based tests that write the configuration and instantiate application beans. You can use Weld, the reference implementation of the CDI specification, with a configured set of beans and extensions:
    package connectors.test;
    
    import jakarta.enterprise.inject.spi.BeanManager;
    
    import org.eclipse.microprofile.config.ConfigProvider;
    import org.eclipse.microprofile.reactive.messaging.spi.ConnectorLiteral;
    import org.jboss.weld.environment.se.Weld;
    import org.jboss.weld.environment.se.WeldContainer;
    import org.junit.jupiter.api.AfterEach;
    import org.junit.jupiter.api.BeforeEach;
    
    import connectors.MyConnector;
    import connectors.MyMessageConverter;
    import io.smallrye.config.SmallRyeConfigProviderResolver;
    import io.smallrye.config.inject.ConfigExtension;
    import io.smallrye.reactive.messaging.providers.MediatorFactory;
    import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
    import io.smallrye.reactive.messaging.providers.connectors.WorkerPoolRegistry;
    import io.smallrye.reactive.messaging.providers.extension.ChannelProducer;
    import io.smallrye.reactive.messaging.providers.extension.EmitterFactoryImpl;
    import io.smallrye.reactive.messaging.providers.extension.HealthCenter;
    import io.smallrye.reactive.messaging.providers.extension.LegacyEmitterFactoryImpl;
    import io.smallrye.reactive.messaging.providers.extension.MediatorManager;
    import io.smallrye.reactive.messaging.providers.extension.MutinyEmitterFactoryImpl;
    import io.smallrye.reactive.messaging.providers.extension.ReactiveMessagingExtension;
    import io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory;
    import io.smallrye.reactive.messaging.providers.impl.ConnectorFactories;
    import io.smallrye.reactive.messaging.providers.impl.InternalChannelRegistry;
    import io.smallrye.reactive.messaging.providers.metrics.MetricDecorator;
    import io.smallrye.reactive.messaging.providers.metrics.MicrometerDecorator;
    import io.smallrye.reactive.messaging.providers.wiring.Wiring;
    import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
    
    public class WeldTestBase {
    
        protected Weld weld;
        protected WeldContainer container;
    
        @BeforeEach
        public void initWeld() {
            weld = new Weld();
    
            // SmallRye config
            ConfigExtension extension = new ConfigExtension();
            weld.addExtension(extension);
    
            weld.addBeanClass(MediatorFactory.class);
            weld.addBeanClass(MediatorManager.class);
            weld.addBeanClass(InternalChannelRegistry.class);
            weld.addBeanClass(ConnectorFactories.class);
            weld.addBeanClass(ConfiguredChannelFactory.class);
            weld.addBeanClass(ChannelProducer.class);
            weld.addBeanClass(ExecutionHolder.class);
            weld.addBeanClass(WorkerPoolRegistry.class);
            weld.addBeanClass(HealthCenter.class);
            weld.addBeanClass(Wiring.class);
            weld.addExtension(new ReactiveMessagingExtension());
    
            weld.addBeanClass(EmitterFactoryImpl.class);
            weld.addBeanClass(MutinyEmitterFactoryImpl.class);
            weld.addBeanClass(LegacyEmitterFactoryImpl.class);
    
            weld.addBeanClass(MyConnector.class);
            weld.addBeanClass(MyMessageConverter.class);
            weld.addBeanClass(MetricDecorator.class);
            weld.addBeanClass(MicrometerDecorator.class);
            weld.disableDiscovery();
        }
    
        @AfterEach
        public void stopContainer() {
            if (container != null) {
                // TODO Explicitly close the connector
                getBeanManager().createInstance()
                        .select(MyConnector.class, ConnectorLiteral.of(MyConnector.CONNECTOR_NAME)).get();
                container.close();
            }
            // Release the config objects
            SmallRyeConfigProviderResolver.instance().releaseConfig(ConfigProvider.getConfig());
        }
    
        public BeanManager getBeanManager() {
            if (container == null) {
                runApplication(new MapBasedConfig());
            }
            return container.getBeanManager();
        }
    
        public void addBeans(Class<?>... clazzes) {
            weld.addBeanClasses(clazzes);
        }
    
        public <T> T get(Class<T> clazz) {
            return getBeanManager().createInstance().select(clazz).get();
        }
    
        public <T> T runApplication(MapBasedConfig config, Class<T> clazz) {
            weld.addBeanClass(clazz);
            runApplication(config);
            return get(clazz);
        }
    
        public void runApplication(MapBasedConfig config) {
            if (config != null) {
                config.write();
            } else {
                MapBasedConfig.cleanup();
            }
    
            container = weld.initialize();
        }
    
        public static void addConfig(MapBasedConfig config) {
            if (config != null) {
                config.write();
            } else {
                MapBasedConfig.cleanup();
            }
        }
    
        public HealthCenter getHealth() {
            if (container == null) {
                throw new IllegalStateException("Application not started");
            }
            return container.getBeanManager().createInstance().select(HealthCenter.class).get();
        }
    
        public boolean isStarted() {
            return getHealth().getStartup().isOk();
        }
    
        public boolean isReady() {
            return getHealth().getReadiness().isOk();
        }
    
        public boolean isAlive() {
            return getHealth().getLiveness().isOk();
        }
    
    }
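For the first approach in the list above, regulating demand means the test decides how many items the channel may emit and asserts on what arrived. The sketch below shows that idea using only the JDK `Flow` API. `ManualDemandSubscriber` is a hypothetical class written for illustration; it is not Mutiny's AssertSubscriber, which offers a much richer set of assertions.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;

// Hypothetical test subscriber: collects items and requests only on demand.
final class ManualDemandSubscriber<T> implements Flow.Subscriber<T> {

    private final CountDownLatch subscribed = new CountDownLatch(1);
    private final List<T> items = new CopyOnWriteArrayList<>();
    private volatile Flow.Subscription subscription;
    private volatile Throwable failure;
    private volatile boolean completed;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscribed.countDown();
    }

    @Override
    public void onNext(T item) {
        items.add(item);
    }

    @Override
    public void onError(Throwable failure) {
        this.failure = failure;
    }

    @Override
    public void onComplete() {
        completed = true;
    }

    // Requests n more items, waiting briefly for the subscription to arrive.
    void request(long n) {
        try {
            if (!subscribed.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("No subscription received within 5 seconds");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the subscription", e);
        }
        subscription.request(n);
    }

    // Polls until at least `count` items arrived; returns false on timeout.
    boolean awaitItems(int count, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (items.size() < count) {
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    List<T> items() {
        return items;
    }

    Throwable failure() {
        return failure;
    }

    boolean isCompleted() {
        return completed;
    }
}
```

A test would subscribe this class to the `Flow.Publisher` returned by the channel, call `request(2)`, and check that exactly two messages arrive before requesting more.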
    

You need the following test dependencies to enable Weld in tests:

    <dependency>
      <groupId>io.smallrye.reactive</groupId>
      <artifactId>test-common</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.jboss.weld.se</groupId>
      <artifactId>weld-se-shaded</artifactId>
      <version>${weld.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.jboss.weld</groupId>
      <artifactId>weld-core-impl</artifactId>
      <version>${weld.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.awaitility</groupId>
      <artifactId>awaitility</artifactId>
      <scope>test</scope>
    </dependency>

Your test classes can then extend WeldTestBase and provide configuration and application beans:

package connectors.test;

import static org.awaitility.Awaitility.await;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.junit.jupiter.api.Test;

import connectors.MyConnector;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;

public class MyConnectorTest extends WeldTestBase {

    @Test
    void incomingChannel() {
        String host = "";
        int port = 0;
        String myTopic = UUID.randomUUID().toString();
        MapBasedConfig config = new MapBasedConfig()
                .with("mp.messaging.incoming.data.topic", myTopic)
                .with("mp.messaging.incoming.data.host", host)
                .with("mp.messaging.incoming.data.port", port)
                .with("mp.messaging.incoming.data.connector", MyConnector.CONNECTOR_NAME);
        MyApp app = runApplication(config, MyApp.class);

        int expected = 10;
        // produce expected number of messages to myTopic

        // wait until app received
        await().until(() -> app.received().size() == expected);
    }

    @ApplicationScoped
    public static class MyApp {

        List<String> received = new CopyOnWriteArrayList<>();

        @Incoming("data")
        void consume(String msg) {
            received.add(msg);
        }

        public List<String> received() {
            return received;
        }
    }
}

Awaitility

Because connector tests are usually asynchronous, Awaitility is useful here: it provides a DSL for waiting until an assertion holds.

Common tests for validating the connector

  • Message consumption through incoming channels
  • Message producing through outgoing channels
  • Ack and failure handler strategies test
  • Message context propagation test (LocalPropagationTest)
  • HealthCheckTest
  • MessageConverterTest
  • TracingPropagationTest
  • Configuration test
  • Authentication test
  • Tests for