Skip to content

How can I create a Multi from a non-reactive source?#

The UnicastProcessor is an implementation of Multi that lets you enqueue items in a queue.

The items are then dispatched to the subscriber using the request protocol. While this pattern is against the idea of back-pressure, it lets you connect sources of data that do not support back-pressure with your subscriber.

In the following example, the UnicastProcessor is used by a thread emitting items. These items are enqueued in the processor and replayed when the subscriber is connected, following the request protocol.

UnicastProcessor<String> processor = UnicastProcessor.create();
Multi<String> multi = processor

// Create a source of items that does not follow the request protocol
new Thread(() -> {
    for (int i = 0; i < 1000; i++) {

By default, the UnicastProcessor uses an unbounded queue. You can also pass a fixed size queue that would reject the items once full.