Sending / Receiving messages from non-reactive code

It is not rare to combine in a single application imperative parts (Jax-RS, regular CDI beans) and reactive parts (beans with @Incoming and @Outgoing annotations). In these case, it’s often required to send messages from the imperative part to the reactive part. In other words, send messages to channels handled by reactive messaging and how can you retrieve messages.

Emitter and @Channel

To send things (payload or Message) from imperative code to a specific channel you need to use:

The @Channel lets you indicate to which channel you are going to send your payloads or messages. The Emitter is the object to use to send these payloads or messages.

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

@ApplicationScoped
public class MyImperativeBean {

    @Inject @Channel("prices") Emitter<Double> emitter;

    // ...

    public void send(double d) {
        emitter.send(d);
    }
}

The Emitter class takes a type parameter. It’s the type of payload. Even if you want to send Messages, the type is the payload type.

You must have a @Incoming("prices") somewhere in your application (meaning a method consuming messages transiting on the channel prices), or an outbound connector configured to manage the prices channel (mp.messaging.outgoing.prices…​)

Sending payloads

Sending payloads is done as follows:

@Inject @Channel("prices") Emitter<Double> emitterForPrices;

public void send(double d) {
    emitterForPrices.send(d);
}

When sending a payload, the emitter returns a CompletionStage. This CompletionStage gets completed once the message created from the payload is acknowledged:

public void sendAndAwaitAcknowledgement(double d) {
    CompletionStage<Void> acked = emitterForPrices.send(d);
    // sending a payload returns a CompletionStage completed
    // when the message is acknowledged
    acked.toCompletableFuture().join();
}

If the processing fails, the CompletionStage gets completed exceptionally (with the reason of the nack).

Sending messages

You can also send Messages:

public void sendAsMessage(double d) {
    emitterForPrices.send(Message.of(d));
}

When sending a Message, the emitter does not return a CompletionStage, but you can pass the ack/nack callback, and be called when the message is acked/nacked.

public void sendAsMessageWithAck(double d) {
    emitterForPrices.send(Message.of(d, () -> {
            // Called when the message is acknowledged.
            return CompletableFuture.completedFuture(null);
        },
        reason -> {
            // Called when the message is acknowledged negatively.
            return CompletableFuture.completedFuture(null);
        }));
}

Sending messages also let you pass metadata.

public void sendAsMessageWithAckAndMetadata(double d) {
    MyMetadata metadata = new MyMetadata();
    emitterForPrices.send(Message.of(d, Metadata.of(metadata),
        () -> {
            // Called when the message is acknowledged.
            return CompletableFuture.completedFuture(null);
        },
        reason -> {
            // Called when the message is acknowledged negatively.
            return CompletableFuture.completedFuture(null);
        }));
}

Metadata can be used to propagate some context objects with the message.

Overflow management

When sending messages from imperative code to reactive code, you must be aware of back-pressure. Indeed, messages sent using the emitter and stored in a queue. If the consumer does not process the messages quickly enough, this queue can become a memory hog and you may even run out of memory.

To control what need to happen when the queue becomes out of control, use the org.eclipse.microprofile.reactive.messaging.OnOverflow annotation. @OnOverflow lets you configure:

  • the maximum size of the queue (default is 256)

  • what need to happen when this size is reached (fail, drop…​)

// Set the max size to 10 and fail if reached
@OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 10)
@Inject @Channel("channel") Emitter<String> emitterWithBuffer;

// [DANGER ZONE] no limit
@OnOverflow(OnOverflow.Strategy.UNBOUNDED_BUFFER)
@Inject @Channel("channel") Emitter<String> danger;

// Drop the new messages if the size is reached
@OnOverflow(OnOverflow.Strategy.DROP)
@Inject @Channel("channel") Emitter<String> dropping;

// Drop the previously sent messages if the size is reached
@OnOverflow(OnOverflow.Strategy.LATEST)
@Inject @Channel("channel") Emitter<String> dropOldMessages;

The supported strategies are:

  • OnOverflow.Strategy.BUFFER - use a buffer to store the elements until they are consumed. If the buffer is full, a failure is propagated (and the thread using the emitted gets an exception)

  • OnOverflow.Strategy.UNBOUNDED_BUFFER - use an unbounded buffer to store the elements

  • OnOverflow.Strategy.DROP - drops the most recent value if the downstream can’t keep up. It means that new value emitted by the emitter are ignored.

  • OnOverflow.Strategy.FAIL - propagates a failure in case the downstream can’t keep up.

  • OnOverflow.Strategy.LATEST - keeps only the latest value, dropping any previous value if the downstream can’t keep up.

  • OnOverflow.Strategy.NONE - ignore the back-pressure signals letting the downstream consumer to implement a strategy.

Retrieving channels

You can use the @Channel annotation to inject in your bean the underlying stream. Note that in this case, you will be responsible for the subscription:

@Inject @Channel("my-channel") Multi<String> streamOfPayloads;

@Inject @Channel("my-channel") Multi<Message<String>> streamOfMessages;

@Inject @Channel("my-channel") Publisher<String> publisherOfPayloads;

@Inject @Channel("my-channel") Publisher<Message<String>> publisherOfMessages;
You must have a @Outgoing("my-channel") somewhere in your application (meaning a method generating messages transiting on the channel my-channel), or an inbound connector configured to manage the prices channel (mp.messaging.incoming.prices…​)

Injected channels merge all the matching outgoing - so if you have multiple @Outgoing("out"), @Inject @Channel("out") gets all the messages.

If your injected channel receives payloads (Multi<T>), it acknowledges the message automatically, and support multiple subscribers. If you injected channel receives Message (Multi<Message<T>>), you will be responsible for the acknowledgement and broadcasting.

Emitter and @Broadcast

When using an Emitter, you can now @Broadcast what is emitted to all subscribers.

Here is an example of emitting a price with two methods marked @Incoming to receive the broadcast:

@Inject
@Broadcast
@Channel("prices") Emitter<Double> emitter;

public void emit(double d) {
    emitter.send(d);
}

@Incoming("prices")
public void handle(double d) {
    // Handle the new price
}

@Incoming("prices")
public void audit(double d) {
    // Audit the price change
}

For more details see @Broadcast documentation.

Mutiny Emitter

If you prefer to utilize Uni in all your code, there is now a MutinyEmitter that will return Uni<Void> instead of void.

@Inject
@Channel("prices")
MutinyEmitter<Double> emitter;

public Uni<Void> send(double d) {
    return emitter.send(d);
}

There’s also the ability to block on sending the event to the emitter. It will only return from the method when the event is acknowledged, or nacked, by the receiver:

public void sendAwait(double d) {
    emitter.sendAndAwait(d);
}

And if you don’t need to worry about the success or failure of sending an event, you can sendAndForget:

public Cancellable sendForget(double d) {
    return emitter.sendAndForget(d);
}