
How can I create a Multi from a non-reactive source?

UnicastProcessor is an implementation of Multi that buffers the items you push to it in a queue.

The items are then dispatched to the subscriber according to the request protocol, that is, only as the subscriber requests them. Although this pattern goes against the idea of back-pressure (the source itself is never slowed down), it lets you connect data sources that do not support back-pressure to your subscriber.
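To make "dispatched according to the request protocol" concrete, here is a minimal sketch built only on the JDK's java.util.concurrent.Flow interfaces. It is not Mutiny's implementation: QueueBackedPublisher, push and CollectingSubscriber are hypothetical names chosen for illustration, and completion, failure handling and validation of request(n) arguments are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified illustration; NOT Mutiny's UnicastProcessor.
class QueueBackedPublisher<T> implements Flow.Publisher<T> {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicLong requested = new AtomicLong();  // outstanding demand
    private final AtomicInteger wip = new AtomicInteger();  // serializes drain()
    private volatile Flow.Subscriber<? super T> subscriber;

    // Called by the non-reactive source: it never checks the demand.
    void push(T item) {
        queue.offer(item);
        drain();
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> s) {
        subscriber = s; // single subscriber only (unicast)
        s.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {
                // Add the demand, capping at Long.MAX_VALUE on overflow
                requested.accumulateAndGet(n, (a, b) -> a + b < 0 ? Long.MAX_VALUE : a + b);
                drain();
            }

            @Override
            public void cancel() {
                subscriber = null;
                queue.clear();
            }
        });
    }

    // Emits queued items while there is demand; only one caller drains at a time.
    private void drain() {
        if (wip.getAndIncrement() != 0) {
            return; // another call is draining and will observe our change
        }
        int missed = 1;
        do {
            Flow.Subscriber<? super T> s = subscriber;
            if (s != null) {
                while (requested.get() > 0) {
                    T item = queue.poll();
                    if (item == null) {
                        break; // nothing buffered yet
                    }
                    requested.decrementAndGet();
                    s.onNext(item);
                }
            }
            missed = wip.addAndGet(-missed);
        } while (missed != 0);
    }

    public static void main(String[] args) {
        QueueBackedPublisher<String> publisher = new QueueBackedPublisher<>();
        for (int i = 0; i < 5; i++) {
            publisher.push(Integer.toString(i)); // buffered: no subscriber yet
        }
        CollectingSubscriber<String> subscriber = new CollectingSubscriber<>();
        publisher.subscribe(subscriber);
        subscriber.subscription.request(2);
        System.out.println(subscriber.received); // [0, 1]
        subscriber.subscription.request(3);
        System.out.println(subscriber.received); // [0, 1, 2, 3, 4]
    }
}

// Hypothetical subscriber that records what it receives and requests nothing on its own.
class CollectingSubscriber<T> implements Flow.Subscriber<T> {
    final List<T> received = new ArrayList<>();
    Flow.Subscription subscription;

    @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
    @Override public void onNext(T item) { received.add(item); }
    @Override public void onError(Throwable t) { }
    @Override public void onComplete() { }
}
```

The producer side (push) is never slowed down, which is why the buffer can grow; only the delivery side honors the subscriber's demand.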

In the following example, a thread pushes items into a UnicastProcessor. The items are enqueued in the processor and replayed once the subscriber connects, following the request protocol.

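import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.operators.multi.processors.UnicastProcessor;

UnicastProcessor<String> processor = UnicastProcessor.create();
Multi<String> multi = processor
        .onItem().transform(String::toUpperCase)
        .onFailure().recoverWithItem("d'oh");

// Create a source of items that does not follow the request protocol
new Thread(() -> {
    for (int i = 0; i < 1000; i++) {
        // Pushed regardless of demand; the processor buffers the item
        processor.onNext(Integer.toString(i));
    }
    // Signal that no more items will be pushed
    processor.onComplete();
}).start();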

By default, the UnicastProcessor uses an unbounded queue. You can also pass a fixed-size queue, which rejects items once it is full.
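As a rough illustration of what "rejects items once it is full" means at the queue level, the sketch below uses only the JDK's ArrayBlockingQueue. BoundedQueueDemo and countRejected are hypothetical names. The sketch does not call Mutiny: it assumes that such a queue would be handed to the processor through an overload such as UnicastProcessor.create(Queue, Runnable), and how the processor reports a rejected item to the subscriber should be checked in the Mutiny Javadoc.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical demo of fixed-size queue behavior; it does not involve Mutiny.
class BoundedQueueDemo {

    // Offers `count` items to a queue of the given capacity that nobody drains,
    // and returns how many offers were rejected.
    static int countRejected(int capacity, int count) {
        Queue<String> queue = new ArrayBlockingQueue<>(capacity);
        int rejected = 0;
        for (int i = 0; i < count; i++) {
            // offer() returns false, without blocking or throwing, when the queue is full
            if (!queue.offer(Integer.toString(i))) {
                rejected++;
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        // Same volume as the emitting thread above, with room for only 10 items
        System.out.println(countRejected(10, 1000)); // 990
    }
}
```

With a bounded queue, memory use is capped at the cost of losing items whenever the producer outpaces the subscriber, so the capacity should reflect the largest burst you expect.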