Skip to content

Development Model

Reactive Messaging proposes a CDI-based programming model to implement event-driven applications. Following the CDI principles, beans are forming the main building block of your application. Reactive Messaging provides a set of annotations and types to implement beans that generate, consume or process messages.

@Incoming and @Outgoing

Reactive Messaging provides two main annotations:

These annotations are used on methods:

package beans;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@ApplicationScoped
public class MessageProcessingBean {

    @Incoming("consumed-channel")
    @Outgoing("populated-channel")
    public Message<String> process(Message<String> in) {
        // Process the payload
        String payload = in.getPayload().toUpperCase();
        // Create a new message from `in` and just update the payload
        return in.withPayload(payload);
    }
}

Note

Reactive Messaging beans can either be in the application scope (@ApplicationScoped) or dependent scope (@Dependent).

Manipulating messages can be cumbersome. When you are only interested in the payload, you can use the following syntax: The following code is equivalent to the snippet from above:

package beans;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@ApplicationScoped
public class PayloadProcessingBean {

    @Incoming("consumed-channel")
    @Outgoing("populated-channel")
    public String process(String in) {
        return in.toUpperCase();
    }
}

Important

You should not call methods annotated with @Incoming and/or @Outgoing directly from your code. They are invoked by the framework. Having user code invoking them would not have the expected outcome.

SmallRye Reactive Messaging automatically binds matching @Outgoing to @Incoming to form a chain:

A chain of components
A chain of components

If we consider the following code:

@Outgoing("source")
public Multi<String> generate() {
    return Multi.createFrom().items("Hello", "from", "reactive", "messaging");
}

@Incoming("source")
@Outgoing("sink")
public String process(String in) {
    return in.toUpperCase();
}

@Incoming("sink")
public void consume(String processed) {
    System.out.println(processed);
}

It would generate the following chain:

generate --> [ source ] --> process --> [ sink ] --> consume

Methods annotated with @Incoming or @Outgoing don’t have to be in the same bean (class). You can distribute them among a set of beans. Remote interactions are also possible when using connectors.

Methods annotated with:

  • only @Outgoing are used to generate messages or payloads
  • only @Incoming are used to consume messages or payloads
  • both @Incoming and @Outgoing are used to process messages or payloads; or transform the stream

Creating messages

Messages are envelopes around payload. They are the vehicle. While manipulating payload is convenient, messages let you add metadata, handle acknowledgement...

Creating Messages is done using the Message interface directly:

// Create a simple message wrapping a payload
Message<Price> m1 = Message.of(price);

// Create a message with metadata
Message<Price> m2 = Message.of(price, Metadata.of(new PriceMetadata()));

// Create a message with several metadata
Message<Price> m3 = Message.of(price,
        Metadata.of(new PriceMetadata(), new MyMetadata()));

// Create a message with an acknowledgement callback
Message<Price> m4 = Message.of(price, () -> {
    // Called when the message is acknowledged by the next consumer.
    return CompletableFuture.completedFuture(null);
});

// Create a message with both metadata and acknowledgement callback
Message<Price> m5 = Message.of(price,
        Metadata.of(new PriceMetadata()),
        () -> {
            // Called when the message is acknowledged by the next consumer.
            return CompletableFuture.completedFuture(null);
        });

Messages accept null as payload. Channels connected to outbound connectors interpret messages with null payload differently depending on the technology.

// Create a message with null payload
Message<Price> m6 = Message.of(null, Metadata.of(new PriceMetadata()));

You can also create new instance of Message from an existing one:

// Create a new message with a new payload but with the same metadata
Message<Price> m1 = message.withPayload(new Price(12.4));

// Create a new message with a new payload and add another metadata
Message<Price> m2 = message
        .withPayload(new Price(15.0))
        .withMetadata(Metadata.of(new PriceMetadata()));

// Create a new message with a new payload and a custom acknowledgement
Message<Price> m3 = message
        .withPayload(new Price(15.0))
        .withAck(() ->
        // acknowledge the incoming message
        message.ack()
                .thenAccept(x -> {
                    // do something
                }));

Acknowledgement?

Acknowledgement is an important part of messaging systems. This will be covered in the acknowledgement section.

Connector Metadata

Most connectors are providing metadata to let you extract technical details about the message, but also customize the outbound dispatching.

Messages vs. Payloads

Reactive messaging offers flexibility when it comes to handling messages and their acknowledgements. The application developer can choose to finely handle acknowledgements per-message basis, by handling the Message-based signatures. Otherwise, when handling payloads, acknowledgements (and negative-acknowledgements) are handled by the framework. The following sections in this documentation detail both development models.

