How to change the emission thread?

Unless indicated otherwise, Mutiny invokes the next stage on the thread that emits the event from upstream. In the following code, the transform stage therefore runs on the thread that emits the item.

Uni<String> uni = Uni.createFrom().<String>emitter(emitter ->
        new Thread(() ->
                emitter.complete("hello from "
                        + Thread.currentThread().getName())
        ).start()
)
        .onItem().transform(item -> {
            // Called on the emission thread.
            return item.toUpperCase();
        });

You can switch to another thread using the emitOn operator. It changes the thread used to dispatch events from upstream to downstream: items, failures, and completion events. Just pass the executor you want to use.

String res0 = uni.emitOn(executor)
        .onItem()
        .invoke(s -> System.out.println("Received item `" + s + "` on thread: "
                + Thread.currentThread().getName()))
        .await().indefinitely();

String res1 = multi.emitOn(executor)
        .onItem()
        .invoke(s -> System.out.println("Received item `" + s + "` on thread: "
                + Thread.currentThread().getName()))
        .collect().first()
        .await().indefinitely();

Note

You cannot pass a specific thread, but you can implement a simple Executor that dispatches on that thread, or use a single-threaded executor.
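
The first option in the note could look like the minimal sketch below. It uses only the JDK, and the class name DedicatedThreadExecutor is hypothetical (it is not part of Mutiny). Tasks are queued and run one by one on a single named worker thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper, not part of Mutiny: runs every submitted task
// on one dedicated, named thread.
class DedicatedThreadExecutor implements Executor {

    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
    private final Thread worker;

    DedicatedThreadExecutor(String threadName) {
        worker = new Thread(this::drain, threadName);
        // Daemon thread, so it does not prevent the JVM from exiting.
        worker.setDaemon(true);
        worker.start();
    }

    // Worker loop: take the next task and run it, until interrupted.
    private void drain() {
        try {
            while (true) {
                Runnable task = tasks.take();
                try {
                    task.run();
                } catch (RuntimeException e) {
                    // Keep the worker alive if a single task fails.
                    e.printStackTrace();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void execute(Runnable task) {
        // Hand the task over to the dedicated thread.
        tasks.add(task);
    }
}
```

An instance can then be passed to emitOn, for example `uni.emitOn(new DedicatedThreadExecutor("my-dispatch-thread"))`. For the second option, `Executors.newSingleThreadExecutor()` from the JDK is usually enough.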

Warning

Be careful: this operator can lead to concurrency problems with non-thread-safe objects such as CDI request-scoped beans. It might also break Reactive Streams semantics, with items being emitted concurrently.