Sending / Receiving messages from non-reactive code
It is not rare to combine in a single application imperative parts (Jax-RS, regular CDI beans) and reactive parts (beans with @Incoming
and @Outgoing
annotations).
In these case, it’s often required to send messages from the imperative part to the reactive part.
In other words, send messages to channels handled by reactive messaging and how can you retrieve messages.
Emitter and @Channel
To send things (payload or Message
) from imperative code to a specific channel you need to use:
-
the
org.eclipse.microprofile.reactive.messaging.Channel
annotations -
the
org.eclipse.microprofile.reactive.messaging.Emitter
type
The @Channel
lets you indicate to which channel you are going to send your payloads or messages.
The Emitter
is the object to use to send these payloads or messages.
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
@ApplicationScoped
public class MyImperativeBean {
@Inject @Channel("prices") Emitter<Double> emitter;
// ...
public void send(double d) {
emitter.send(d);
}
}
The Emitter
class takes a type parameter. It’s the type of payload.
Even if you want to send Messages
, the type is the payload type.
You must have a @Incoming("prices") somewhere in your application (meaning a method consuming messages transiting on the channel prices ), or an outbound connector configured to manage the prices channel (mp.messaging.outgoing.prices… )
|
Sending payloads
Sending payloads is done as follows:
@Inject @Channel("prices") Emitter<Double> emitterForPrices;
public void send(double d) {
emitterForPrices.send(d);
}
When sending a payload, the emitter returns a CompletionStage
.
This CompletionStage
gets completed once the message created from the payload is acknowledged:
public void sendAndAwaitAcknowledgement(double d) {
CompletionStage<Void> acked = emitterForPrices.send(d);
// sending a payload returns a CompletionStage completed
// when the message is acknowledged
acked.toCompletableFuture().join();
}
If the processing fails, the CompletionStage
gets completed exceptionally (with the reason of the nack).
Sending messages
You can also send Messages
:
public void sendAsMessage(double d) {
emitterForPrices.send(Message.of(d));
}
When sending a Message
, the emitter does not return a CompletionStage
, but you can pass the ack/nack callback, and be called when the message is acked/nacked.
public void sendAsMessageWithAck(double d) {
emitterForPrices.send(Message.of(d, () -> {
// Called when the message is acknowledged.
return CompletableFuture.completedFuture(null);
},
reason -> {
// Called when the message is acknowledged negatively.
return CompletableFuture.completedFuture(null);
}));
}
Sending messages also let you pass metadata.
public void sendAsMessageWithAckAndMetadata(double d) {
MyMetadata metadata = new MyMetadata();
emitterForPrices.send(Message.of(d, Metadata.of(metadata),
() -> {
// Called when the message is acknowledged.
return CompletableFuture.completedFuture(null);
},
reason -> {
// Called when the message is acknowledged negatively.
return CompletableFuture.completedFuture(null);
}));
}
Metadata can be used to propagate some context objects with the message.
Overflow management
When sending messages from imperative code to reactive code, you must be aware of back-pressure. Indeed, messages sent using the emitter and stored in a queue. If the consumer does not process the messages quickly enough, this queue can become a memory hog and you may even run out of memory.
To control what need to happen when the queue becomes out of control, use the org.eclipse.microprofile.reactive.messaging.OnOverflow
annotation.
@OnOverflow
lets you configure:
-
the maximum size of the queue (default is 256)
-
what need to happen when this size is reached (fail, drop…)
// Set the max size to 10 and fail if reached
@OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 10)
@Inject @Channel("channel") Emitter<String> emitterWithBuffer;
// [DANGER ZONE] no limit
@OnOverflow(OnOverflow.Strategy.UNBOUNDED_BUFFER)
@Inject @Channel("channel") Emitter<String> danger;
// Drop the new messages if the size is reached
@OnOverflow(OnOverflow.Strategy.DROP)
@Inject @Channel("channel") Emitter<String> dropping;
// Drop the previously sent messages if the size is reached
@OnOverflow(OnOverflow.Strategy.LATEST)
@Inject @Channel("channel") Emitter<String> dropOldMessages;
The supported strategies are:
-
OnOverflow.Strategy.BUFFER
- use a buffer to store the elements until they are consumed. If the buffer is full, a failure is propagated (and the thread using the emitted gets an exception) -
OnOverflow.Strategy.UNBOUNDED_BUFFER
- use an unbounded buffer to store the elements -
OnOverflow.Strategy.DROP
- drops the most recent value if the downstream can’t keep up. It means that new value emitted by the emitter are ignored. -
OnOverflow.Strategy.FAIL
- propagates a failure in case the downstream can’t keep up. -
OnOverflow.Strategy.LATEST
- keeps only the latest value, dropping any previous value if the downstream can’t keep up. -
OnOverflow.Strategy.NONE
- ignore the back-pressure signals letting the downstream consumer to implement a strategy.
Retrieving channels
You can use the @Channel
annotation to inject in your bean the underlying stream.
Note that in this case, you will be responsible for the subscription:
@Inject @Channel("my-channel") Multi<String> streamOfPayloads;
@Inject @Channel("my-channel") Multi<Message<String>> streamOfMessages;
@Inject @Channel("my-channel") Publisher<String> publisherOfPayloads;
@Inject @Channel("my-channel") Publisher<Message<String>> publisherOfMessages;
You must have a @Outgoing("my-channel") somewhere in your application (meaning a method generating messages transiting on the channel my-channel ), or an inbound connector configured to manage the prices channel (mp.messaging.incoming.prices… )
|
Injected channels merge all the matching outgoing - so if you have multiple @Outgoing("out")
, @Inject @Channel("out")
gets all the messages.
If your injected channel receives payloads (Multi<T>
), it acknowledges the message automatically, and support multiple subscribers.
If you injected channel receives Message
(Multi<Message<T>>
), you will be responsible for the acknowledgement and broadcasting.
Emitter and @Broadcast
When using an Emitter
, you can now @Broadcast
what is emitted to all subscribers.
Here is an example of emitting a price with two methods marked @Incoming
to receive the broadcast:
@Inject
@Broadcast
@Channel("prices") Emitter<Double> emitter;
public void emit(double d) {
emitter.send(d);
}
@Incoming("prices")
public void handle(double d) {
// Handle the new price
}
@Incoming("prices")
public void audit(double d) {
// Audit the price change
}
For more details see @Broadcast documentation.
Mutiny Emitter
If you prefer to utilize Uni
in all your code,
there is now a MutinyEmitter
that will return Uni<Void>
instead of void
.
@Inject
@Channel("prices")
MutinyEmitter<Double> emitter;
public Uni<Void> send(double d) {
return emitter.send(d);
}
There’s also the ability to block on sending the event to the emitter. It will only return from the method when the event is acknowledged, or nacked, by the receiver:
public void sendAwait(double d) {
emitter.sendAndAwait(d);
}
And if you don’t need to worry about the success or failure of sending an event,
you can sendAndForget
:
public Cancellable sendForget(double d) {
return emitter.sendAndForget(d);
}