Development Model and Annotations
Reactive Messaging proposes a CDI-based programming model to implement event-driven applications. Following the CDI principles, beans form the main building blocks of your application. Reactive Messaging provides a set of annotations and types to implement beans that generate, consume or process messages.
A brief overview of the annotations
Reactive Messaging provides two main annotations:
- org.eclipse.microprofile.reactive.messaging.Incoming: indicates the consumed channel
- org.eclipse.microprofile.reactive.messaging.Outgoing: indicates the populated channel
These annotations are used on methods:
package beans;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import javax.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class MessageProcessingBean {
@Incoming("consumed-channel")
@Outgoing("populated-channel")
public Message<String> process(Message<String> in) {
// Process the payload
String payload = in.getPayload().toUpperCase();
// Create a new message from `in` and just update the payload
return in.withPayload(payload);
}
}
Reactive Messaging beans can either be in the application scope (@ApplicationScoped) or dependent scope (@Dependent).
Manipulating messages can be cumbersome. When you are only interested in the payload, you can use the following syntax, which is equivalent to the snippet above:
package beans;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import javax.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class PayloadProcessingBean {
@Incoming("consumed-channel")
@Outgoing("populated-channel")
public String process(String in) {
return in.toUpperCase();
}
}
You should not call methods annotated with @Incoming and/or @Outgoing directly from your code.
They are invoked by the framework.
Having user code invoking them would not have the expected outcome.
SmallRye Reactive Messaging automatically binds matching @Outgoing to @Incoming to form a chain.
Consider the following code:
@Outgoing("source")
public Multi<String> generate() {
return Multi.createFrom().items("Hello", "from", "reactive", "messaging");
}
@Incoming("source")
@Outgoing("sink")
public String process(String in) {
return in.toUpperCase();
}
@Incoming("sink")
public void consume(String processed) {
System.out.println(processed);
}
It would generate the following chain:
generate --> [ source ] --> process --> [ sink ] --> consume
Methods annotated with @Incoming or @Outgoing don’t have to be in the same bean (class). You can distribute them among a set of beans. Remote interactions are also possible when using connectors.
Methods annotated with:
- only @Outgoing are used to generate messages or payloads
- only @Incoming are used to consume messages or payloads
- both @Incoming and @Outgoing are used to process messages or payloads, or to transform the stream
Creating Messages
Messages are envelopes around payloads. They are the vehicle. While manipulating payloads is convenient, messages let you add metadata, handle acknowledgement, and so on.
Creating Messages is done using the Message interface directly:
// Create a simple message wrapping a payload
Message<Price> m1 = Message.of(price);
// Create a message with metadata
Message<Price> m2 = Message.of(price, Metadata.of(new PriceMetadata()));
// Create a message with several metadata
Message<Price> m3 = Message.of(price,
Metadata.of(new PriceMetadata(), new MyMetadata()));
// Create a message with an acknowledgement callback
Message<Price> m4 = Message.of(price, () -> {
// Called when the message is acknowledged by the next consumer.
return CompletableFuture.completedFuture(null);
});
// Create a message with both metadata and acknowledgement callback
Message<Price> m5 = Message.of(price,
Metadata.of(new PriceMetadata()),
() -> {
// Called when the message is acknowledged by the next consumer.
return CompletableFuture.completedFuture(null);
});
You can also create a new instance of Message from an existing one:
// Create a new message with a new payload but with the same metadata
Message<Price> m1 = message.withPayload(new Price(12.4));
// Create a new message with a new payload and add another metadata
Message<Price> m2 = message
.withPayload(new Price(15.0))
.withMetadata(Metadata.of(new PriceMetadata()));
// Create a new message with a new payload and a custom acknowledgement
Message<Price> m3 = message
.withPayload(new Price(15.0))
.withAck(() ->
// acknowledge the incoming message
message.ack()
.thenAccept(x -> {
// do something
}));
Acknowledgement?
Acknowledgement is an important part of any messaging system. It is covered in the acknowledgement section.
Connector Metadata
Most connectors provide metadata that lets you extract technical details about the message and customize the outbound dispatching.
Generating Messages
To produce messages to a channel, you need to use the @Outgoing annotation.
This annotation takes a single parameter: the name of the populated channel.
Generating messages synchronously
You can generate messages synchronously. In this case, the method is called for every request from the downstream:
@Outgoing("my-channel")
public Message<Integer> generateMessagesSynchronously() {
return Message.of(counter.getAndIncrement());
}
Requests?
Reactive Messaging connects components to build a reactive stream. In a reactive stream, the emissions are controlled by the consumer (downstream), which indicates to the publisher (upstream) how many items it can consume. With this protocol, the consumers are never flooded.
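The request protocol described above can be illustrated with the JDK's own java.util.concurrent.Flow interfaces. This is only a minimal sketch, not how Reactive Messaging is implemented: the class DemandDrivenSource and its methods are hypothetical names, the publisher is synchronous, and it skips the validation a compliant publisher would need (for example, rejecting non-positive requests).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical illustration class, not part of Reactive Messaging.
class DemandDrivenSource {

    // Builds a publisher that calls the generator once per requested item, never more.
    static <T> Flow.Publisher<T> from(Supplier<T> generator) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean cancelled;

            @Override
            public void request(long n) {
                // Emit exactly as many items as the downstream asked for.
                for (long i = 0; i < n && !cancelled; i++) {
                    subscriber.onNext(generator.get());
                }
            }

            @Override
            public void cancel() {
                cancelled = true;
            }
        });
    }

    // The downstream requests three items, so the generator runs three times.
    static List<Integer> demo() {
        AtomicInteger counter = new AtomicInteger();
        List<Integer> received = new ArrayList<>();
        from(counter::getAndIncrement).subscribe(new Flow.Subscriber<Integer>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(3);
            }

            @Override
            public void onNext(Integer item) {
                received.add(item);
            }

            @Override
            public void onError(Throwable failure) {
            }

            @Override
            public void onComplete() {
            }
        });
        return received;
    }
}
```

Because the subscriber only ever requests three items, the generator is invoked three times, however many items it could produce.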
Generating messages using CompletionStage
You can also return a CompletionStage / CompletableFuture.
In this case, Reactive Messaging waits until the CompletionStage gets completed before calling it again.
For instance, this signature is useful to poll messages from a source using an asynchronous client:
@Outgoing("my-channel")
public CompletionStage<Message<Price>> generateMessagesAsCompletionStage() {
return asyncClient.poll()
.thenApply(Message::of);
}
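The "wait for completion before calling again" behaviour can be pictured with plain CompletableFuture composition. The sketch below is an assumption-laden illustration rather than the framework's code: SequentialPollSketch, drain and demo are hypothetical names, and the number of invocations is fixed up front instead of being driven by downstream requests.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical illustration class, not part of Reactive Messaging.
class SequentialPollSketch {

    // Invokes the generator `remaining` times. Each invocation starts only
    // after the stage returned by the previous one has completed.
    static <T> CompletionStage<Void> drain(Supplier<CompletionStage<T>> generator,
                                           int remaining, List<T> sink) {
        if (remaining == 0) {
            return CompletableFuture.completedFuture(null);
        }
        return generator.get().thenCompose(item -> {
            sink.add(item);
            return drain(generator, remaining - 1, sink);
        });
    }

    // Polls an asynchronous counter three times, strictly one call after the other.
    static List<Integer> demo() {
        AtomicInteger counter = new AtomicInteger();
        List<Integer> received = new ArrayList<>();
        Supplier<CompletionStage<Integer>> poll =
                () -> CompletableFuture.supplyAsync(counter::getAndIncrement);
        drain(poll, 3, received).toCompletableFuture().join();
        return received;
    }
}
```

Since the calls never overlap, the items arrive in the order in which they were produced.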
Generating messages using Uni
You can also return a Uni instance.
In this case, Reactive Messaging waits until the Uni emits its item before calling it again.
This signature is useful when integrating asynchronous clients providing a Mutiny API.
@Outgoing("my-channel")
public Uni<Message<Integer>> generateMessagesAsync() {
return Uni.createFrom().item(() -> Message.of(counter.getAndIncrement()));
}
Generating Reactive Streams of messages
Instead of producing the messages one by one, you can return the stream directly.
If you have a data source producing a Reactive Streams Publisher (or a sub-type), this is the signature you are looking for:
@Outgoing("my-channel")
public Publisher<Message<String>> generateMessageStream() {
Multi<String> multi = reactiveClient.getStream();
return multi.map(Message::of);
}
In this case, the method is called once to retrieve the Publisher.
Generating Payloads
Instead of Message, you can produce payloads.
In this case, Reactive Messaging produces a simple message from the payload using Message.of.
Generating payload synchronously
You can produce payloads synchronously.
The framework calls the method upon request and creates Messages around the produced payloads.
@Outgoing("my-channel")
public Integer generatePayloadsSynchronously() {
return counter.getAndIncrement();
}
Generating payload using CompletionStage
You can also return CompletionStage or CompletableFuture.
For example, if you have an asynchronous client returning CompletionStage, you can use it as follows to poll the data one by one:
@Outgoing("my-channel")
public CompletionStage<Price> generatePayloadsAsCompletionStage() {
return asyncClient.poll();
}
Generating payload by producing Unis
You can also return a Uni if you have a client using Mutiny types:
@Outgoing("my-channel")
public Uni<Integer> generatePayloadsAsync() {
return Uni.createFrom().item(() -> counter.getAndIncrement());
}
Generating Reactive Streams of payloads
Finally, you can return a Publisher (or a sub-type):
@Outgoing("my-channel")
public Multi<String> generatePayloadsStream() {
Multi<String> multi = reactiveClient.getStream();
return multi;
}
In this case, Reactive Messaging calls the method only once to retrieve the Publisher.
Consuming Messages
To consume messages from a channel, you need to use the @Incoming annotation.
This annotation takes a single parameter: the name of the consumed channel.
Because Messages must be acknowledged, consuming messages requires returning an asynchronous result that completes when the incoming message gets acknowledged.
For example, you can receive the Message, process it and return the acknowledgement as the result:
@Incoming("my-channel")
public CompletionStage<Void> consumeMessage(Message<Price> message) {
handle(message.getPayload());
return message.ack();
}
You can also return a Uni if you need to implement more complicated processing:
@Incoming("my-channel")
public Uni<Void> consumeMessageUni(Message<Price> message) {
return Uni.createFrom().item(message)
.onItem().invoke(m -> handle(m.getPayload()))
.onItem().transformToUni(x -> Uni.createFrom().completionStage(message.ack()));
}
Consuming Payloads
Unlike consuming messages, consuming payloads supports both synchronous and asynchronous consumption.
For example, you can consume a payload as follows:
@Incoming("my-channel")
public void consumePayload(Price payload) {
// do something
}
In this case, you don’t need to deal with the acknowledgement yourself. The framework acknowledges the incoming message (that wrapped the payload) once your method returns successfully.
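The automatic acknowledgement described above can be pictured as a small wrapper around the user method. This is a hedged sketch in plain Java, not the framework's implementation: AutoAckSketch and deliver are hypothetical names, the acknowledgement is modelled as a simple Runnable, and what the framework does when the method fails is deliberately left out.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical illustration class, not part of Reactive Messaging.
class AutoAckSketch {

    // Invokes the payload-consuming method, then runs the acknowledgement
    // only if the method returned without throwing.
    static <T> boolean deliver(T payload, Consumer<T> method, Runnable ack) {
        try {
            method.accept(payload);
        } catch (RuntimeException e) {
            return false; // failure handling is out of scope for this sketch
        }
        ack.run();
        return true;
    }

    // Delivers one payload that is consumed successfully and one that fails,
    // and returns the number of acknowledgements that were issued.
    static int demo() {
        AtomicInteger acks = new AtomicInteger();
        deliver("ok", payload -> { }, acks::incrementAndGet);
        deliver("boom", payload -> {
            throw new IllegalStateException(payload);
        }, acks::incrementAndGet);
        return acks.get();
    }
}
```

Only the first delivery is acknowledged, because the second consumer throws before the acknowledgement step is reached.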
If you need to perform asynchronous actions, you can return a CompletionStage or a Uni:
@Incoming("my-channel")
public CompletionStage<Void> consumePayloadCS(Price payload) {
CompletionStage<Void> cs = handleAsync(payload);
return cs;
}
@Incoming("my-channel")
public Uni<Void> consumePayloadUni(Price payload) {
return Uni.createFrom().item(payload)
.onItem().invoke(this::handle)
.onItem().ignore().andContinueWithNull();
}
In both cases, the framework acknowledges the incoming message when the returned construct gets completed.
Processing Messages
You can process Messages either synchronously or asynchronously.
The latter case is useful when you need to execute an asynchronous action during your processing, such as invoking a remote service.
To process Messages synchronously, use:
@Incoming("in")
@Outgoing("out")
public Message<String> processMessage(Message<Integer> in) {
return in.withPayload(Integer.toString(in.getPayload()));
}
This method transforms the int payload to a String, and wraps it into a Message.
Using Message.withX methods
You may be surprised by the usage of the Message.withX methods. The reason for using them is explained later in this section.
You can also process Messages asynchronously:
@Incoming("in")
@Outgoing("out")
public CompletionStage<Message<String>> processMessageCS(Message<Integer> in) {
CompletionStage<String> cs = invokeService(in.getPayload());
return cs.thenApply(in::withPayload);
}
Or using Mutiny:
@Incoming("in")
@Outgoing("out")
public Uni<Message<String>> processMessageUni(Message<String> in) {
return invokeService(in.getPayload())
.map(in::withPayload);
}
In general, you want to create the new Message from the incoming one.
It enables metadata propagation and post-acknowledgement.
For this, use the withX methods of the Message class, which return a new Message instance but copy the content (metadata, ack/nack…).
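Why deriving the outgoing message from the incoming one preserves acknowledgement can be shown with a deliberately simplified envelope. The Envelope class below is a hypothetical stand-in written for this sketch; it is not the real Message interface and it models only a payload and an acknowledgement callback.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical illustration class, not part of Reactive Messaging.
class AckPropagationSketch {

    // Simplified envelope: a payload plus an acknowledgement callback.
    static final class Envelope<T> {
        final T payload;
        final Supplier<CompletionStage<Void>> ack;

        Envelope(T payload, Supplier<CompletionStage<Void>> ack) {
            this.payload = payload;
            this.ack = ack;
        }

        // Returns a new envelope with a new payload and the same acknowledgement callback.
        <R> Envelope<R> withPayload(R newPayload) {
            return new Envelope<>(newPayload, ack);
        }

        CompletionStage<Void> ack() {
            return ack.get();
        }
    }

    // Acknowledging the derived envelope triggers the callback of the original one.
    static boolean demo() {
        AtomicBoolean sourceAcked = new AtomicBoolean();
        Envelope<Integer> in = new Envelope<>(42, () -> {
            sourceAcked.set(true);
            return CompletableFuture.completedFuture(null);
        });
        Envelope<String> out = in.withPayload(Integer.toString(in.payload));
        out.ack().toCompletableFuture().join();
        return sourceAcked.get();
    }
}
```

Had the outgoing envelope been created from scratch, the callback of the incoming one would never have been invoked, which is the situation that deriving messages from the incoming one avoids.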
Processing payloads
If you don’t need to manipulate the envelope, you can process payloads directly, either synchronously or asynchronously:
@Incoming("in")
@Outgoing("out")
public String processPayload(int in) {
return Integer.toString(in);
}
@Incoming("in")
@Outgoing("out")
public CompletionStage<String> processPayloadCS(int in) {
return invokeService(in);
}
@Incoming("in")
@Outgoing("out")
public Uni<String> processPayload(String in) {
return invokeService(in);
}
What about metadata?
With these methods, the metadata are automatically propagated.
Processing streams
The previous processing methods take a single Message or payload.
Sometimes you need more advanced manipulation.
For this, SmallRye Reactive Messaging lets you process the stream of Messages or the stream of payloads directly:
@Incoming("in")
@Outgoing("out")
public Multi<Message<String>> processMessageStream(Multi<Message<Integer>> stream) {
return
stream
.onItem().transformToUni(message ->
invokeService(message.getPayload())
.onFailure().recoverWithItem("fallback")
.onItem().transform(message::withPayload)
)
.concatenate();
}
@Incoming("in")
@Outgoing("out")
public Multi<String> processPayloadStream(Multi<Integer> stream) {
return
stream
.onItem().transformToUni(payload ->
invokeService(payload)
.onFailure().recoverWithItem("fallback")
)
.concatenate();
}
You can receive either a (Reactive Streams) Publisher, a PublisherBuilder or a (Mutiny) Multi.
You can return any sub-class of Publisher or a Publisher directly.
These signatures do not support metadata propagation.
In the case of a stream of …
Skipping messages
Sometimes you receive a message and don’t want to produce an output message. To handle this, you have several choices:
- for methods processing a single message or payload, returning null produces an ignored message (not forwarded)
- for methods processing streams, you can generate an empty stream.
The next snippet illustrates the first approach:
// Skip when processing payload synchronously - returning `null`
@Incoming("in")
@Outgoing("out")
public String processPayload(String s) {
if (s.equalsIgnoreCase("skip")) {
return null;
}
return s.toUpperCase();
}
// Skip when processing message synchronously - returning `null`
@Incoming("in")
@Outgoing("out")
public Message<String> processMessage(Message<String> m) {
String s = m.getPayload();
if (s.equalsIgnoreCase("skip")) {
m.ack();
return null;
}
return m.withPayload(s.toUpperCase());
}
// Skip when processing payload asynchronously - returning a `Uni` with a `null` value
@Incoming("in")
@Outgoing("out")
public Uni<String> processPayloadAsync(String s) {
if (s.equalsIgnoreCase("skip")) {
// Important, you must not return `null`, but a `null` content
return Uni.createFrom().nullItem();
}
return Uni.createFrom().item(s.toUpperCase());
}
// Skip when processing message asynchronously - returning a `Uni` with a `null` value
@Incoming("in")
@Outgoing("out")
public Uni<Message<String>> processMessageAsync(Message<String> m) {
String s = m.getPayload();
if (s.equalsIgnoreCase("skip")) {
m.ack();
return Uni.createFrom().nullItem();
}
return Uni.createFrom().item(m.withPayload(s.toUpperCase()));
}
The second approach consists of emitting an empty Multi (or Publisher):
@Incoming("in")
@Outgoing("out-1")
public Multi<String> processPayload(String s) {
if (s.equalsIgnoreCase("skip")) {
return Multi.createFrom().empty();
}
return Multi.createFrom().item(s.toUpperCase());
}
@Incoming("in")
@Outgoing("out-2")
public Multi<Message<String>> processMessage(Message<String> m) {
String s = m.getPayload();
if (s.equalsIgnoreCase("skip")) {
return Multi.createFrom().empty();
}
return Multi.createFrom().item(m.withPayload(s.toUpperCase()));
}
@Incoming("in")
@Outgoing("out-3")
public Multi<String> processPayloadStream(Multi<String> stream) {
return stream
.select().where(s -> !s.equalsIgnoreCase("skip"))
.onItem().transform(String::toUpperCase);
}
@Incoming("in")
@Outgoing("out-4")
public Multi<Message<String>> processMessageStream(Multi<Message<String>> stream) {
return stream
.select().where(m -> !m.getPayload().equalsIgnoreCase("skip"))
.onItem().transform(m -> m.withPayload(m.getPayload().toUpperCase()));
}
Message Converters
SmallRye Reactive Messaging supports message converters, which let you transform an incoming message into a version accepted by the method. If the incoming message or payload does not match the invoked method’s expectation, SmallRye Reactive Messaging looks for a suitable converter. If one is found, it converts the incoming message with this converter.
Converters can have multiple purposes, but the main use case is about transforming the message’s payload:
@ApplicationScoped
public class MyConverter implements MessageConverter {
@Override
public boolean canConvert(Message<?> in, Type target) {
// Checks whether this converter can be used to convert the incoming message into a message
// containing a payload of the type `target`.
return in.getPayload().getClass().equals(String.class) && target.equals(Person.class);
}
@Override
public Message<?> convert(Message<?> in, Type target) {
// Convert the incoming message into the new message.
// It's important to build the new message **from** the received one.
return in.withPayload(new Person((String) in.getPayload()));
}
}
To provide a converter, implement a bean exposing the MessageConverter interface.
The canConvert method is called during the lookup and verifies if it can handle the conversion.
The target type is the expected payload type.
If the converter returns true to canConvert, SmallRye Reactive Messaging calls the convert method to proceed to the conversion.
The previous converter can be used in an application like the following, to convert Message<String> to Message<Person>:
@Outgoing("persons")
public Multi<String> source() {
return Multi.createFrom().items("Neo", "Morpheus", "Trinity");
}
// The messages need to be converted as they are emitted as Message<String>
// and consumed as Message<Person>
@Incoming("persons")
public void consume(Person p) {
// ...
}
Converters work for all supported method signatures. However, the signature must be well-formed to allow the extraction of the expected payload type. Wildcards and raw types do not support conversion. If the expected payload type cannot be extracted, or no converter fits, the message is passed as received.
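The lookup rules above (convert only when needed, use a converter that accepts the conversion, otherwise pass the value as received) can be sketched in plain Java. This is an illustration under simplifying assumptions, not the framework's lookup code: ConverterLookupSketch, PayloadConverter and convertIfNeeded are hypothetical names, the converters work on bare payloads instead of Message instances, and priorities are ignored (the list is simply scanned in order).

```java
import java.lang.reflect.Type;
import java.util.List;

// Hypothetical illustration class, not part of Reactive Messaging.
class ConverterLookupSketch {

    // Simplified stand-in for a converter, working on bare payloads.
    interface PayloadConverter {
        boolean canConvert(Object payload, Type target);

        Object convert(Object payload, Type target);
    }

    // Example converter: parses a String payload into an Integer.
    static final PayloadConverter STRING_TO_INTEGER = new PayloadConverter() {
        @Override
        public boolean canConvert(Object payload, Type target) {
            return payload instanceof String && target.equals(Integer.class);
        }

        @Override
        public Object convert(Object payload, Type target) {
            return Integer.valueOf((String) payload);
        }
    };

    static Object convertIfNeeded(Object payload, Class<?> target,
                                  List<PayloadConverter> converters) {
        if (target.isInstance(payload)) {
            return payload; // already matches the expected type
        }
        for (PayloadConverter converter : converters) {
            if (converter.canConvert(payload, target)) {
                return converter.convert(payload, target);
            }
        }
        return payload; // no converter fits: passed as received
    }
}
```

With only STRING_TO_INTEGER registered, a String payload headed for an Integer parameter is parsed, while a String payload headed for a Long parameter is handed over unchanged.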
If multiple suitable converters are present, implementations should override the getPriority method to return the priority.
The default priority is 100.
The converter lookup invokes converters with higher priority first.