Skip to content

Emitter and Channels

It is not rare to combine in a single application imperative parts (Jax-RS, regular CDI beans) and reactive parts (beans with @Incoming and @Outgoing annotations). In these case, it’s often required to send messages from the imperative part to the reactive part. In other words, send messages to channels handled by reactive messaging and how can you retrieve messages.

Emitter and @Channel

To send things (payload or Message) from imperative code to a specific channel you need to use:

  1. the org.eclipse.microprofile.reactive.messaging.Channel annotations
  2. the org.eclipse.microprofile.reactive.messaging.Emitter type

The @Channel lets you indicate to which channel you are going to send your payloads or messages. The Emitter is the object to use to send these payloads or messages.

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

@ApplicationScoped
public class MyImperativeBean {

    @Inject
    @Channel("prices")
    Emitter<Double> emitter;

    // ...

    public void send(double d) {
        emitter.send(d);
    }
}

The Emitter class takes a type parameter. It’s the type of payload. Even if you want to send Messages, the type is the payload type.

Important

You must have a @Incoming("prices") somewhere in your application (meaning a method consuming messages transiting on the channel prices), or an outbound connector configured to manage the prices channel (mp.messaging.outgoing.prices...)

Sending payloads

Sending payloads is done as follows:

1
2
3
4
5
6
7
@Inject
@Channel("prices")
Emitter<Double> emitterForPrices;

public void send(double d) {
    emitterForPrices.send(d);
}

When sending a payload, the emitter returns a CompletionStage. This CompletionStage gets completed once the message created from the payload is acknowledged:

1
2
3
4
5
6
public void sendAndAwaitAcknowledgement(double d) {
    CompletionStage<Void> acked = emitterForPrices.send(d);
    // sending a payload returns a CompletionStage completed
    // when the message is acknowledged
    acked.toCompletableFuture().join();
}

If the processing fails, the CompletionStage gets completed exceptionally (with the reason of the nack).

Sending messages

You can also send Messages:

1
2
3
public void sendAsMessage(double d) {
    emitterForPrices.send(Message.of(d));
}

When sending a Message, the emitter does not return a CompletionStage, but you can pass the ack/nack callback, and be called when the message is acked/nacked.

public void sendAsMessageWithAck(double d) {
    emitterForPrices.send(Message.of(d, () -> {
        // Called when the message is acknowledged.
        return CompletableFuture.completedFuture(null);
    },
            reason -> {
                // Called when the message is acknowledged negatively.
                return CompletableFuture.completedFuture(null);
            }));
}

Sending messages also let you pass metadata.

public void sendAsMessageWithAckAndMetadata(double d) {
    MyMetadata metadata = new MyMetadata();
    emitterForPrices.send(Message.of(d, Metadata.of(metadata),
            () -> {
                // Called when the message is acknowledged.
                return CompletableFuture.completedFuture(null);
            },
            reason -> {
                // Called when the message is acknowledged negatively.
                return CompletableFuture.completedFuture(null);
            }));
}

Metadata can be used to propagate some context objects with the message.

Overflow management

When sending messages from imperative code to reactive code, you must be aware of back-pressure. Indeed, messages sent using the emitter and stored in a queue. If the consumer does not process the messages quickly enough, this queue can become a memory hog and you may even run out of memory.

To control what need to happen when the queue becomes out of control, use the OnOverflow annotation. @OnOverflow lets you configure:

  • the maximum size of the queue (default is 256)
  • what needs to happen when this size is reached (fail, drop...)
// Set the max size to 10 and fail if reached
@OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 10)
@Inject
@Channel("channel")
Emitter<String> emitterWithBuffer;

// [DANGER ZONE] no limit
@OnOverflow(OnOverflow.Strategy.UNBOUNDED_BUFFER)
@Inject
@Channel("channel")
Emitter<String> danger;

// Drop the new messages if the size is reached
@OnOverflow(OnOverflow.Strategy.DROP)
@Inject
@Channel("channel")
Emitter<String> dropping;

// Drop the previously sent messages if the size is reached
@OnOverflow(OnOverflow.Strategy.LATEST)
@Inject
@Channel("channel")
Emitter<String> dropOldMessages;

The supported strategies are:

  • OnOverflow.Strategy.BUFFER - use a buffer to store the elements until they are consumed. If the buffer is full, a failure is propagated (and the thread using the emitted gets an exception)

  • OnOverflow.Strategy.UNBOUNDED_BUFFER - use an unbounded buffer to store the elements

  • OnOverflow.Strategy.DROP - drops the most recent value if the downstream can’t keep up. It means that new value emitted by the emitter are ignored.

  • OnOverflow.Strategy.FAIL - propagates a failure in case the downstream can’t keep up.

  • OnOverflow.Strategy.LATEST - keeps only the latest value, dropping any previous value if the downstream can’t keep up.

  • OnOverflow.Strategy.NONE - ignore the back-pressure signals letting the downstream consumer to implement a strategy.

Defensive emission

Having an emitter injected into your code does not guarantee that someone is ready to consume the message. For example, a subscriber may be connecting to a remote broker. If there are no subscribers, using the send method will throw an exception.

The emitter.hasRequests() method indicates that a subscriber subscribes to the channel and requested items. So, you can wrap your emission with:

1
2
3
if (emitter.hasRequests()) {
    emitter.send("hello");
}

