If you use Mutiny, there is a good chance you may want to avoid blocking the caller thread. In a pure reactive application, the application logic is executed on one of the few I/O threads, and blocking one of these would have dramatic consequences. So, here is the big question: how do you deal with blocking code?
Let’s imagine you have blocking code (e.g., connecting to a database using JDBC, reading a file from the file system…), and you want to integrate that into your reactive pipelines while avoiding blocking. You would need to isolate such blocking parts of your code and run these parts on worker threads.
Mutiny provides two operators to customize the threads used to handle events:
runSubscriptionOn: to configure the thread used to execute the code that runs at subscription time
emitOn: to configure the thread used to dispatch events downstream
Very often, the blocking call happens during the subscription.
In this case, the runSubscriptionOn operator is what you need:
Uni<String> uni = Uni.createFrom()
        .item(this::invokeRemoteServiceUsingBlockingIO)
        .runSubscriptionOn(Infrastructure.getDefaultWorkerPool());
The code above creates a Uni that supplies its item using a blocking call, here the invokeRemoteServiceUsingBlockingIO method.
To avoid blocking the subscriber thread, it uses runSubscriptionOn, which switches threads and calls invokeRemoteServiceUsingBlockingIO on another thread.
Here we pass the default worker thread pool, but you can use your own executor.
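To make the thread switch concrete, here is a minimal plain-JDK sketch of the same idea: a blocking supplier is handed to a custom executor so that it runs on a worker thread rather than on the caller thread. It is an analogy built on CompletableFuture, not Mutiny's implementation. The class name, the "my-worker" thread name, and the body of invokeRemoteServiceUsingBlockingIO are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class OffloadBlockingCall {

    // Hypothetical stand-in for the blocking call; it reports the thread it ran on.
    static String invokeRemoteServiceUsingBlockingIO() {
        try {
            Thread.sleep(50); // simulate blocking I/O
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return Thread.currentThread().getName();
    }

    // Runs the blocking call on the given executor instead of the caller thread.
    static CompletableFuture<String> callOn(ExecutorService executor) {
        return CompletableFuture.supplyAsync(
                OffloadBlockingCall::invokeRemoteServiceUsingBlockingIO, executor);
    }

    public static void main(String[] args) {
        // A custom executor with a recognizable thread name (an assumption for this demo).
        ExecutorService myWorkers =
                Executors.newFixedThreadPool(2, r -> new Thread(r, "my-worker"));
        try {
            String caller = Thread.currentThread().getName();
            // join() is used only so the demo can print the result before exiting.
            String ranOn = callOn(myWorkers).join();
            System.out.println("caller=" + caller + ", blocking call ran on=" + ranOn);
        } finally {
            myWorkers.shutdown();
        }
    }
}
```

With Mutiny, the equivalent of myWorkers is whatever executor you pass to runSubscriptionOn; the point of the sketch is only that the blocking method observes a worker thread, not the caller thread.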
What’s that default worker pool?
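In the Uni snippet above, you may wonder about Infrastructure.getDefaultWorkerPool().
It gives access to the worker pool provided by the underlying platform.
If the underlying platform does not provide a pool, a default one is used.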
runSubscriptionOn does not subscribe to the Uni.
It specifies the executor to use when a subscription happens.
While the runSubscriptionOn snippet uses Uni, you can also use runSubscriptionOn on a Multi.
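The distinction between choosing an executor and actually subscribing can be sketched with plain JDK types. In the hedged example below, describe only records which executor to use; nothing runs until the returned supplier is invoked, which plays the role of a subscription. This is an analogy, not how Mutiny is implemented, and the names LazySubscription, describe, and blockingCall are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class LazySubscription {

    // Counts how many times the (simulated) blocking call actually ran.
    static final AtomicInteger CALLS = new AtomicInteger();

    static String blockingCall() {
        CALLS.incrementAndGet();
        return "done on " + Thread.currentThread().getName();
    }

    // Describes the work and the executor to use, but starts nothing.
    // Each get() on the returned supplier plays the role of one subscription.
    static Supplier<CompletableFuture<String>> describe(Executor executor) {
        return () -> CompletableFuture.supplyAsync(LazySubscription::blockingCall, executor);
    }

    public static void main(String[] args) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Supplier<CompletableFuture<String>> pipeline = describe(worker);
            System.out.println("calls before subscribing: " + CALLS.get());
            // The "subscription" is what triggers the call on the worker.
            String result = pipeline.get().join();
            System.out.println(result);
            System.out.println("calls after subscribing: " + CALLS.get());
        } finally {
            worker.shutdown();
        }
    }
}
```

The call counter only changes once the supplier is invoked, which mirrors the statement that configuring the executor does not, by itself, trigger a subscription.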
runSubscriptionOn works when the blocking operation happens at subscription time.
But when you are dealing with a Multi and need to execute a blocking operation for each item, you need to use emitOn.
While runSubscriptionOn runs the subscription on the given executor, emitOn configures the executor used to propagate the item, failure, and completion events downstream:
Multi<String> multi = Multi.createFrom().items("john", "jack", "sue")
        .emitOn(Infrastructure.getDefaultWorkerPool())
        .onItem().transform(this::invokeRemoteServiceUsingBlockingIO);
emitOn is also available on Uni.
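The per-item hand-off described above can be illustrated with a small plain-JDK sketch: the items are produced on the caller thread, and each one is passed to an executor where the blocking transformation runs. It is an analogy only and does not reproduce Mutiny's event model. The class name, the "worker" thread name, and the body of invokeRemoteServiceUsingBlockingIO are assumptions, and a single-thread executor is used here so that items are processed one at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class PerItemOffload {

    // Hypothetical stand-in for a blocking call made once per item.
    static String invokeRemoteServiceUsingBlockingIO(String name) {
        try {
            Thread.sleep(20); // simulate blocking I/O
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return name.toUpperCase() + "@" + Thread.currentThread().getName();
    }

    // Hands every item to the executor; the blocking transformation runs there.
    static List<String> transformOn(ExecutorService executor, List<String> items) {
        List<CompletableFuture<String>> pending = new ArrayList<>();
        for (String item : items) {
            pending.add(CompletableFuture.supplyAsync(
                    () -> invokeRemoteServiceUsingBlockingIO(item), executor));
        }
        List<String> results = new ArrayList<>();
        for (CompletableFuture<String> future : pending) {
            results.add(future.join()); // waiting here only to collect results for the demo
        }
        return results;
    }

    public static void main(String[] args) {
        // A single worker thread processes the items one at a time, in submission order.
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        try {
            System.out.println(transformOn(worker, List.of("john", "jack", "sue")));
        } finally {
            worker.shutdown();
        }
    }
}
```

Every transformed item carries the worker thread's name, showing that the blocking work for each item ran off the thread that produced it.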