Skip to content

Pausable Channels

Based on reactive streams, Smallrye Reactive Messaging ensures that channels are back-pressured. This means that the flow of messages is controlled by the downstream consumer, whether it is a processing method or outgoing channel. Sometimes you may want to pause the flow of messages, for example, when the consumer is not ready to process them.

Injected @Channel streams are not subscribed to by default, so the flow of messages is controlled by the application. But for @Incoming methods, the flow of messages is controlled by the runtime.

Pausable channels are useful when you want to control the flow of messages within your application.

Creating a Pausable Channel

To use pausable channels, you need to activate it with the configuration property pausable set to true.

1
2
3
mp.messaging.incoming.my-channel.pausable=true
# optional, by default the channel is NOT paused initially
mp.messaging.outgoing.my-channel.initially-paused=true

Controlling the flow of messages

If a channel is configured to be pausable, you can get the PausableChannel by channel name from the ChannelRegistry programmatically, and pause or resume the channel as needed:

package pausable;

import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.smallrye.reactive.messaging.ChannelRegistry;
import io.smallrye.reactive.messaging.PausableChannel;

@ApplicationScoped
public class PausableController {

    @Inject
    ChannelRegistry registry;

    @PostConstruct
    public void resume() {
        // Wait for the application to be ready
        // Retrieve the pausable channel
        PausableChannel pausable = registry.getPausable("my-channel");
        // Pause the processing of the messages
        pausable.resume();
    }

    public void pause() {
        // Retrieve the pausable channel
        PausableChannel pausable = registry.getPausable("my-channel");
        // Pause the processing of the messages
        pausable.pause();
    }

    @Incoming("my-channel")
    void process(String message) {
        // Process the message
    }

}

Warning

Pausable channels only work with back-pressure aware subscribers, with bounded downstream requests.