If you use the OnOverflow.Strategy.DROP, you can use the send method even with no subscribers nor demands. The message will be nacked immediately.

Retrieving channels

You can use the @Channel annotation to inject in your bean the underlying stream. Note that in this case, you will be responsible for the subscription:

@Inject
@Channel("my-channel")
Multi<String> streamOfPayloads;

@Inject
@Channel("my-channel")
Multi<Message<String>> streamOfMessages;

@Inject
@Channel("my-channel")
Publisher<String> publisherOfPayloads;

@Inject
@Channel("my-channel")
Publisher<Message<String>> publisherOfMessages;

Important

You must have a @Outgoing("my-channel") somewhere in your application (meaning a method generating messages transiting on the channel my-channel), or an inbound connector configured to manage the prices channel (mp.messaging.incoming.prices...)

Injected channels merge all the matching outgoing - so if you have multiple @Outgoing("out"), @Inject @Channel("out") gets all the messages.

If your injected channel receives payloads (Multi<T>), it acknowledges the message automatically, and support multiple subscribers. If you injected channel receives Message (Multi<Message<T>>), you will be responsible for the acknowledgement and broadcasting.

Emitter and @Broadcast

When using an Emitter, you can now @Broadcast what is emitted to all subscribers.

Here is an example of emitting a price with two methods marked @Incoming to receive the broadcast:

@Inject
@Broadcast
@Channel("prices")
Emitter<Double> emitter;

public void emit(double d) {
    emitter.send(d);
}

@Incoming("prices")
public void handle(double d) {
    // Handle the new price
}

@Incoming("prices")
public void audit(double d) {
    // Audit the price change
}

For more details see @Broadcast documentation.

Mutiny Emitter

If you prefer to utilize Uni in all your code, there is now a MutinyEmitter that will return Uni<Void> instead of void.

1
2
3
4
5
6
7
@Inject
@Channel("prices")
MutinyEmitter<Double> emitter;

public Uni<Void> send(double d) {
    return emitter.send(d);
}

There’s also the ability to block on sending the event to the emitter. It will only return from the method when the event is acknowledged, or nacked, by the receiver:

1
2
3
public void sendAwait(double d) {
    emitter.sendAndAwait(d);
}

And if you don’t need to worry about the success or failure of sending an event, you can sendAndForget:

1
2
3
public Cancellable sendForget(double d) {
    return emitter.sendAndForget(d);
}

Custom Emitter Implementations

Experimental

Custom emitter implementations is an experimental feature.

Emitter and MutinyEmitter are two implementations of the emitter concept, where imperative code in your application can send messages to Reactive Messaging channels.

With EmitterFactory it is possible to provide custom implementations, and application facing emitter interfaces.

In the following example, the injectable custom emitter interface is CustomEmitter, and it is implemented by CustomEmitterImpl:

public interface CustomEmitter<T> extends EmitterType {

    <M extends Message<? extends T>> void sendAndForget(M msg);

}

public static class CustomEmitterImpl<T> implements CustomEmitter<T>, MessagePublisherProvider<Object> {

    Publisher<Message<?>> publisher;

    public CustomEmitterImpl(EmitterConfiguration configuration, long defaultBufferSize) {
        //... initialize emitter with configuration
    }

    @Override
    public Publisher<Message<?>> getPublisher() {
        return publisher;
    }

    @Override
    public <M extends Message<? extends T>> void sendAndForget(M msg) {
        //... send to stream
    }
}

Note that CustomEmitter interface extends EmitterType, which is a marker interface for discovering custom emitter types. Also, CustomEmitterImpl implements the MessagePublisherProvider, which is used by the framework to transform this emitter to a channel.

Then we need to provide an implementation of the EmitterFactory interface:

@EmitterFactoryFor(CustomEmitter.class)
@ApplicationScoped
public static class CustomEmitterFactory implements EmitterFactory<CustomEmitterImpl<Object>> {

    @Inject
    ChannelRegistry channelRegistry;

    @Override
    public CustomEmitterImpl<Object> createEmitter(EmitterConfiguration configuration, long defaultBufferSize) {
        return new CustomEmitterImpl<>(configuration, defaultBufferSize);
    }

    @Produces
    @Channel("") // Stream name is ignored during type-safe resolution
    <T> CustomEmitter<T> produce(InjectionPoint injectionPoint) {
        String channelName = ChannelProducer.getChannelName(injectionPoint);
        return channelRegistry.getEmitter(channelName, CustomEmitter.class);
    }
}

The CustomEmitterFactory is a CDI managed bean, which implements the EmitterFactory. It is qualified with EmitterFactoryFor annotation which is configured with the emitter interface CustomEmitter that this factory provides.

Smallrye Reactive Messaging discovers the emitter factory during the CDI deployment validation and verifies that custom emitters used by the application have corresponding emitter factories. It'll use the emitter factory to create the emitter implementation and will register the implementation into the ChannelRegistry.

Note that the CustomEmitterFactory also uses the ChannelRegistry and provides the custom emitter with @Produces.

Finally, the application can inject and use the CustomEmitter as a normal emitter channel:

@Inject
@Channel("custom-emitter-channel")
CustomEmitter<String> customEmitter;

//...

public void emitMessage() {
    customEmitter.sendAndForget(Message.of("a"));
    customEmitter.sendAndForget(Message.of("b"));
    customEmitter.sendAndForget(Message.of("c"));
}