While being the easier development model, in the past using payload-based signatures did not allow associating connector-specific metadata, making the Message-based signatures the de-facto choice even for the most common scenarios. This lead to using the connector custom-message implementation types, such as IncomingKafkaRecord, KafkaRecord or IncomingRabbitMQMessage, as a convenience for accessing connector-specific metadata.

Not only this forces to handle acknowledgements manually, it also doesn't allow for incoming or outgoing Messages to be intercepted or observed.

Custom Message types & Message interception

Custom Message types such as IncomingKafkaRecord, KafkaRecord or IncomingRabbitMQMessage are not compatible with features intercepting messages. Therefore, it is no longer recommended to use custom Message implementations in consumptions methods. Instead, you can either use the generic Message type and access specific metadata, or use the payload with metadata injection

Since incoming metadata injection and generic payloads features added to SmallRye Reactive Messaging, it is easier to use payload signatures and benefit from acknowledgement handling and still access connector-specific metadata.

Generating Messages

To produce messages to a channel, you need to use the @Outgoing annotation. This annotation takes a single parameter: the name of the populated channel.

Generating messages synchronously

You can generate messages synchronously. In this case, the method is called for every request from the downstream:

1
2
3
4
@Outgoing("my-channel")
public Message<Integer> generateMessagesSynchronously() {
    return Message.of(counter.getAndIncrement());
}

Requests?

Reactive Messaging connects components to build a reactive stream. In a reactive stream, the emissions are controlled by the consumer (downstream) indicating to the publisher (upstream) how many items it can consume. With this protocol, the consumers are never flooded.

Generating messages using CompletionStage

You can also return a CompletionStage / CompletableFuture. In this case, Reactive Messaging waits until the CompletionStage gets completed before calling it again.

For instance, this signature is useful to poll messages from a source using an asynchronous client:

1
2
3
4
5
@Outgoing("my-channel")
public CompletionStage<Message<Price>> generateMessagesAsCompletionStage() {
    return asyncClient.poll()
            .thenApply(Message::of);
}

Generating messages using Uni

You can also return a Uni instance. In this case, Reactive Messaging waits until the Uni emits its item before calling it again.

This signature is useful when integrating asynchronous clients providing a Mutiny API.

1
2
3
4
@Outgoing("my-channel")
public Uni<Message<Integer>> generateMessagesAsync() {
    return Uni.createFrom().item(() -> Message.of(counter.getAndIncrement()));
}

Generating Reactive Streams of messages

Instead of producing the message one by one, you can return the stream directly. If you have a data source producing Reactive Streams Publisher (or sub-types, such as Multi), this is the signature you are looking for:

1
2
3
4
public Flow.Publisher<Message<String>> generateMessageStream() {
    Multi<String> multi = reactiveClient.getStream();
    return multi.map(Message::of);
}

In this case, the method is called once to retrieve the Publisher.

Generating Payloads

Instead of Message, you can produce payloads. In this case, Reactive Messaging produces a simple message from the payload using Message.of.

Generating payload synchronously

You can produce payloads synchronously. The framework calls the method upon request and create Messages around the produced payloads.

1
2
3
4
@Outgoing("my-channel")
public Integer generatePayloadsSynchronously() {
    return counter.getAndIncrement();
}

Generating payload using CompletionStage

You can also return CompletionStage or CompletableFuture. For example, if you have an asynchronous client returning CompletionStage, you can use it as follows, to poll the data one by one:

1
2
3
4
@Outgoing("my-channel")
public CompletionStage<Price> generatePayloadsAsCompletionStage() {
    return asyncClient.poll();
}

Generating payload by producing Unis

You can also return a Uni if you have a client using Mutiny types:

1
2
3
4
@Outgoing("my-channel")
public Uni<Integer> generatePayloadsAsync() {
    return Uni.createFrom().item(() -> counter.getAndIncrement());
}

Generating Reactive Streams of payloads

Finally, you can return a Publisher (or a sub-type such as a Multi):

1
2
3
4
5
@Outgoing("my-channel")
public Multi<String> generatePayloadsStream() {
    Multi<String> multi = reactiveClient.getStream();
    return multi;
}

In this case, Reactive Messaging calls the method only once to retrieve the Publisher.

Consuming Messages

To consume messages from a channel, you need to use the @Incoming annotation. This annotation takes a single parameter: the name of the consumed channel.

Because Messages must be acknowledged, consuming messages requires returning asynchronous results that would complete when the incoming message get acknowledged.

For example, you can receive the Message, process it and return the acknowledgement as result:

1
2
3
4
5
6
@Incoming("my-channel")
public CompletionStage<Void> consumeMessage(Message<Price> message) {
    handle(message.getPayload());
    return message.ack();
}
'

