Development Model and Annotations

Reactive Messaging proposes a CDI-based programming model to implement event-driven applications. Following CDI principles, beans form the main building blocks of your application. Reactive Messaging provides a set of annotations and types to implement beans that generate, consume, or process messages.

A brief overview of the annotations

Reactive Messaging provides two main annotations:

  • @Incoming indicates the consumed channel

  • @Outgoing indicates the populated channel

These annotations are used on methods:

package beans;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class MessageProcessingBean {

    @Incoming("consumed-channel")
    @Outgoing("populated-channel")
    public Message<String> process(Message<String> in) {
        // Process the payload
        String payload = in.getPayload().toUpperCase();
        // Create a new message from `in` and just update the payload
        return in.withPayload(payload);
    }
}
Reactive Messaging beans can either be in the application scope (@ApplicationScoped) or dependent scope (@Dependent).

Manipulating messages can be cumbersome. When you are only interested in the payload, you can use the following syntax, which is equivalent to the snippet above:

package beans;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class PayloadProcessingBean {

    @Incoming("consumed-channel")
    @Outgoing("populated-channel")
    public String process(String in) {
        return in.toUpperCase();
    }
}
You should not call methods annotated with @Incoming and/or @Outgoing directly from your code. They are invoked by the framework, and invoking them from user code does not have the expected outcome.

SmallRye Reactive Messaging automatically binds matching @Outgoing to @Incoming to form a chain.

If we consider the following code:

@Outgoing("source")
public Multi<String> generate() {
    return Multi.createFrom().items("Hello", "from", "reactive", "messaging");
}

@Incoming("source")
@Outgoing("sink")
public String process(String in) {
    return in.toUpperCase();
}

@Incoming("sink")
public void consume(String processed) {
    System.out.println(processed);
}

It would generate the following chain:

generate --> [ source ] --> process --> [ sink ] --> consume
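
As a rough mental model only, the same data flow can be written in plain Java with no Reactive Messaging involved. The class name ChainSketch and the run helper are hypothetical; in a real application the framework does the wiring by channel name, and the stream is reactive rather than a java.util.stream pipeline.

```java
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Plain-Java analogy of generate --> process --> consume (not the framework's wiring).
final class ChainSketch {

    // Plays the role of the @Outgoing("source") method.
    static Stream<String> generate() {
        return Stream.of("Hello", "from", "reactive", "messaging");
    }

    // Plays the role of the @Incoming("source") / @Outgoing("sink") method.
    static String process(String in) {
        return in.toUpperCase(Locale.ROOT);
    }

    // Collects, in order, what the @Incoming("sink") method would receive.
    static List<String> run() {
        return generate().map(ChainSketch::process).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Each item passes through process exactly once and in order, which is the behaviour the chain above describes.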

Methods annotated with @Incoming or @Outgoing don’t have to be in the same bean (class). You can distribute them among a set of beans. Remote interactions are also possible when using connectors.

Methods annotated with:

  • only @Outgoing are used to generate messages or payloads

  • only @Incoming are used to consume messages or payloads

  • both @Incoming and @Outgoing are used to process messages or payloads, or to transform the stream

Creating Messages

Messages are envelopes around payloads; they are the vehicle that carries them. While manipulating payloads directly is convenient, messages let you add metadata, handle acknowledgement, and more.

Creating Messages is done using the Message interface directly:

// Create a simple message wrapping a payload
Message<Price> m1 = Message.of(price);

// Create a message with metadata
Message<Price> m2 = Message.of(price, Metadata.of(new PriceMetadata()));

// Create a message with several metadata
Message<Price> m3 = Message.of(price,
    Metadata.of(new PriceMetadata(), new MyMetadata()));

// Create a message with an acknowledgement callback
Message<Price> m4 = Message.of(price, () -> {
    // Called when the message is acknowledged by the next consumer.
    return CompletableFuture.completedFuture(null);
});

// Create a message with both metadata and acknowledgement callback
Message<Price> m5 = Message.of(price,
    Metadata.of(new PriceMetadata()),
    () -> {
        // Called when the message is acknowledged by the next consumer.
        return CompletableFuture.completedFuture(null);
    });

You can also create a new instance of Message from an existing one:

// Create a new message with a new payload but with the same metadata
Message<Price> m1 = message.withPayload(new Price(12.4));

// Create a new message with a new payload and add another metadata
Message<Price> m2 = message
    .withPayload(new Price(15.0))
    .withMetadata(Metadata.of(new PriceMetadata()));

// Create a new message with a new payload and a custom acknowledgement
Message<Price> m3 = message
    .withPayload(new Price(15.0))
    .withAck(() ->
        // acknowledge the incoming message
        message.ack()
            .thenAccept(x -> {
                // do something
            }));
Acknowledgement?

Acknowledgement is an important part of any messaging system. It is covered in the acknowledgement section.

Connector Metadata

Most connectors provide metadata that lets you extract technical details about the message and also customize the outbound dispatching.

Generating Messages

To produce messages to a channel, you need to use the @Outgoing annotation. This annotation takes a single parameter: the name of the populated channel.

Generating messages synchronously

You can generate messages synchronously. In this case, the method is called for every request from the downstream:

@Outgoing("my-channel")
public Message<Integer> generateMessagesSynchronously() {
    return Message.of(counter.getAndIncrement());
}
Requests?

Reactive Messaging connects components to build a reactive stream. In a reactive stream, the emissions are controlled by the consumer (downstream) indicating to the publisher (upstream) how many items it can consume. With this protocol, the consumers are never flooded.
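
The request protocol can be illustrated with the JDK's own java.util.concurrent.Flow interfaces, which mirror Reactive Streams. The sketch below is a simplified, single-threaded illustration, not what Reactive Messaging runs internally: RangePublisher and Collector are hypothetical names, and the publisher skips parts of the Reactive Streams rules (for example, it does not guard against re-entrant or non-positive requests).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class DemandSketch {

    // Emits 0..count-1, but never more items than the subscriber has requested.
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next;
                private boolean done;

                @Override
                public void request(long n) {
                    // Emit at most n items for this request.
                    for (long i = 0; i < n && next < count && !done; i++) {
                        subscriber.onNext(next++);
                    }
                    if (next == count && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    // Downstream that only receives items after explicitly asking for them.
    static final class Collector implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        boolean completed;
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
        }

        @Override
        public void onError(Throwable failure) {
            throw new IllegalStateException(failure);
        }

        @Override
        public void onComplete() {
            completed = true;
        }

        void request(long n) {
            subscription.request(n);
        }
    }

    public static void main(String[] args) {
        Collector downstream = new Collector();
        new RangePublisher(5).subscribe(downstream);
        downstream.request(2);
        System.out.println(downstream.received);
        downstream.request(10);
        System.out.println(downstream.received + " completed=" + downstream.completed);
    }
}
```

Nothing is emitted until the downstream calls request, and each request bounds how many items may follow. That is why a slow consumer is not flooded.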

Generating messages using CompletionStage

You can also return a CompletionStage / CompletableFuture. In this case, Reactive Messaging waits until the CompletionStage gets completed before calling it again.

For instance, this signature is useful to poll messages from a source using an asynchronous client:

@Outgoing("my-channel")
public CompletionStage<Message<Price>> generateMessagesAsCompletionStage() {
    return asyncClient.poll()
        .thenApply(Message::of);
}
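
To illustrate the "wait for completion before calling again" behaviour, here is a minimal JDK-only sketch. SequentialPollSketch and drain are hypothetical names, and the loop is only a model of the calling pattern, not the framework's implementation: each call to the generator is chained with thenCompose, so it starts only after the previous CompletionStage has completed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

final class SequentialPollSketch {

    // Calls the generator `times` times; call i+1 starts only once call i has completed.
    static <T> CompletionStage<List<T>> drain(Supplier<CompletionStage<T>> generator, int times) {
        CompletionStage<List<T>> result = CompletableFuture.completedFuture(new ArrayList<>());
        for (int i = 0; i < times; i++) {
            // The lambda runs only after the previous stage completed.
            result = result.thenCompose(items ->
                generator.get().thenApply(item -> {
                    items.add(item);
                    return items;
                }));
        }
        return result;
    }

    public static void main(String[] args) {
        int[] counter = {0};
        List<Integer> items = SequentialPollSketch
            .<Integer>drain(() -> CompletableFuture.completedFuture(counter[0]++), 3)
            .toCompletableFuture()
            .join();
        System.out.println(items);
    }
}
```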

Generating messages using Uni

You can also return a Uni instance. In this case, Reactive Messaging waits until the Uni emits its item before calling it again.

This signature is useful when integrating asynchronous clients providing a Mutiny API.

@Outgoing("my-channel")
public Uni<Message<Integer>> generateMessagesAsync() {
    return Uni.createFrom().item(() -> Message.of(counter.getAndIncrement()));
}

Generating Reactive Streams of messages

Instead of producing messages one by one, you can return the stream directly. If you have a data source producing a Reactive Streams Publisher (or a sub-type), this is the signature you are looking for:

@Outgoing("my-channel")
public Publisher<Message<String>> generateMessageStream() {
    Multi<String> multi = reactiveClient.getStream();
    return multi.map(Message::of);
}

In this case, the method is called once to retrieve the Publisher.

Generating Payloads

Instead of Message, you can produce payloads. In this case, Reactive Messaging produces a simple message from the payload using Message.of.

Generating payload synchronously

You can produce payloads synchronously. The framework calls the method upon request and creates Messages around the produced payloads.

@Outgoing("my-channel")
public Integer generatePayloadsSynchronously() {
    return counter.getAndIncrement();
}

Generating payload using CompletionStage

You can also return a CompletionStage or CompletableFuture. For example, if you have an asynchronous client returning a CompletionStage, you can use it as follows to poll the data one by one:

@Outgoing("my-channel")
public CompletionStage<Price> generatePayloadsAsCompletionStage() {
    return asyncClient.poll();
}

Generating payload by producing Unis

You can also return a Uni if you have a client using Mutiny types:

@Outgoing("my-channel")
public Uni<Integer> generatePayloadsAsync() {
    return Uni.createFrom().item(() -> counter.getAndIncrement());
}

Generating Reactive Streams of payloads

Finally, you can return a Publisher (or a sub-type):

@Outgoing("my-channel")
public Multi<String> generatePayloadsStream() {
    Multi<String> multi = reactiveClient.getStream();
    return multi;
}

In this case, Reactive Messaging calls the method only once to retrieve the Publisher.

Consuming Messages

To consume messages from a channel, you need to use the @Incoming annotation. This annotation takes a single parameter: the name of the consumed channel.

Because Messages must be acknowledged, consuming messages requires returning an asynchronous result that completes when the incoming message gets acknowledged.

For example, you can receive the Message, process it, and return the acknowledgement as the result:

@Incoming("my-channel")
public CompletionStage<Void> consumeMessage(Message<Price> message) {
    handle(message.getPayload());
    return message.ack();
}

You can also return a Uni if you need to implement more complicated processing:

@Incoming("my-channel")
public Uni<Void> consumeMessageUni(Message<Price> message) {
    return Uni.createFrom().item(message)
        .onItem().invoke(m -> handle(m.getPayload()))
        .onItem().transformToUni(x -> Uni.createFrom().completionStage(message.ack()));
}

Consuming Payloads

Unlike consuming messages, consuming payloads supports both synchronous and asynchronous consumption.

For example, you can consume a payload as follows:

@Incoming("my-channel")
public void consumePayload(Price payload) {
    // do something
}

In this case, you don’t need to deal with the acknowledgement yourself. The framework acknowledges the incoming message (that wrapped the payload) once your method returns successfully.
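
The "acknowledge once the method returns successfully" rule can be pictured with a small JDK-only sketch. AutoAckSketch and dispatch are hypothetical names, the acknowledgement is reduced to a Runnable, and what happens on failure is an assumption of this sketch (the message is simply left unacknowledged); the actual failure handling is not described in this section.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

final class AutoAckSketch {

    // Invokes the payload-consuming method, then acknowledges only if it returned normally.
    static <T> boolean dispatch(T payload, Consumer<? super T> method, Runnable ack) {
        try {
            method.accept(payload);
        } catch (RuntimeException e) {
            // Assumption of this sketch: a failing method leaves the message unacknowledged.
            return false;
        }
        ack.run();
        return true;
    }

    public static void main(String[] args) {
        AtomicInteger acks = new AtomicInteger();
        dispatch("price", p -> System.out.println("handling " + p), acks::incrementAndGet);
        System.out.println("acks=" + acks.get());
    }
}
```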

If you need to achieve asynchronous actions, you can return a CompletionStage or a Uni:

@Incoming("my-channel")
public CompletionStage<Void> consumePayloadCS(Price payload) {
    CompletionStage<Void> cs = handleAsync(payload);
    return cs;
}
@Incoming("my-channel")
public Uni<Void> consumePayloadUni(Price payload) {
    return Uni.createFrom().item(payload)
        .onItem().invoke(this::handle)
        .onItem().ignore().andContinueWithNull();
}

In both cases, the framework acknowledges the incoming message when the returned construct completes.

Processing Messages

You can process Messages either synchronously or asynchronously. The latter is useful when you need to execute an asynchronous action during processing, such as invoking a remote service.

To process Messages synchronously, use:

@Incoming("in")
@Outgoing("out")
public Message<String> processMessage(Message<Integer> in) {
    return in.withPayload(Integer.toString(in.getPayload()));
}

This method converts the Integer payload to a String and wraps it in a new Message created from the incoming one.

Using Message.withX methods

You may be surprised by the use of the Message.withX methods. They enable metadata propagation: the metadata is copied from the incoming message and so dispatched to the next method.

You can also process Messages asynchronously:

@Incoming("in")
@Outgoing("out")
public CompletionStage<Message<String>> processMessageCS(Message<Integer> in) {
    CompletionStage<String> cs = invokeService(in.getPayload());
    return cs.thenApply(in::withPayload);
}

Or using Mutiny:

@Incoming("in")
@Outgoing("out")
public Uni<Message<String>> processMessageUni(Message<String> in) {
    return invokeService(in.getPayload())
        .map(in::withPayload);
}

In general, you want to create the new Message from the incoming one. It enables metadata propagation and post-acknowledgement. For this, use the withX methods of Message, which return a new Message instance that copies the content (metadata, ack/nack, and so on) of the original.
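
To show why deriving the new message from the incoming one matters, here is a minimal JDK-only model of the idea. Envelope is a hypothetical stand-in for Message, with metadata reduced to a Map and acknowledgement to a Supplier of CompletionStage; it is not the real Message interface. The point is that withPayload carries the original metadata and ack callback over to the new instance.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

// Hypothetical, simplified stand-in for Message<T>.
final class Envelope<T> {
    private final T payload;
    private final Map<String, Object> metadata;
    private final Supplier<CompletionStage<Void>> ack;

    Envelope(T payload, Map<String, Object> metadata, Supplier<CompletionStage<Void>> ack) {
        this.payload = payload;
        this.metadata = metadata;
        this.ack = ack;
    }

    T getPayload() {
        return payload;
    }

    Map<String, Object> getMetadata() {
        return metadata;
    }

    CompletionStage<Void> ack() {
        return ack.get();
    }

    // New payload, same metadata and same acknowledgement callback.
    <R> Envelope<R> withPayload(R newPayload) {
        return new Envelope<>(newPayload, metadata, ack);
    }

    public static void main(String[] args) {
        Envelope<Integer> in = new Envelope<Integer>(42, Map.of("source", "demo"),
            () -> CompletableFuture.completedFuture(null));
        Envelope<String> out = in.withPayload(Integer.toString(in.getPayload()));
        System.out.println(out.getPayload() + " " + out.getMetadata());
    }
}
```

Acknowledging the derived envelope triggers the callback of the original one, which is what lets an acknowledgement travel back up the chain.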

Processing payloads

If you don’t need to manipulate the envelope, you can process the payload directly, either synchronously or asynchronously:

@Incoming("in")
@Outgoing("out")
public String processPayload(int in) {
    return Integer.toString(in);
}
@Incoming("in")
@Outgoing("out")
public CompletionStage<String> processPayloadCS(int in) {
    return invokeService(in);
}
@Incoming("in")
@Outgoing("out")
public Uni<String> processPayload(String in) {
    return invokeService(in);
}
What about metadata?

With these methods, the metadata are automatically propagated.

Processing streams

The previous processing methods took a single Message or payload. Sometimes you need more advanced manipulation. For this, SmallRye Reactive Messaging lets you process the stream of Messages or the stream of payloads directly:

@Incoming("in")
@Outgoing("out")
public Multi<Message<String>> processMessageStream(Multi<Message<Integer>> stream) {
    return stream
        .onItem().transformToUni(message ->
            invokeService(message.getPayload())
                .onFailure().recoverWithItem("fallback")
                .onItem().transform(message::withPayload))
        .concatenate();
}
@Incoming("in")
@Outgoing("out")
public Multi<String> processPayloadStream(Multi<Integer> stream) {
    return stream
        .onItem().transformToUni(payload ->
            invokeService(payload)
                .onFailure().recoverWithItem("fallback"))
        .concatenate();
}

You can receive either a (Reactive Streams) Publisher, a PublisherBuilder or (Mutiny) Multi. You can return any sub-class of Publisher or a Publisher directly.

These signatures do not support automatic metadata propagation. In the case of a stream of Messages, you need to propagate the metadata manually. In the case of a stream of payloads, propagation is not supported, and incoming metadata are lost.

Skipping messages

Sometimes you receive a message and don’t want to produce an output message. To handle this, you have several choices:

  1. for methods processing a single message or payload, returning null produces an ignored message (not forwarded)

  2. for methods processing streams, you can generate an empty stream.

The next snippet illustrates the first approach:

// Skip when processing payload synchronously - returning `null`
@Incoming("in")
@Outgoing("out")
public String processPayload(String s) {
    if (s.equalsIgnoreCase("skip")) {
        return null;
    }
    return s.toUpperCase();
}

// Skip when processing message synchronously - returning `null`
@Incoming("in")
@Outgoing("out")
public Message<String> processMessage(Message<String> m) {
    String s = m.getPayload();
    if (s.equalsIgnoreCase("skip")) {
        m.ack();
        return null;
    }
    return m.withPayload(s.toUpperCase());
}

// Skip when processing payload asynchronously - returning a `Uni` with a `null` value
@Incoming("in")
@Outgoing("out")
public Uni<String> processPayloadAsync(String s) {
    if (s.equalsIgnoreCase("skip")) {
        // Important, you must not return `null`, but a `null` content
        return Uni.createFrom().nullItem();
    }
    return Uni.createFrom().item(s.toUpperCase());
}

// Skip when processing message asynchronously - returning a `Uni` with a `null` value
@Incoming("in")
@Outgoing("out")
public Uni<Message<String>> processMessageAsync(Message<String> m) {
    String s = m.getPayload();
    if (s.equalsIgnoreCase("skip")) {
        m.ack();
        return Uni.createFrom().nullItem();
    }
    return Uni.createFrom().item(m.withPayload(s.toUpperCase()));
}

The second approach consists of emitting an empty Multi (or Publisher):

@Incoming("in")
@Outgoing("out-1")
public Multi<String> processPayload(String s) {
    if (s.equalsIgnoreCase("skip")) {
        return Multi.createFrom().empty();
    }
    return Multi.createFrom().item(s.toUpperCase());
}

@Incoming("in")
@Outgoing("out-2")
public Multi<Message<String>> processMessage(Message<String> m) {
    String s = m.getPayload();
    if (s.equalsIgnoreCase("skip")) {
        return Multi.createFrom().empty();
    }
    return Multi.createFrom().item(m.withPayload(s.toUpperCase()));
}

@Incoming("in")
@Outgoing("out-3")
public Multi<String> processPayloadStream(Multi<String> stream) {
    return stream
        .select().where(s -> !s.equalsIgnoreCase("skip"))
        .onItem().transform(String::toUpperCase);
}

@Incoming("in")
@Outgoing("out-4")
public Multi<Message<String>> processMessageStream(Multi<Message<String>> stream) {
    return stream
        .select().where(m -> !m.getPayload().equalsIgnoreCase("skip"))
        .onItem().transform(m -> m.withPayload(m.getPayload().toUpperCase()));
}
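
The "null means skip" rule of the first approach can be modelled in plain Java. This is only an analogy under stated assumptions: SkipSketch and forward are hypothetical names, and a java.util.stream pipeline stands in for the channel. A null result is dropped instead of being forwarded downstream.

```java
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;

final class SkipSketch {

    // Same logic as the synchronous payload processor: null marks an item to skip.
    static String process(String s) {
        if (s.equalsIgnoreCase("skip")) {
            return null;
        }
        return s.toUpperCase(Locale.ROOT);
    }

    // Applies the method to every incoming item and forwards only non-null results.
    static <I, O> List<O> forward(List<I> incoming, Function<I, O> method) {
        return incoming.stream()
            .map(method)
            .filter(Objects::nonNull)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(forward(List.of("a", "skip", "b"), SkipSketch::process));
    }
}
```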

Message Converters

SmallRye Reactive Messaging supports message converters, which transform an incoming message into a version accepted by the method. If the incoming message or payload does not match the invoked method’s expectation, SmallRye Reactive Messaging looks for a suitable converter. If one is found, it converts the incoming message with this converter.

Converters can have multiple purposes, but the main use case is transforming the message’s payload:

@ApplicationScoped
public class MyConverter implements MessageConverter {
    @Override
    public boolean canConvert(Message<?> in, Type target) {
        // Checks whether this converter can be used to convert the incoming message into a message
        // containing a payload of the type `target`.
        return in.getPayload().getClass().equals(String.class) && target.equals(Person.class);
    }

    @Override
    public Message<?> convert(Message<?> in, Type target) {
        // Convert the incoming message into the new message.
        // It's important to build the new message **from** the received one.
        return in.withPayload(new Person((String) in.getPayload()));
    }
}

To provide a converter, implement a bean exposing the MessageConverter interface. The canConvert method is called during the lookup and checks whether the converter can handle the conversion. The target type is the expected payload type. If canConvert returns true, SmallRye Reactive Messaging calls the convert method to perform the conversion.

The previous converter can be used in an application like the following to convert Message<String> to Message<Person>:

@Outgoing("persons")
public Multi<String> source() {
    return Multi.createFrom().items("Neo", "Morpheus", "Trinity");
}

// The messages need to be converted as they are emitted as Message<String>
// and consumed as Message<Person>
@Incoming("persons")
public void consume(Person p) {
    // ...
}

Converters work for all supported method signatures. However, the signature must be well-formed to allow the extraction of the expected payload type. Wildcards and raw types do not support conversion. If the expected payload type cannot be extracted, or no converter fits, the message is passed as received.
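
The lookup described above can be summarised with a simplified, JDK-only sketch. The Converter interface here is a hypothetical reduction of MessageConverter that works on bare payloads and Class targets instead of Message and Type, and the list is assumed to be already in lookup order; it is a model of the decision flow, not SmallRye's implementation.

```java
import java.util.List;

final class ConverterLookupSketch {

    // Hypothetical, payload-only reduction of the MessageConverter contract.
    interface Converter {
        boolean canConvert(Object payload, Class<?> target);

        Object convert(Object payload, Class<?> target);
    }

    // Example converter: turns a String payload into an Integer.
    static final class StringToInteger implements Converter {
        @Override
        public boolean canConvert(Object payload, Class<?> target) {
            return payload instanceof String && target == Integer.class;
        }

        @Override
        public Object convert(Object payload, Class<?> target) {
            return Integer.valueOf((String) payload);
        }
    }

    // Returns the payload as expected by the method, or unchanged if no converter fits.
    static Object convertIfNeeded(Object payload, Class<?> target, List<Converter> converters) {
        if (target.isInstance(payload)) {
            return payload; // already matches the method's expectation
        }
        for (Converter converter : converters) {
            if (converter.canConvert(payload, target)) {
                return converter.convert(payload, target);
            }
        }
        return payload; // no converter fits: passed as received
    }

    public static void main(String[] args) {
        List<Converter> converters = List.of(new StringToInteger());
        System.out.println(convertIfNeeded("42", Integer.class, converters));
    }
}
```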

If multiple suitable converters may be present, converter implementations should override the getPriority method to return their priority. The default priority is 100. The converter lookup invokes converters with higher priority first.