SmallRye Reactive Messaging is an implementation of the upcoming Eclipse MicroProfile Reactive Messaging specification. It provides a way to implement reactive data streaming applications using a CDI development model.

It provides:

  • a development model to build data streaming applications

  • connections for Apache Kafka, AMQP 1.0, MQTT, Apache Camel…​

  • a way to inject manipulated streams into regular CDI beans and JAX-RS resources

SmallRye Reactive Messaging enables data streaming applications, event-driven and asynchronous microservices, event sourcing applications…

[Figure: overview]

1. Getting Started

1.1. Quickstart

The easiest way to start using SmallRye Reactive Messaging is to launch it directly from a main class.

Create a Maven project and include the following dependencies in your pom.xml:

<dependency>
  <groupId>io.smallrye.reactive</groupId>
  <artifactId>smallrye-reactive-messaging-provider-1.0</artifactId>
  <version>1.0.0</version>
</dependency>
<dependency>
  <groupId>io.smallrye.reactive</groupId>
  <artifactId>smallrye-reactive-streams-operators-1.0</artifactId>
  <version>1.0.6</version>
</dependency>
<dependency>
  <groupId>org.jboss.weld.se</groupId>
  <artifactId>weld-se-core</artifactId>
  <version>3.1.1.Final</version>
</dependency>

Once the project is created, add a class with a public static void main(String… args) method:

package io.smallrye.reactive.messaging.quickstart;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.se.SeContainerInitializer;

@ApplicationScoped
public class Main {

  public static void main(String[] args) {
    SeContainerInitializer.newInstance().initialize();
  }


}

Then, we need CDI beans. For instance:

package io.smallrye.reactive.messaging.quickstart;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class MyBean {

  @Outgoing("source")
  public PublisherBuilder<String> source() {
    return ReactiveStreams.of("hello", "with", "SmallRye", "reactive", "message");
  }

  @Incoming("source")
  @Outgoing("processed-a")
  public String toUpperCase(String payload) {
    return payload.toUpperCase();
  }

  @Incoming("processed-a")
  @Outgoing("processed-b")
  public PublisherBuilder<String> filter(PublisherBuilder<String> input) {
    return input.filter(item -> item.length() > 4);
  }

  @Incoming("processed-b")
  public void sink(String word) {
    System.out.println(">> " + word);
  }

}

Finally, you need an empty beans.xml. Copy the following content to src/main/resources/META-INF/beans.xml:

<beans
  xmlns="http://xmlns.jcp.org/xml/ns/javaee"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="
      http://xmlns.jcp.org/xml/ns/javaee
      http://xmlns.jcp.org/xml/ns/javaee/beans_1_1.xsd">

</beans>

Once everything is set up, you should be able to run the application using:

mvn compile exec:java -Dexec.mainClass=<Main fully qualified class name>

Running the previous example should give the following output:

>> HELLO
>> SMALLRYE
>> REACTIVE
>> MESSAGE
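
The output can be checked by hand. As a rough analogy only (plain java.util.stream rather than the Reactive Messaging API, with a class name made up for illustration), the chain formed by source, toUpperCase, filter and sink behaves like the following sketch, which also shows why "with" never reaches the sink:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Plain-JDK analogy of the quickstart chain; not the Reactive Messaging API.
class QuickstartPipelineSketch {

  // Mirrors source() -> toUpperCase() -> filter(); sink() prints each result.
  static List<String> run() {
    return Stream.of("hello", "with", "SmallRye", "reactive", "message")
      .map(String::toUpperCase)           // same transformation as toUpperCase()
      .filter(item -> item.length() > 4)  // "WITH" has 4 characters, so it is dropped
      .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    run().forEach(word -> System.out.println(">> " + word));
  }
}
```

Unlike this sketch, the real application wires the methods together by channel name ("source", "processed-a", "processed-b") instead of by direct method chaining.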

SmallRye Reactive Messaging uses SLF4J for logging. Check the SLF4J documentation to configure the logs. For testing purposes, you can simply add the following dependency to your pom.xml file:

<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-simple</artifactId>
  <version>1.7.26</version>
</dependency>

2. Concepts

MicroProfile Reactive Messaging proposes a model to build data streaming applications using a CDI development model. It is based on three main concepts:

  • Message

  • @Incoming

  • @Outgoing

Behind the scenes, SmallRye Reactive Messaging composes Reactive Streams and ensures a consistent flow of messages.

2.1. Messages

A Message (defined by org.eclipse.microprofile.reactive.messaging.Message) is an envelope around a payload. The Message interface just offers a way to retrieve the payload (getPayload) and to acknowledge the message (ack).

[Figure: message]

The Message interface is parameterized with the type of the payload. For example, Message<String> describes a message containing a String payload.

The interface can be extended by the user or by transport layers to convey more attributes. For example, a Message coming from Kafka contains headers as well as other metadata.

Acknowledgement is covered below, in a dedicated section.

2.2. Incoming

Incoming is an annotation indicating that the method consumes a stream. The name of the stream is given as the annotation's attribute, as in:

@Incoming("my-stream")
public void consume(Message<String> s) {
  // ...
}

A method annotated only with @Incoming forms the end of a processing chain, often called a sink:

[Figure: incoming]

2.3. Outgoing

Outgoing is an annotation indicating that the method feeds a stream. The name of the stream is given as attribute:

@Outgoing("my-stream")
public Message<String> produce() {
  // ...
}

A method only annotated with @Outgoing is a sort of data source:

[Figure: outgoing]

2.4. Methods with @Outgoing and @Incoming

Of course, methods can also use both annotations to transform the incoming messages:

@Incoming("from")
@Outgoing("to")
public String toUpperCase(String input) {
  return input.toUpperCase();
}

A method annotated with both annotations is generally called a processor or mediator:

[Figure: processor]

2.5. Binding matching @Outgoing to @Incoming

SmallRye Reactive Messaging automatically binds matching @Outgoing to @Incoming to form a chain:

[Figure: chain]

Methods annotated with @Incoming or @Outgoing don’t have to be in the same bean (class). You can distribute them among a set of beans. Remote interactions are possible using a transport layer. This topic is covered in another section.

3. Generating data

To generate data (act as a data source), you need a method annotated with @Outgoing.

3.1. Supported Signatures

  • @Outgoing Publisher<Message<O>> method()

  • @Outgoing Publisher<O> method()

  • @Outgoing PublisherBuilder<Message<O>> method()

  • @Outgoing PublisherBuilder<O> method()

  • @Outgoing Message<O> method()

  • @Outgoing O method()

  • @Outgoing CompletionStage<Message<O>> method()

  • @Outgoing CompletionStage<O> method()

The first four variants generate a stream. The other methods generate each item individually.
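
To picture what "generating each item individually" means, here is a minimal plain-JDK analogy. It assumes the method is simply invoked once per requested item, which Stream.generate imitates here. The class and method names are hypothetical and no Reactive Messaging API is involved:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Plain-JDK analogy of a per-item generator; not the Reactive Messaging API.
class PerItemGenerationSketch {

  private final AtomicInteger counter = new AtomicInteger();

  // Same shape as "@Outgoing O method()": one payload per invocation.
  int next() {
    return counter.incrementAndGet();
  }

  // Invokes next() once for each of the n requested items.
  static List<Integer> firstItems(int n) {
    PerItemGenerationSketch generator = new PerItemGenerationSketch();
    return Stream.generate(generator::next)
      .limit(n)
      .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(firstItems(3)); // prints [1, 2, 3]
  }
}
```

Without the limit, the sequence would never end, which is why a per-item generator is best thought of as an unbounded source.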

3.2. Examples

The following code snippet provides examples of the different signature variants.

package io.smallrye.reactive.messaging.snippets;

import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;

import javax.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

@ApplicationScoped
public class GenerateExamples {

  /**
   * Produces a simple stream. The produced payloads are automatically wrapped
   * into Messages.
   */
  @Outgoing("data-1")
  public PublisherBuilder<String> generate() {
    return ReactiveStreams.of("a", "b", "c");
  }

  /**
   * Produces a simple stream of Message.
   */
  @Outgoing("data-2")
  public PublisherBuilder<Message<String>> generateMessages() {
    return ReactiveStreams.of("a", "b", "c").map(Message::of);
  }

  private AtomicInteger counter = new AtomicInteger();

  /**
   * Generates a stream payload by payload.
   */
  @Outgoing("data-3")
  public int count() {
    return counter.incrementAndGet();
  }

  /**
   * Generates a stream message by message.
   */
  @Outgoing("data-4")
  public Message<Integer> countAsMessage() {
    return Message.of(counter.incrementAndGet());
  }

  private Executor executor = Executors.newSingleThreadExecutor();

  /**
   * Generates a stream message by message. The next message is only generated
   * once the CompletionStage returned for the previous one has completed.
   */
  @Outgoing("data-5")
  public CompletionStage<Message<Integer>> produceAsyncMessages() {
    return CompletableFuture.supplyAsync(() -> Message.of(counter.incrementAndGet()), executor);
  }

}

4. Consuming data

To consume data (act as a data sink), you just need a method only annotated with @Incoming.

4.1. Supported Signatures

  • @Incoming Subscriber<Message<I>> method()

  • @Incoming Subscriber<I> method()

  • @Incoming void method(I msg)

  • @Incoming CompletionStage<?> method(Message<I> msg)

  • @Incoming CompletionStage<?> method(I msg)

The first two methods return a Reactive Streams Subscriber. The other methods consume the incoming items one by one. Note that, when a method returns a CompletionStage, it won’t be called again until the previously returned CompletionStage has completed.
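
The one-at-a-time guarantee for CompletionStage-returning consumers can be illustrated with plain CompletableFuture chaining. This is only a sketch of the observable behaviour: the dispatch loop below is a hypothetical stand-in for the framework, not how SmallRye implements it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Plain-JDK sketch of sequential asynchronous consumption; names are illustrative.
class SequentialConsumptionSketch {

  private final List<String> events = Collections.synchronizedList(new ArrayList<>());

  // Same shape as "@Incoming CompletionStage<?> method(I msg)".
  CompletionStage<Void> consume(String item) {
    events.add("start " + item);
    return CompletableFuture.runAsync(() -> events.add("end " + item));
  }

  // Hypothetical dispatcher: calls consume() for the next item only after the
  // stage returned for the previous item has completed.
  static List<String> dispatch(String... items) {
    SequentialConsumptionSketch bean = new SequentialConsumptionSketch();
    CompletableFuture<Void> previous = CompletableFuture.completedFuture(null);
    for (String item : items) {
      previous = previous.thenCompose(ignored -> bean.consume(item));
    }
    previous.join(); // wait for the last item
    return new ArrayList<>(bean.events);
  }

  public static void main(String[] args) {
    // prints [start a, end a, start b, end b]
    System.out.println(dispatch("a", "b"));
  }
}
```

Because each call is chained on the previous stage, the "start" of an item never appears before the "end" of the one before it, even though the work itself runs on another thread.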

4.2. Examples

The following code snippet provides examples of the different signature variants.

package io.smallrye.reactive.messaging.snippets;


import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.reactivestreams.Subscriber;

import javax.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

@ApplicationScoped
public class ConsumptionExamples {

  @Incoming("data")
  public Subscriber<String> consume() {
    return ReactiveStreams.<String>builder()
      .map(String::toUpperCase)
      .findFirst()
      .build();
  }

  @Incoming("data")
  public Subscriber<Message<String>> consumeMessages() {
    return ReactiveStreams.<Message<String>>builder()
      .findFirst()
      .build();
  }

  @Incoming("data")
  public void consumeOne(String input) {
    //...
  }

  @Incoming("data")
  public CompletionStage<Void> consumeAsync(String input) {
    return CompletableFuture.runAsync(() -> {
      // ...
    });
  }

  @Incoming("data")
  public CompletionStage<Void> consumeMessageAsync(Message<String> input) {
    return CompletableFuture.runAsync(() -> {
      // ...
    });
  }

}

5. Processing data

There are several ways to manipulate the items transiting through a stream.

5.1. Manipulating items one by one

The first possibility is to implement a method annotated with both @Incoming and @Outgoing that receives the Messages or payloads one by one. For each input, one output is produced. This output can be generated asynchronously by returning a CompletionStage.

The following signatures are supported:

  • @Outgoing @Incoming Message<O> method(Message<I> msg)

  • @Outgoing @Incoming O method(I payload)

  • @Outgoing @Incoming CompletionStage<Message<O>> method(Message<I> msg)

  • @Outgoing @Incoming CompletionStage<O> method(I payload)

5.2. Consuming one item and producing a stream

Another possibility is to consume the items one by one, but instead of producing a single result, produce a stream, potentially empty. The following method signatures are supported:

  • @Outgoing @Incoming Publisher<Message<O>> method(Message<I> msg)

  • @Outgoing @Incoming Publisher<O> method(I payload)

  • @Outgoing @Incoming PublisherBuilder<Message<O>> method(Message<I> msg)

  • @Outgoing @Incoming PublisherBuilder<O> method(I payload)

For example, the following method duplicates the received messages:

  @Incoming("proc-a")
  @Outgoing("proc-b")
  public PublisherBuilder<String> duplicate(String input) {
    return ReactiveStreams.of(input, input);
  }

As mentioned above, the method can return an empty stream to "ignore" an item. The following method drops every "c":

  @Incoming("proc-b")
  @Outgoing("proc-c")
  public PublisherBuilder<String> removeC(String input) {
    return ReactiveStreams.of(input)
      .filter(s -> ! s.equalsIgnoreCase("c"))
      .map(String::toUpperCase);
  }

The returned streams are concatenated and the emissions are not mixed.
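
The concatenation behaviour resembles flatMap on a sequential java.util.stream. The sketch below is a plain-JDK analogy under that assumption (class name made up for illustration, no Reactive Messaging API involved) and chains the two methods shown above:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Plain-JDK analogy of "one item in, one stream out"; not the Reactive Messaging API.
class OneToManySketch {

  // Counterpart of duplicate(): every input yields two items.
  static Stream<String> duplicate(String input) {
    return Stream.of(input, input);
  }

  // Counterpart of removeC(): "c" yields an empty stream, others are upper-cased.
  static Stream<String> removeC(String input) {
    return Stream.of(input)
      .filter(s -> !s.equalsIgnoreCase("c"))
      .map(String::toUpperCase);
  }

  // flatMap concatenates the per-item streams in input order.
  static List<String> run(String... inputs) {
    return Stream.of(inputs)
      .flatMap(OneToManySketch::duplicate)
      .flatMap(OneToManySketch::removeC)
      .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(run("a", "b", "c")); // prints [A, A, B, B]
  }
}
```

Both copies of "a" come out before any copy of "b", which is what "the emissions are not mixed" means in practice.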

5.3. Creating a Reactive Stream Processor

A Processor is a Reactive Streams entity acting as both a subscriber (it consumes data) and a publisher (it publishes data). To describe your data transformation logic, you can create and return a Processor (or a ProcessorBuilder):

  @Incoming("proc-c")
  @Outgoing("proc-d")
  public ProcessorBuilder<String, String> process() {
    return ReactiveStreams.<String>builder()
      .map(s -> "-" + s + "-");
  }

The following methods are supported:

  • @Outgoing @Incoming Processor<Message<I>, Message<O>> method()

  • @Outgoing @Incoming Processor<I, O> method()

  • @Outgoing @Incoming ProcessorBuilder<Message<I>, Message<O>> method()

  • @Outgoing @Incoming ProcessorBuilder<I, O> method()

The returned streams are concatenated and the emissions are not mixed.

5.4. Manipulating streams

You can also transform streams using the following methods:

  • @Outgoing @Incoming Publisher<Message<O>> method(Publisher<Message<I>> pub)

  • @Outgoing @Incoming PublisherBuilder<Message<O>> method(PublisherBuilder<Message<I>> pub)

  • @Outgoing @Incoming Publisher<O> method(Publisher<I> pub)

  • @Outgoing @Incoming PublisherBuilder<O> method(PublisherBuilder<I> pub)

5.5. Reactive Streams, Reactive Stream Operators, RX Java

SmallRye Reactive Messaging supports:

  • Publisher, PublisherBuilder (MicroProfile Reactive Streams Operators), Flowable (RX Java 2)

  • Processor, ProcessorBuilder (MicroProfile Reactive Streams Operators)

6. Acknowledgement

Messages may require acknowledgement to inform the transport layer that the message has been processed. Methods annotated with @Incoming can also be annotated with org.eclipse.microprofile.reactive.messaging.Acknowledgment to configure the acknowledgement policy.

Four policies are supported:

  • NONE: no acknowledgement

  • MANUAL: the developer is responsible for the acknowledgement. It can be performed using Message.ack().

  • PRE_PROCESSING: the acknowledgement is managed automatically before the processing

  • POST_PROCESSING: the acknowledgement is managed automatically after the processing
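
The difference between the policies comes down to when the acknowledgement happens relative to the user method. The following sketch only traces that ordering. Its Strategy enum is a local stand-in for Acknowledgment.Strategy, and the trace method is hypothetical, not part of the framework:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Illustrative trace of acknowledgement ordering; not the framework's implementation.
class AckOrderingSketch {

  // Local stand-in mirroring the four documented policies.
  enum Strategy { NONE, MANUAL, PRE_PROCESSING, POST_PROCESSING }

  // Records, in order, what happens around one invocation of the user method.
  static List<String> trace(Strategy strategy, String payload, Function<String, String> method) {
    List<String> steps = new ArrayList<>();
    if (strategy == Strategy.PRE_PROCESSING) {
      steps.add("ack"); // acknowledged before the method runs
    }
    steps.add("processed " + method.apply(payload));
    if (strategy == Strategy.POST_PROCESSING) {
      steps.add("ack"); // acknowledged once the method has returned
    }
    // NONE: nothing is acknowledged. MANUAL: the user method itself is
    // expected to call Message.ack(), so nothing is added here either.
    return steps;
  }

  public static void main(String[] args) {
    // prints [ack, processed X]
    System.out.println(trace(Strategy.PRE_PROCESSING, "x", String::toUpperCase));
    // prints [processed X, ack]
    System.out.println(trace(Strategy.POST_PROCESSING, "x", String::toUpperCase));
  }
}
```

With PRE_PROCESSING a failure inside the method happens after the message was already acknowledged, whereas with POST_PROCESSING the acknowledgement is only sent for messages whose processing finished.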

The following example describes how PRE_PROCESSING can be configured:

  @Incoming("i")
  @Outgoing("j")
  @Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
  public String autoAck(String input) {
    return input.toUpperCase();
  }

Manual acknowledgement can be achieved as follows:

  @Incoming("i")
  @Outgoing("j")
  @Acknowledgment(Acknowledgment.Strategy.MANUAL)
  public CompletionStage<Message<String>> manualAck(Message<String> input) {
    return CompletableFuture.supplyAsync(input::getPayload)
      .thenApply(Message::of)
      .thenCompose(m -> input.ack().thenApply(x -> m));
  }

Default acknowledgement depends on the method signature:

Signature | Default policy | Supported policies
@Incoming Subscriber<Message<I>> method() | Post-Processing | None, Pre, Post, Manual
@Incoming Subscriber<I> method() | Post-Processing | None, Pre, Post
@Incoming void method(I msg) | Post-Processing | None, Pre, Post
@Incoming CompletionStage<?> method(Message<I> msg) | Post-Processing | None, Pre, Post, Manual
@Incoming CompletionStage<?> method(I msg) | Post-Processing | None, Pre, Post
@Outgoing @Incoming Processor<Message<I>, Message<O>> method() | Pre-Processing | None, Manual, Pre
@Outgoing @Incoming Processor<I, O> method() | Pre-Processing | None, Pre
@Outgoing @Incoming ProcessorBuilder<Message<I>, Message<O>> method() | Pre-Processing | None, Manual, Pre
@Outgoing @Incoming ProcessorBuilder<I, O> method() | Pre-Processing | None, Pre
@Outgoing @Incoming Publisher<Message<O>> method(Message<I> msg) | Pre-Processing | None, Manual, Pre
@Outgoing @Incoming Publisher<O> method(I payload) | Pre-Processing | None, Pre
@Outgoing @Incoming PublisherBuilder<Message<O>> method(Message<I> msg) | Pre-Processing | None, Manual, Pre
@Outgoing @Incoming PublisherBuilder<O> method(I payload) | Pre-Processing | None, Pre
@Outgoing @Incoming Message<O> method(Message<I> msg) | Post-Processing | None, Manual, Pre, Post
@Outgoing @Incoming O method(I payload) | Post-Processing | None, Pre, Post
@Outgoing @Incoming CompletionStage<Message<O>> method(Message<I> msg) | Post-Processing | None, Manual, Pre, Post
@Outgoing @Incoming CompletionStage<O> method(I payload) | Post-Processing | None, Pre, Post
@Outgoing @Incoming Publisher<Message<O>> method(Publisher<Message<I>> pub) | Pre-Processing | None, Manual, Pre
@Outgoing @Incoming PublisherBuilder<Message<O>> method(PublisherBuilder<Message<I>> pub) | Pre-Processing | None, Manual, Pre
@Outgoing @Incoming Publisher<O> method(Publisher<I> pub) | Pre-Processing | None, Pre
@Outgoing @Incoming PublisherBuilder<O> method(PublisherBuilder<I> pub) | Pre-Processing | None, Pre

7. Connecting transports

SmallRye Reactive Messaging supports various transport protocols. This section explains how to configure the corresponding connectors. Before listing the supported transports, we need to explain the configuration format. SmallRye Reactive Messaging uses MicroProfile Config as its main configuration source. To enable the configuration, add an implementation of MicroProfile Config to your classpath. For instance, add the following dependency:

<dependency>
  <groupId>io.smallrye</groupId>
  <artifactId>smallrye-config-1.3</artifactId>
  <version>1.0.0</version>
</dependency>

Then create a properties file in src/main/resources/META-INF/microprofile-config.properties such as:

# Configure two channels using a dummy connector
mp.messaging.incoming.dummy-incoming-channel.connector=dummy
mp.messaging.incoming.dummy-incoming-channel.attribute=value
mp.messaging.outgoing.dummy-outgoing-channel.connector=dummy
mp.messaging.outgoing.dummy-outgoing-channel.attribute=value

Each channel (incoming or outgoing) is configured individually in this file:

  • an incoming channel consumes data from a message broker or another data producer. It can be connected to a method annotated with @Incoming using the same channel name.

  • an outgoing channel consumes data from the application and forwards it to a message broker or another data consumer. It can be connected to a method annotated with @Outgoing using the same channel name.

The configuration format is as follows:

mp.messaging.[outgoing|incoming].[channel-name].[attribute]=[value]
  1. the [outgoing|incoming] segment indicates the direction of the channel

  2. the [channel-name] segment configures the name of the channel

  3. the [attribute] segment is the name of the attribute to configure. Most of the time the attributes are specific to the transport layer

  4. the [value] is the value

All channels must declare the connector attribute matching the name of the connector.
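
To make the key layout concrete, the following sketch splits such keys into direction, channel name and attribute using only java.util.Properties. It is an illustration and not how SmallRye reads its configuration: the class and method names are hypothetical, and it assumes channel names contain no dot, so that dotted attributes such as bootstrap.servers stay intact.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Illustrative parser for the key format above; not the framework's own code.
class ChannelConfigSketch {

  private static final String PREFIX = "mp.messaging.";

  // Loads properties from text, as if read from microprofile-config.properties.
  static Properties load(String text) {
    Properties props = new Properties();
    try {
      props.load(new StringReader(text));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return props;
  }

  // Groups attributes under "<direction>.<channel-name>".
  // Assumption: channel names contain no dot, so everything after the
  // channel name (for example "bootstrap.servers") is the attribute.
  static Map<String, Map<String, String>> parse(Properties props) {
    Map<String, Map<String, String>> channels = new TreeMap<>();
    for (String key : props.stringPropertyNames()) {
      if (!key.startsWith(PREFIX)) {
        continue;
      }
      String[] parts = key.substring(PREFIX.length()).split("\\.", 3);
      boolean knownDirection = parts[0].equals("incoming") || parts[0].equals("outgoing");
      if (parts.length < 3 || !knownDirection) {
        continue;
      }
      channels.computeIfAbsent(parts[0] + "." + parts[1], k -> new TreeMap<>())
          .put(parts[2], props.getProperty(key));
    }
    return channels;
  }

  // Lists the channels that do not declare the mandatory connector attribute.
  static List<String> missingConnector(Map<String, Map<String, String>> channels) {
    List<String> missing = new ArrayList<>();
    channels.forEach((channel, attributes) -> {
      if (!attributes.containsKey("connector")) {
        missing.add(channel);
      }
    });
    return missing;
  }

  public static void main(String[] args) {
    Properties props = load(
        "mp.messaging.outgoing.data.connector=smallrye-kafka\n"
      + "mp.messaging.outgoing.data.bootstrap.servers=localhost:9092\n"
      + "mp.messaging.incoming.health.topic=neo\n");
    Map<String, Map<String, String>> channels = parse(props);
    System.out.println(channels);
    System.out.println("Missing connector: " + missingConnector(channels));
  }
}
```

In this example the incoming "health" channel is reported as missing its connector, which mirrors the rule that every channel must declare one.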

Here is an example of a channel using an MQTT connector (consuming data from an MQTT broker) and a channel using a Kafka connector (forwarding data to Kafka):

# [Channel - health] - Consume data from MQTT
mp.messaging.incoming.health.topic=neo
mp.messaging.incoming.health.connector=smallrye-mqtt
mp.messaging.incoming.health.host=localhost
mp.messaging.incoming.health.broadcast=true
# [/Channel - health]

# [Channel - data] - Produce data to Kafka
mp.messaging.outgoing.data.connector=smallrye-kafka
mp.messaging.outgoing.data.bootstrap.servers=localhost:9092
mp.messaging.outgoing.data.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.data.value.serializer=io.vertx.kafka.client.serialization.JsonObjectSerializer
mp.messaging.outgoing.data.acks=1
# [/Channel - data]

8. Interacting with Apache Kafka

You can connect to Apache Kafka as a source or sink. The Kafka support is based on the Vert.x Kafka Client.

8.1. Dependency

To enable the Kafka support, you need the following dependency:

<dependency>
  <groupId>io.smallrye.reactive</groupId>
  <artifactId>smallrye-reactive-messaging-kafka-1.0</artifactId>
  <version>1.0.0</version>
</dependency>

8.2. Retrieving messages from Kafka

To retrieve messages from a Kafka topic, you need the following configuration:

# [Source - heartbeat] - Consume data from Kafka
mp.messaging.incoming.kafka-heartbeat.connector=smallrye-kafka
mp.messaging.incoming.kafka-heartbeat.topic=heartbeat
mp.messaging.incoming.kafka-heartbeat.bootstrap.servers=localhost:9092
mp.messaging.incoming.kafka-heartbeat.group.id=demo
mp.messaging.incoming.kafka-heartbeat.value.deserializer=io.vertx.kafka.client.serialization.JsonObjectDeserializer
# [/Source - heartbeat]

The connector attribute is required to indicate you are configuring the Kafka transport. Supported attributes are listed in the following table:

Attribute | Mandatory | Default | Description
topic | true | the channel-name | The topic to consume; the channel name is used if not set
bootstrap.servers | false | localhost:9092 | The comma-separated list of servers (host:port); can also be set using the kafka.bootstrap.servers config property
key.deserializer | false | - | The codec used to deserialize the key, String by default
value.deserializer | true | - | The codec used to deserialize the value
group.id | false | - | The group id, random if omitted
enable.auto.commit | false | false | Whether or not the messages are committed automatically
retry | false | true | Whether to retry re-establishing the connection to the broker if it fails
retry-attempts | false | 5 | Number of retries
broadcast | false | false | Whether the received messages can be dispatched to several @Incoming methods

You can also configure any attributes used by the Vert.x Kafka client.

When consuming a message from Kafka, you can receive the Message as an io.smallrye.reactive.messaging.kafka.KafkaMessage. It lets you access other metadata such as the headers and the message key (the payload being the value):

  @Incoming("from-kafka")
  public void consume(KafkaMessage<String, JsonObject> message) {
    JsonObject payload = message.getPayload();
    String key = message.getKey();
    MessageHeaders headers = message.getHeaders();
    Integer partition = message.getPartition();
    Long timestamp = message.getTimestamp();
  }

8.3. Forwarding messages to Kafka

To send messages to a Kafka topic, you need the following configuration:

# [Sink - data] - Produce data to Kafka
mp.messaging.outgoing.data.connector=smallrye-kafka
mp.messaging.outgoing.data.value.serializer=io.vertx.kafka.client.serialization.JsonObjectSerializer
# [/Sink - data]

The connector attribute is required to indicate you are configuring the Kafka transport. Supported attributes are listed in the following table:

Attribute | Mandatory | Default | Description
topic | false | the channel-name | The topic to write to; if not set, it must be set in the message
bootstrap.servers | false | localhost:9092 | The comma-separated list of servers (host:port); can also be set using the kafka.bootstrap.servers config property
key.serializer | false | String | The codec used to serialize the key, String if not set
value.serializer | true | - | The codec used to serialize the value
key | false | - | The key to be used; can be overridden by the message
partition | false | none | The partition
waitForWriteCompletion | false | false | Whether it needs to wait for the write operation to complete before processing the next message

You can also configure any attributes used by the Vert.x Kafka client.

When sending a message to Kafka, you can send a Kafka message to configure various aspects such as the topic and key:

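  // A method taking a parameter must also be annotated with @Incoming;
  // the "source" channel name is illustrative.
  @Incoming("source")
  @Outgoing("to-kafka")
  public KafkaMessage<String, String> produce(Message<String> incoming) {
    return KafkaMessage.of("topic", "key", incoming.getPayload());
  }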

9. Interacting using AMQP

You can connect to an AMQP broker or server as a source or sink. The AMQP support is based on the Vert.x AMQP Client.

9.1. Dependency

To enable the AMQP support, you need the following dependency:

<dependency>
  <groupId>io.smallrye.reactive</groupId>
  <artifactId>smallrye-reactive-messaging-amqp-1.0</artifactId>
  <version>1.0.0</version>
</dependency>

9.2. Retrieving messages from AMQP

mp.messaging.incoming.data.address=data
mp.messaging.incoming.data.connector=smallrye-amqp
mp.messaging.incoming.data.host=localhost
mp.messaging.incoming.data.port=5672
mp.messaging.incoming.data.username=username
mp.messaging.incoming.data.password=secret
mp.messaging.incoming.data.broadcast=true
mp.messaging.incoming.data.containerId=my-container-id

Messages coming from AMQP are instances of io.smallrye.reactive.messaging.amqp.AmqpMessage. The payload must be a supported AMQP type. AmqpMessage implements Message and provides additional metadata related to AMQP.

As the AMQP support is based on the Vert.x AMQP client, you can pass any configuration supported by the client. Check the documentation of the client for further details.

If the address attribute is not set, the channel name is used instead.

9.3. Forwarding messages to AMQP

mp.messaging.outgoing.data.address=data
mp.messaging.outgoing.data.connector=smallrye-amqp
mp.messaging.outgoing.data.host=localhost
mp.messaging.outgoing.data.port=5672
mp.messaging.outgoing.data.username=username
mp.messaging.outgoing.data.password=secret
mp.messaging.outgoing.data.containerId=my-container-id
mp.messaging.outgoing.data.durable=true
mp.messaging.outgoing.data.ttl=10000

The AMQP connector dispatches messages to the AMQP broker or server. You can send a bare message or an io.smallrye.reactive.messaging.amqp.AmqpMessage.

The payload of the message must be one of the following types:

  • String

  • Boolean, Byte, Character, Double, Float, Integer, Long, Short

  • Buffer

  • Instant

  • (Vert.x) JSON Array or JSON Object (sent as binary with the content type set to application/json)

  • UUID

Otherwise, the connector invokes toString on the wrapped payload.

If the address attribute is not set, the channel name is used instead.

The host, port, username and password can also be configured using the amqp-host, amqp-port, amqp-username and amqp-password configuration properties.

10. Integrating with Apache Camel

You can receive messages from a Camel route or send messages to a Camel route. This gives you the ability to connect to almost any existing system.

10.1. Dependency

To enable the Camel support, you need the following dependency:

<dependency>
  <groupId>io.smallrye.reactive</groupId>
  <artifactId>smallrye-reactive-messaging-camel-1.0</artifactId>
  <version>1.0.0</version>
</dependency>

10.2. Camel Reactive

When using this dependency, your bean can inject an org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService object:

// Default Camel Context
@Inject
private CamelContext camel;

// Camel Reactive service
@Inject
private CamelReactiveStreamsService camel_reactive;

10.3. Using Camel Route in @Outgoing method

Here is an example of method annotated with @Outgoing directly using a Camel route:

@Outgoing("camel")
public Publisher<Exchange> source() {
  return camel_reactive.from("seda:camel");
}

10.4. Using Camel Route in @Incoming method

Here is an example of method annotated with @Incoming directly using a Camel route:

@Incoming("camel")
public Subscriber<String> sink() {
  // subscriber(...) is provided by the Camel Reactive service, not by CamelContext
  return camel_reactive.subscriber("file:./target?fileName=values.txt&fileExist=append", String.class);
}

Another possibility is to use a producer:

@Incoming("camel")
public CompletionStage<Void> sink(String value) {
  return camel.createProducerTemplate().asyncSendBody("file:./target?fileName=values.txt&fileExist=append", value).thenApply(x -> null);
}

10.5. Retrieving messages from a Camel Route

The previous examples require your code to deal with the Camel route directly. The route can be externalized to the configuration as follows:

mp.messaging.incoming.data.connector=smallrye-camel
mp.messaging.incoming.data.endpoint-uri=seda:my-route

The endpoint-uri configures the route to use. It can be:

  1. a reactive-stream route such as reactive-streams:out

  2. a regular route such as seda:my-route

10.6. Forwarding messages to a Camel Route

The following configuration describes a sink using a Camel route:

mp.messaging.outgoing.data.connector=smallrye-camel
mp.messaging.outgoing.data.endpoint-uri=seda:my-route

The endpoint-uri configures the route to use. It can be:

  1. a reactive-stream route such as reactive-streams:in

  2. a regular route such as file:./target?fileName=values.txt&fileExist=append

In the first case, it can be used as follows:

@ApplicationScoped
public class BeanWithCamelSinkUsingRSRoute extends RouteBuilder {

  @Outgoing("data")
  public Publisher<Message<String>> source() {
    return ReactiveStreams.of("a", "b", "c", "d")
      .map(String::toUpperCase)
      .map(Message::of)
      .buildRs();
  }

  // Declare a route
  @Override
  public void configure() {
    from("reactive-streams:in").to("file:./target?fileName=values.txt&fileExist=append");
  }
}

11. Interacting using MQTT

You can connect to an MQTT broker or server as a source or sink. The MQTT support is based on the Vert.x MQTT Client.

11.1. Dependency

To enable the MQTT support, you need the following dependency:

<dependency>
  <groupId>io.smallrye.reactive</groupId>
  <artifactId>smallrye-reactive-messaging-mqtt-1.0</artifactId>
  <version>1.0.0</version>
</dependency>

11.2. Retrieving messages from MQTT

mp.messaging.incoming.data.topic=data
mp.messaging.incoming.data.connector=smallrye-mqtt
mp.messaging.incoming.data.host=localhost
mp.messaging.incoming.data.port=8883
mp.messaging.incoming.data.username=username
mp.messaging.incoming.data.password=secret
mp.messaging.incoming.data.broadcast=true

Messages coming from MQTT are Message<byte[]>: the payload is a byte[]. You can also receive an io.smallrye.reactive.messaging.mqtt.MqttMessage, which gives you access to the message metadata.

If the topic attribute is not set, the channel name is used instead.

11.3. Forwarding messages to MQTT

mp.messaging.outgoing.data.topic=data
mp.messaging.outgoing.data.connector=smallrye-mqtt
mp.messaging.outgoing.data.host=localhost
mp.messaging.outgoing.data.port=8883
mp.messaging.outgoing.data.username=username
mp.messaging.outgoing.data.password=secret

The payload of a message forwarded to MQTT must be one of:

  • an io.vertx.core.json.JsonObject or an io.vertx.core.json.JsonArray

  • a String or a primitive type

  • a byte[] or a Vert.x Buffer

Other objects are mapped to JSON.

If the topic attribute is not set, the channel name is used instead.

11.4. Advanced configuration

Check io.smallrye.reactive.messaging.mqtt.MqttSource and io.smallrye.reactive.messaging.mqtt.MqttSink for further details on the attributes.

12. Emitting HTTP requests

You can also push the incoming messages to an HTTP endpoint using the HTTP connector.

12.1. Dependency

To enable the HTTP support, you need the following dependency:

<dependency>
  <groupId>io.smallrye.reactive</groupId>
  <artifactId>smallrye-reactive-messaging-http-1.0</artifactId>
  <version>1.0.0</version>
</dependency>

12.2. Forwarding messages to HTTP

mp.messaging.outgoing.data.url=http://localhost:8089
mp.messaging.outgoing.data.connector=smallrye-http

Your method can also return an HttpMessage to configure the headers, the query parameters and the HTTP method (POST, the default, or PUT):

  @Incoming("source")
  @Outgoing("to-http")
  public HttpMessage<JsonObject> consume(Message<String> incoming) {
    return HttpMessage.HttpMessageBuilder.<JsonObject>create()
      .withMethod("PUT")
      .withPayload(new JsonObject().put("value", incoming.getPayload().toUpperCase()))
      .withHeader("Content-Type", "application/json")
      .build();
  }

Supported payloads are:

  • String

  • Vert.x Buffer, byte array, ByteBuffer

  • Vert.x JsonObject and JsonArray

13. Interacting with the Vert.x Eventbus

The Vert.x Eventbus is the backbone of Vert.x applications. It allows different application components to interact in an asynchronous way.

13.1. Dependency

To enable the Vert.x EventBus support, you need the following dependency:

<dependency>
  <groupId>io.smallrye.reactive</groupId>
  <artifactId>smallrye-reactive-messaging-vertx-eventbus-1.0</artifactId>
  <version>1.0.0</version>
</dependency>

13.2. Retrieving messages from the event bus

mp.messaging.incoming.data.address=<the-eventbus-address>
mp.messaging.incoming.data.connector=smallrye-vertx-eventbus
mp.messaging.incoming.data.use-reply-as-ack=<true|false>
mp.messaging.incoming.data.broadcast=<true|false>

The address entry indicates the event bus address from which the messages are retrieved. use-reply-as-ack instructs the framework to reply to the event bus message as an acknowledgement mechanism. Finally, broadcast indicates whether the received messages can be dispatched to several subscribers.

The received messages are instances of io.smallrye.reactive.messaging.eventbus.EventBusMessage. The payload is the message body. From the EventBusMessage, you can retrieve the headers, address…​

13.3. Sending messages to the event bus

mp.messaging.outgoing.data.address=<the-eventbus-address>
mp.messaging.outgoing.data.connector=smallrye-vertx-eventbus
mp.messaging.outgoing.data.publish=<true|false>
mp.messaging.outgoing.data.expect-reply=<true|false>
mp.messaging.outgoing.data.codec=<codec-name>
mp.messaging.outgoing.data.timeout=1000

The address entry indicates where the messages are sent. publish chooses whether the message is sent to a single consumer (false) or broadcast to all consumers (true). expect-reply indicates whether you expect a reply; when it is enabled, no other message is sent until the reply is received. codec is the name of the codec (it must have been registered explicitly). Finally, timeout specifies the reply timeout.

The event bus message is created from the Reactive Messaging message. The payload is passed as the body.

14. Advanced and Experimental features

14.1. @Merge

The @Merge annotation can be used on a method annotated with @Incoming to subscribe to all matching @Outgoing streams. For example, if you have several methods producing data to a stream named "sink", the following method receives all of them:

@Incoming("sink")
@Merge
@Acknowledgment(Acknowledgment.Mode.NONE)
public CompletionStage<Void> justASink(Message<String> ignored) {
  return CompletableFuture.completedFuture(null);
}

The @Merge annotation can be configured with a merge policy:

  • ONE - Pick the first source and use only this one.

  • MERGE - Merge the different sources. This strategy emits the items as they come. (default)

  • CONCAT - Concat the sources.
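
To make the difference between the three policies concrete, here is a plain-Java model of the ordering each one produces. It is only a sketch: it does not use SmallRye, the class MergePolicyModel, the Timed type and the arrival times are all hypothetical, and a real stream is asynchronous rather than a list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Plain-Java model of the three merge policies. It does not use SmallRye:
// every name below is hypothetical and only illustrates the resulting order.
final class MergePolicyModel {

  // A payload tagged with an assumed arrival time (in milliseconds).
  static final class Timed {
    final long arrivalMillis;
    final String payload;

    Timed(long arrivalMillis, String payload) {
      this.arrivalMillis = arrivalMillis;
      this.payload = payload;
    }
  }

  // ONE: only the first source is used, the others are ignored.
  static List<String> one(List<List<Timed>> sources) {
    List<Timed> first = sources.isEmpty() ? new ArrayList<Timed>() : sources.get(0);
    return payloads(first);
  }

  // MERGE: the items of all the sources are emitted as they arrive.
  static List<String> merge(List<List<Timed>> sources) {
    List<Timed> all = flatten(sources);
    all.sort(Comparator.comparingLong((Timed t) -> t.arrivalMillis));
    return payloads(all);
  }

  // CONCAT: the sources are consumed one after the other, in order.
  static List<String> concat(List<List<Timed>> sources) {
    return payloads(flatten(sources));
  }

  // Copies the items of every source into one list, source after source.
  private static List<Timed> flatten(List<List<Timed>> sources) {
    List<Timed> all = new ArrayList<>();
    for (List<Timed> source : sources) {
      all.addAll(source);
    }
    return all;
  }

  // Extracts the payloads, keeping the order of the given items.
  private static List<String> payloads(List<Timed> items) {
    List<String> result = new ArrayList<>();
    for (Timed item : items) {
      result.add(item.payload);
    }
    return result;
  }

  public static void main(String[] args) {
    List<Timed> a = Arrays.asList(new Timed(0, "a1"), new Timed(20, "a2"));
    List<Timed> b = Arrays.asList(new Timed(10, "b1"), new Timed(30, "b2"));
    List<List<Timed>> sources = Arrays.asList(a, b);

    System.out.println("ONE    " + one(sources));    // [a1, a2]
    System.out.println("MERGE  " + merge(sources));  // [a1, b1, a2, b2]
    System.out.println("CONCAT " + concat(sources)); // [a1, a2, b1, b2]
  }
}
```

In an application the policy would be selected on the annotation itself, for example @Merge(Merge.Mode.CONCAT), assuming the policy is exposed as the value of the annotation.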

14.2. @Broadcast

The @Broadcast annotation can be used on a method annotated with @Outgoing to broadcast the emitted messages to all the matching subscribers:

@Outgoing("Y")
@Incoming("X")
@Broadcast
public String process(String s) {
  return s.toUpperCase();
}

14.3. @Stream

@Stream is a qualifier to inject a stream into a regular CDI bean. It’s used to bridge the imperative and reactive worlds, or retrieve streams managed by Reactive Messaging.

You can retrieve a stream using:

@Inject
@Stream("hello")
private Publisher<Message<String>> field;

The value indicates the name of the stream. You can inject a stream as:

  • Publisher<Message<X>> or Publisher<X>

  • PublisherBuilder<Message<X>> or PublisherBuilder<X>

  • Flowable<Message<X>> or Flowable<X>

You must have an @Incoming("hello") somewhere in your application (meaning a method consuming the messages transiting on the stream hello), or a message sink configured (mp.messaging.outgoing.hello…​).

You can also emit data to a stream using:

@Inject @Stream("hello")
Emitter<String> emitter;

// ...
emitter.send("a").send("b");

You can inject an io.smallrye.reactive.messaging.annotations.Emitter<T> or an io.smallrye.reactive.messaging.annotations.Emitter<Message<T>>. To use an Emitter for the stream hello, you need an @Incoming("hello") somewhere in your code (or in your configuration).

14.4. Logging

Logging uses SLF4J; see the SLF4J website for further details.

14.5. Strict Binding Mode

By default, SmallRye Reactive Messaging does not enforce that all mediators are connected; it just prints a warning message. The strict mode fails the deployment if some "incoming" are not connected to an "outgoing". To enable this mode, pass -Dsmallrye-messaging-strict-binding=true on the command line.
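
For example, if the application is packaged as a runnable jar (the name my-app.jar is hypothetical), the flag is a regular JVM system property and so has to appear before -jar:

```shell
# Fails at startup instead of only logging a warning when streams are left unconnected
java -Dsmallrye-messaging-strict-binding=true -jar my-app.jar
```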