You can also return a Uni if you need to implement more complicated processing:

1
2
3
4
5
6
@Incoming("my-channel")
public Uni<Void> consumeMessageUni(Message<Price> message) {
    return Uni.createFrom().item(message)
            .onItem().invoke(m -> handle(m.getPayload()))
            .onItem().transformToUni(x -> Uni.createFrom().completionStage(message.ack()));
}

Consuming Payloads

Unlike consuming messages, consuming payloads support both synchronous and asynchronous consumption.

For example, you can consume a payload as follows:

1
2
3
4
@Incoming("my-channel")
public void consumePayload(Price payload) {
    // do something
}

In this case, you don’t need to deal with the acknowledgement yourself. The framework acknowledges the incoming message (that wrapped the payload) once your method returns successfully.

If you need to achieve asynchronous actions, you can return a CompletionStage or a Uni:

1
2
3
4
5
@Incoming("my-channel")
public CompletionStage<Void> consumePayloadCS(Price payload) {
    CompletionStage<Void> cs = handleAsync(payload);
    return cs;
}
1
2
3
4
5
6
@Incoming("my-channel")
public Uni<Void> consumePayloadUni(Price payload) {
    return Uni.createFrom().item(payload)
            .onItem().invoke(this::handle)
            .onItem().ignore().andContinueWithNull();
}

In these 2 cases, the framework acknowledges the incoming message when the returned construct gets completed.

Processing Messages

You can process Message both synchronously or asynchronously. This later case is useful when you need to execute an asynchronous action during your processing such as invoking a remote service.

Do process Messages synchronously uses:

1
2
3
4
5
@Incoming("in")
@Outgoing("out")
public Message<String> processMessage(Message<Integer> in) {
    return in.withPayload(Integer.toString(in.getPayload()));
}

This method transforms the int payload to a String, and wraps it into a Message.

'''important "Using Message.withX methods" You may be surprised by the usage of Message.withX methods. It allows metadata propagation as the metadata would be copied from the incoming message and so dispatched to the next method.

You can also process Messages asynchronously:

1
2
3
4
5
6
@Incoming("in")
@Outgoing("out")
public CompletionStage<Message<String>> processMessageCS(Message<Integer> in) {
    CompletionStage<String> cs = invokeService(in.getPayload());
    return cs.thenApply(in::withPayload);
}

Or using Mutiny:

1
2
3
4
5
6
@Incoming("in")
@Outgoing("out")
public Uni<Message<String>> processMessageUni(Message<String> in) {
    return invokeService(in.getPayload())
            .map(in::withPayload);
}

In general, you want to create the new Message from the incoming one. It enables metadata propagation and post-acknowledgement. For this, use the withX method from the Message class returning a new Message instance but copy the content (metadata, ack/nack...).

Processing payloads

If you don’t need to manipulate the envelope, you can process payload directly either synchronously or asynchronously:

1
2
3
4
5
@Incoming("in")
@Outgoing("out")
public String processPayload(int in) {
    return Integer.toString(in);
}
1
2
3
4
5
@Incoming("in")
@Outgoing("out")
public CompletionStage<String> processPayloadCS(int in) {
    return invokeService(in);
}
1
2
3
4
5
@Incoming("in")
@Outgoing("out")
public Uni<String> processPayload(String in) {
    return invokeService(in);
}

What about metadata?

With these methods, the metadata are automatically propagated.

Processing streams

The previous processing method were taking single Message or payload. Sometimes you need more advanced manipulation. For this, SmallRye Reactive Messaging lets you process the stream of Message or the stream of payloads directly:

@Incoming("in")
@Outgoing("out")
public Multi<Message<String>> processMessageStream(Multi<Message<Integer>> stream) {
    return stream
            .onItem().transformToUni(message -> invokeService(message.getPayload())
                    .onFailure().recoverWithItem("fallback")
                    .onItem().transform(message::withPayload))
            .concatenate();

}
1
2
3
4
5
6
7
8
9
@Incoming("in")
@Outgoing("out")
public Multi<String> processPayloadStream(Multi<Integer> stream) {
    return stream
            .onItem().transformToUni(payload -> invokeService(payload)
                    .onFailure().recoverWithItem("fallback"))
            .concatenate();

}

You can receive either a (Reactive Streams) Publisher, a PublisherBuilder or (Mutiny) Multi. You can return any subclass of Publisher or a Publisher directly.

Important

These signatures do not support metadata propagation. In the case of a stream of Message, you need to propagate the metadata manually. In the case of a stream of payload, propagation is not supported, and incoming metadata are lost.