SmallRye Mutiny is a reactive programming library. Wait? Another one? Yes!

Mutiny is designed after having experienced many issues with other Reactive programming libraries and having seen many developers lost in an endless sequence of flatMap. Mutiny takes a different approach. First, Mutiny does not provide as many operators as the other famous libraries, focusing instead on the most used operators. Furthermore, Mutiny provides a more guided API, which avoids having classes with hundreds of methods that cause trouble for even the smartest IDE. Finally, Mutiny has built-in converters from and to other reactive programing libraries, so you can always pivot.

Having questions?

You can contact the team on the SmallRye Google Group

Before digging into Mutiny, we need to understand how it relates to the other reactive thingies.

mutiny in landscape

Mutiny covers the yellow part of the landscape: it’s an event-driven reactive programming library, supporting (Reactive Streams) based back-pressure. It reuses ideas from Reactive eXtensions but does not follow the same API guidelines and operators. Also, it can be used to build Reactive Systems, for example, by combining, Mutiny, and Reactive Messaging.

1. Getting Started

1.1. Prerequisites

Mutiny runs on top of Java 8 and above. Mutiny implements the Reactive Streams specification, so depends on org.reactivestreams:reactive-streams:$reactive.streams.version.

1.2. Dependencies

To use Mutiny, you just need the io.smallrye.reactive:mutiny artifact in your CLASSPATH. If you are using Maven, declare the following dependency in your pom.xml:

<dependency>
    <groupId>io.smallrye.reactive</groupId>
    <artifactId>mutiny</artifactId>
    <version>0.5.4</version>
</dependency>

If you are using Gradle, declare the following dependency in your project:

dependencies {
     implementation 'io.smallrye.reactive:mutiny:0.5.4'
}

1.3. Hello World!

Now, you can create a class with a public static void main(String…​ args) entry point with the following content:

import io.smallrye.mutiny.Multi;

public class GettingStarted {

    public static void main(String[] args) {
        Multi.createFrom().items("hello", "world")
                .onItem().apply(s -> s.toUpperCase() + " ")
                .onCompletion().continueWith("!")
                .subscribe().with(System.out::print);
    }

}

This code prints HELLO WORLD !. Let’s explain it a bit. First it creates an instance of Multi. A Multi is an object that emit events. In Reactive lingo, it’s a Publisher. Here, this Multi is emitting two item events hello and world. The onItem and onCompletion lines are processing respectively the item and completion events emitted by the upstream. As a result, they also return instances of Multi which depending on the events received from upstream would emit events downstream.

In our example, every time the Multi emits an item, we apply a method to transform this item to uppercase, and emits the resulting value. Then, when the upstream sends the completion event (no more items to emit), we produce a last item: ! and send it downstream. Finally, we subscribe to the resulting stream and for each emitted item, print it on the console.

If we represent this as a sequence diagram, we would have:

diag 36a779d2e869a0312e41e21ab27aab6a

It’s important to understand that items and completion (and failure) flow from upstream to downstream and on the way, these events are processed. You can visualize this as an assemble line. The raw material is emitted from the initial source. Each processing step takes the flowing events and process them. Finally, at the end of the line, the finalized items are pushed to the final consumer - the subscriber.

2. Mutiny philosophy and concepts

Mutiny is built on 3 main concepts:

  • event-driven - the API makes the event a first-class citizen

  • API navigability - based on the previous concept, the API is built around the type of events

  • single or multi items streams - Mutiny provides 2 types: Uni and Multi

2.1. Events

When you use Mutiny you design assembly lines in which events flow. Events can flow from upstream to downstream (from source to sinks), or some signals can swim upstream from the sinks to the source.

Events going from upstream to downstream are published by Publisher and consumed by Subscriber, which may produce events for their downstream, as illustrated by the following diagram:

diag 4e60926db2b100a45f7a3fafb6d1c2c5

An entity that is both a Publisher and a Subscriber is generally named a Processor.

Four types of events can flow in this direction:

  • subscribed - indicate that the upstream has taken into account the subscription

  • items - events containing some value

  • completion - event indicating that no more items will be emitted

  • failure - event indicating that a failure has been encountered, and no more items will be emitted

failure and completion are terminal events. Once they are emitted, no more items are emitted.

Three types of events flow in the opposite direction, i.e. from downstream to upstream:

  • subscription - event sent by a subscriber to indicate its interest for the events (such as items) emitted by upstream

  • requests - event sent by a subscriber indicating how many items event it is able to handle

  • cancellation - event sent by a subscriber to indicate no more events should be emitted

In a typical scenario, a subscriber:

  1. A subscriber subscribes to the upstream - the upstream receives the subscription subscription request, and when initialized sends the subscribed event to the subscriber

  2. The subscriber receives the subscribed event with a subscription used to emit the requests and cancellation events

  3. The subscriber sends a request event indicating how many items it can handle at the moment, it can indicate 1, n, or infinite.

  4. The publisher receiving the request event start emitting at most n item events to the subscriber

  5. The subscriber can decide at anytime to request more events, or cancel the subscription

diag c06139c2b31470fa367c3f84a6e51430

The request event is the cornerstone of the back-pressure protocol. A subscriber should not request more than what it is able to handle, and a publisher should not emit more items than the amount of request received.

The protocol presented in this section is the Reactive Streams protocol.

If no subscriber subscribes, no items would be emitted. More importantly, nothing will ever happen. If you program does not do anything, be sure you subscribed to the flow.

2.2. An event-driven API

Mutiny is an event-driven API. So for each types of event, there is an on associated method that let you handle this specific event. For example:

Multi<String> source = Multi.createFrom().items("a", "b", "c");

source
        .onItem().invoke(item -> System.out.println("Received item " + item))
        .onFailure().invoke(failure -> System.out.println("Failed with " + failure.getMessage()))
        .onCompletion().invoke(() -> System.out.println("Completed"))
        .on().subscribed(subscription -> System.out.println("We are subscribed!"))

        .on().cancellation(() -> System.out.println("Downstream has cancelled the interaction"))
        .on().request(n -> System.out.println("Downstream requested " + n + " items"))
        .subscribe().with(item -> {
        });

Of course, the methods presented in this snippet are not very interesting, although they are quite useful to trace what’s going on. Mutiny provides many method to transform items, or the streams, compose actions…​

The API is composed by groups. For example:

  • multi.onItem() provides the methods to process item events

  • multi.onFailure() provides the methods to handle failure and recover

  • multi.transform() provides the methods to transform the stream such as filtering or selecting items.

2.3. Uni and Multi

Mutiny is based on two types:

  • Multi - handle stream of 0..* items (potentially unbounded)

  • Uni - handle stream of 0..1 items

Both Uni and Multi are asynchronous types. They receive and fire events, at anytime.

You may wonder why we make this distinction. Uni does not need the complete ceremony presented above as the request does not make a lot of sense. The subscribe event expresses the interest and triggers the computation. Also, Uni can handle items having a null value (and has a specific method to handle this case). Multi does not allow it (because the Reactive Streams specification forbids it).

Also, having a Uni implementing Publisher would be a bit like having Optional implementing Iterable.

In other words, Uni:

  • can receive at most 1 item event, or a failure event

  • cannot receive a completion event (null in the case of 0 item)

  • cannot receive a request event

The following snippet shows how you can use Uni and Multi:

Multi.createFrom().items("a", "b", "c")
        .onItem().apply(i -> i.toUpperCase())
        .subscribe().with(
                item -> System.out.println("Received: " + item),
                failure -> System.out.println("Failed with " + failure.getMessage()));

Uni.createFrom().item("a")
        .onItem().apply(i -> i.toUpperCase())
        .subscribe().with(
                item -> System.out.println("Received: " + item),
                failure -> System.out.println("Failed with " + failure.getMessage()));

You can convert Unis to Multis and vice-versa:

Multi.createFrom().items("a", "b", "c")
        .onItem().apply(i -> i.toUpperCase())
        .toUni() // Convert the multi to uni, only "a" will be forwarded.
        .subscribe().with(
                item -> System.out.println("Received: " + item),
                failure -> System.out.println("Failed with " + failure.getMessage()));

Uni.createFrom().item("a")
        .onItem().apply(i -> i.toUpperCase())
        .toMulti() // Convert the uni to a multi, the completion event will be fired after the emission of "a"
        .subscribe().with(
                item -> System.out.println("Received: " + item),
                failure -> System.out.println("Failed with " + failure.getMessage()));

2.4. Creating and Subscribing to Unis

There are many ways to create instances of Unis:

// Creation from a known item, or computed at subscription time
Uni.createFrom().item("some known value");
Uni.createFrom().item(() -> "some value computed at subscription time");

// Creation from a completion stage or completable future
Uni.createFrom().completionStage(CompletableFuture.supplyAsync(() -> "result"))
        .subscribe().with(
                item -> System.out.println("Received: " + item),
                failure -> System.out.println("Failed with " + failure.getMessage()));

// Creation from a failure
Uni.createFrom().failure(() -> new Exception("exception created at subscription time"));

// Creation from an emitter
Uni.createFrom().emitter(emitter -> {
    // ...
    emitter.complete("some result");
    //...
});

// Create from a Reactive Streams Publisher or a Multi
Uni.createFrom().publisher(Multi.createFrom().ticks().every(Duration.ofMillis(1)))
        .subscribe().with(
                item -> System.out.println("Received tick " + item),
                failure -> System.out.println("Failed with " + failure.getMessage()));

// Defer the creation of the uni until subscription time
Uni.createFrom().deferred(() -> Uni.createFrom().item("create the uni at subscription time"));

Subscribing to Unis is done by method provided by uni.subscribe():

Uni<String> uni = Uni.createFrom().item("hello");

// Passing callbacks
Cancellable cancellable = uni.subscribe().with(
        item -> System.out.println("Got item: " + item),
        failure -> System.out.println("Got a failure " + failure.getMessage()));
// You can use the returned `cancellation` to cancel the computation.
cancellable.cancel();

uni.subscribe().withSubscriber(new UniSubscriber<String>() {
    @Override
    public void onSubscribe(UniSubscription subscription) {
        System.out.println("Got the subscription: " + subscription);
    }

    @Override
    public void onItem(String item) {
        System.out.println("Got the item: " + item);
    }

    @Override
    public void onFailure(Throwable failure) {
        System.out.println("Got the failure: " + failure);
    }
});

2.5. Creating and Subscribing to Multis

There are many ways to create instances of Multis:

// Creation from a known item(s), or computed at subscription time
Multi.createFrom().item("some known value");
Multi.createFrom().item(() -> "some value computed at subscription time");
Multi.createFrom().items("a", "b", "c");
Multi.createFrom().items(() -> Stream.of("computed", "at", "subscription", "time"));
Multi.createFrom().iterable(Arrays.asList("some", "produceIterable"));

// Creation from a completion stage or completable future
Multi.createFrom().completionStage(CompletableFuture.supplyAsync(() -> "result"))
        .subscribe().with(
                item -> System.out.println("Received: " + item),
                failure -> System.out.println("Failed with " + failure.getMessage()));

// Creation from a failure
Multi.createFrom().failure(() -> new Exception("exception created at subscription time"));

// Creation from an emitter
Multi.createFrom().emitter(emitter -> {
    // ...
    emitter.emit("a");
    emitter.emit("b");
    emitter.complete();
    //...
});

// Create from a Reactive Streams Publisher or a Multi
Multi.createFrom().publisher(Multi.createFrom().ticks().every(Duration.ofMillis(1)))
        .transform().byTakingFirstItems(2)
        .subscribe().with(
                item -> System.out.println("Received tick " + item),
                failure -> System.out.println("Failed with " + failure.getMessage()));

// Defer the creation of the uni until subscription time
Multi.createFrom().deferred(() -> Multi.createFrom().item("create the uni at subscription time"));

// Created from a Uni
Multi.createFrom().uni(Uni.createFrom().item("hello"));

// Created from periodic ticks
Multi.createFrom().ticks().every(Duration.ofMillis(1))
        .transform().byTakingFirstItems(2);

// Created from integer range
Multi.createFrom().range(1, 11);

Subscribing to Multis is done by method provided by multi.subscribe():

Multi<String> multi = Multi.createFrom().items("a", "b", "c");

Cancellable cancellable = multi.subscribe().with(
        item -> System.out.println("Got " + item));
// you can use the returned Cancellable to cancel the computation
cancellable.cancel();

cancellable = multi.subscribe().with(
        item -> System.out.println("Got " + item),
        failure -> System.out.println("Got a failure: " + failure),
        () -> System.out.println("Got the completion event"));

multi.subscribe().withSubscriber(new MultiSubscriber<String>() {
    @Override
    public void onSubscribe(Subscription s) {
        System.out.println("Got subscription: " + s);
    }

    @Override
    public void onItem(String item) {
        System.out.println("Got an item: " + item);
    }

    @Override
    public void onFailure(Throwable failure) {
        System.out.println("Got a failure: " + failure);
    }

    @Override
    public void onCompletion() {
        System.out.println("Got the completion event");
    }
});

2.6. Structuring the pipeline

It can be hard to structure your sequence of processing in a readable manner. Mutiny provides the then function to structure your pipeline:

String result = Multi.createFrom().completionStage(CompletableFuture.supplyAsync(() -> 23))
        .then(self -> {
            // Transform each item into a string of the item +1
            return self
                    .onItem().apply(i -> i + 1)
                    .onItem().apply(i -> Integer.toString(i));
        })
        .then(self -> self
                .onItem().invoke(item -> System.out.println("The item is " + item))
                .collectItems().first())
        .then(self -> self.await().indefinitely());

then can be used on Multi and Uni and let you group a set of operations in a logical set. == How Do I do x?

2.7. How do I transform items?

To transform items synchronously use the apply operators:

String result1 = uni
        .onItem().apply(s -> s.toUpperCase())
        .await().indefinitely();
List<String> result2 = multi
        .onItem().apply(s -> s.toUpperCase())
        .collectItems().asList().await().indefinitely();

If your operation throws a checked exception, you can use the io.smallrye.mutiny.unchecked.Unchecked wrappers to avoid having to catch the exception in the callback.

String result1 = uni
        .onItem().apply(function(this::operationThrowingException))
        .await().indefinitely();
List<String> result2 = multi
        .onItem().apply(function(this::operationThrowingException))
        .collectItems().asList().await().indefinitely();

You can also transform items asynchronously using the produceUni / produceCompletionStage operators:

String result1 = uni
        .onItem().produceUni(s -> Uni.createFrom().item(s.toUpperCase()))
        .await().indefinitely();
String result2 = uni
        .onItem().produceCompletionStage(s -> CompletableFuture.supplyAsync(() -> s.toUpperCase()))
        .await().indefinitely();
List<String> result3 = multi
        .onItem().produceUni(s -> Uni.createFrom().item(s.toUpperCase())).concatenate()
        .collectItems().asList().await().indefinitely();
List<String> result4 = multi
        .onItem().produceCompletionStage(s -> CompletableFuture.supplyAsync(() -> s.toUpperCase()))
        .concatenate()
        .collectItems().asList().await().indefinitely();

If you need to generate a sequence of items use, producePublisher:

List<String> result = multi
        .onItem().producePublisher(s -> Multi.createFrom().item(s.toUpperCase())).concatenate()
        .collectItems().asList().await().indefinitely();
Unchecked

Mutiny provides a set of utility classes to handle functions / consumers / suppliers that throw checked exceptions. io.smallrye.mutiny.unchecked.Unchecked wraps code throwing checked exceptions into Java functions/suppliers/consumers. The try/catch is done for you and if an exception occurs, it gets rethrown automatically.

You can add the following import statement to simplify the usage of the provided methods: import static io.smallrye.mutiny.unchecked.Unchecked.*;

2.8. How do I filter/select items?

On Multi, you may need to filter / select items. To achieve this, there is a set of methods available in multi.transform():

List<Integer> list = multi
        .transform().byFilteringItemsWith(i -> i > 6)
        .collectItems().asList()
        .await().indefinitely();

List<Integer> list2 = multi
        .transform().byTestingItemsWith(i -> Uni.createFrom().item(i > 6))
        .collectItems().asList()
        .await().indefinitely();

List<Integer> list = multi
        .transform().byTakingFirstItems(2)
        .collectItems().asList()
        .await().indefinitely();

List<Integer> list2 = multi
        .transform().byTakingItemsWhile(i -> i < 3)
        .collectItems().asList()
        .await().indefinitely();

List<Integer> list3 = multi
        .transform().byTakingLastItems(2)
        .collectItems().asList()
        .await().indefinitely();

List<Integer> list = multi
        .transform().bySkippingFirstItems(8)
        .collectItems().asList()
        .await().indefinitely();

List<Integer> list2 = multi
        .transform().bySkippingItemsWhile(i -> i < 9)
        .collectItems().asList()
        .await().indefinitely();

List<Integer> list3 = multi
        .transform().bySkippingLastItems(8)
        .collectItems().asList()
        .await().indefinitely();

You can also drop distinct items or drop repetitions:

List<Integer> list = multi
        .transform().byDroppingDuplicates()
        .collectItems().asList()
        .await().indefinitely();

List<Integer> list2 = multi
        .transform().byDroppingRepetitions()
        .collectItems().asList()
        .await().indefinitely();

2.9. Where are my map, flatMap and concatMap methods?

If you are a seasoned reactive developer, you may miss the infamous map, flatMap, concatMap methods. Mutiny also proposes there methods using the most-used variants of them.

int result = uni
        .map(i -> i + 1)
        .await().indefinitely();

int result2 = uni
        .flatMap(i -> Uni.createFrom().item(i + 1))
        .await().indefinitely();

List<Integer> list = multi
        .map(i -> i + 1)
        .collectItems().asList()
        .await().indefinitely();

List<Integer> list2 = multi
        .flatMap(i -> Multi.createFrom().items(i, i))
        .collectItems().asList()
        .await().indefinitely();

List<Integer> list3 = multi
        .concatMap(i -> Multi.createFrom().items(i, i))
        .collectItems().asList()
        .await().indefinitely();

Mutiny API is quite different from the traditional reactive eXtensions API. The equivalent to map, flatMap and concatMap are the following:

int result = uni
        .onItem().apply(i -> i + 1)
        .await().indefinitely();

int result2 = uni
        .onItem().produceUni(i -> Uni.createFrom().item(i + 1))
        .await().indefinitely();

List<Integer> list = multi
        .onItem().apply(i -> i + 1)
        .collectItems().asList()
        .await().indefinitely();

List<Integer> list2 = multi
        .onItem().produceMulti(i -> Multi.createFrom().items(i, i)).merge()
        .collectItems().asList()
        .await().indefinitely();

List<Integer> list3 = multi
        .onItem().produceMulti(i -> Multi.createFrom().items(i, i)).concatenate()
        .collectItems().asList()
        .await().indefinitely();

2.10. How do I merge, concatenate or combine Multis?

You can create instances of Multi by concatenating, merging, or combining Multis and Publishers:

List<Integer> list1 = Multi.createBy().merging().streams(multi1, multi2, multi3)
        .collectItems().asList()
        .await().indefinitely();

List<Integer> list2 = Multi.createBy().concatenating().streams(multi1, multi2, multi3)
        .collectItems().asList()
        .await().indefinitely();

List<Integer> list3 = Multi.createBy()
        .combining().streams(multi1, multi2, multi3).using((a, b, c) -> a + b + c)
        .collectItems().asList()
        .await().indefinitely();

2.11. How can I collect / accumulate results?

You can collect the items from a Multi using the method proposed by collectItems():

// collectItems let you collect items into a Uni of data structure.
// The final result is emitted when the completion event is received
Uni<List<Integer>> list = multi.collectItems().asList();
Uni<Map<String, Integer>> map = multi.collectItems().asMap(i -> Integer.toString(i));

// You can retrieve the first and last items
Uni<Integer> first = multi.collectItems().first();
Uni<Integer> last = multi.collectItems().last();

// you can also get a **blocking** iterable / streams
BlockingIterable<Integer> integers = multi.subscribe().asIterable();
Stream<Integer> stream = multi.subscribe().asStream();

You can also accumulate results using the scan method:

Multi<Integer> added = multi.onItem().scan(() -> 0, (item, acc) -> acc + item);

2.12. How do I represent asynchronous calls?

When calling a remote service, or anything asynchronous, you need a way to represent this call. Most of the time, this should be represented by a Uni<T> with T the type of result.

For example, a service producing a String should be represented as follows:

Uni<String> invokeMyService();

The returned Uni emits the result when the response is available. If the service invocation fails, the failure is propagated by the Uni.

If your service does not produce a result, use Uni<Void>:

Uni<Void> fireAndForget();

In this case, you still know if the invocation fails (you get a failure event) or if the invocation completes successfully (you get a Void item (null)).

2.13. How do I chain asynchronous calls?

To chain asynchronous calls, use onItem().produceUni or onItem().produceCompletionStage:

CompletableFuture<String> future = uni
        .onItem().produceUni(this::asyncOperation)
        .onItem().produceUni(this::anotherAsyncOperation)
        .subscribeAsCompletionStage();

2.14. How do I join asynchronous calls?

Joining asynchronous calls, i.e. being notified when multiple actions complete, is a common pattern. With Mutiny, you can achieve this as follows:

Uni<String> uni1 = getFirstUni();
Uni<String> uni2 = getSecondUni();
Uni<Integer> uni3 = getThirdUni();

Uni<Tuple3<String, String, Integer>> uni = Uni.combine().all().unis(uni1, uni2, uni3).asTuple();
String result  = uni.onItem().apply(tuple ->
        tuple.getItem1() + " " + tuple.getItem2() + " " + tuple.getItem3() + " !")
        .await().indefinitely();

In this example, the results are combined into a Tuple that contains the different results.

If you have many Unis to join, you can use an iterable and a combinator function:

Uni<String> uni1 = getFirstUni();
Uni<String> uni2 = getSecondUni();
Uni<Integer> uni3 = getThirdUni();

List<Uni<?>> list = Arrays.asList(uni1, uni2, uni3);

Uni<String> uni = Uni.combine().all().unis(list).combinedWith(results ->
        results.get(0) + " " + results.get(1) + " " + results.get(2) + " !"
);

The combinator function is invoked with a List<Object> as parameter and passes the different results. This function returns the combined result.

2.15. What’s the difference between merge() and concatenate()

Imagine you have a Multi, and for each item, you need to call a remote service or execute an asynchronous operation. To do this, you write the following code:

List<String> list = multi
    .onItem().produceUni(this::asyncService)
    // .concatenate() or .merge() ?

produceUni lets you choose between two merge methods: merge() and concatenate():

List<String> list = multi
        .onItem().produceUni(this::asyncService).merge()
        .collectItems().asList()
        .await().indefinitely();
List<String> list = multi
        .onItem().produceUni(this::asyncService).concatenate()
        .collectItems().asList()
        .await().indefinitely();

The difference between these alternatives is about the ordering. Let’s imagine you have the following items: a, b, c and the asyncService is just producing the uppercase version (but asynchronously, and you don’t know when the result is going to be produced).

concatenate() preserves the ordering. So, the output will always be A, B, C. merge() emits the resulting items as soon as they are available, and so may interleave the items. The output could be B, A, C if the computation of b was faster than for a.

merge() executes the asynchronous operation concurrently, while concatenate() executes them one by one to preserve the ordering. With merge(), you can also configure the degree of concurrency using: merge(concurrency):

List<String> list = multi
        .onItem().produceUni(this::asyncService).merge(8)
        .collectItems().asList()
        .await().indefinitely();

2.16. How do I recover from failure?

Failures are inherent to software. So you need to handle them. By default, they flow in the stream until they reach the final subscriber. But at any stage you can process the failure and transform or recover from it.

The following snippet shows how you can recover from a failure on a Uni:

// Transform a failure in another type of failure
CompletableFuture<String> res0 = uni.onFailure().apply(failure -> new MyBusinessException("oh no!"))
        .subscribeAsCompletionStage();

// Recover with an item
String res1 = uni
        .onFailure().recoverWithItem("hello")
        .await().indefinitely();

// Filter the type of failure
String res2 = uni
        .onFailure(IllegalArgumentException.class).recoverWithItem("bonjour")
        .onFailure(IOException.class).recoverWithItem("hello")
        .await().indefinitely();

// Recover recover with an uni
String res3 = uni
        .onFailure().recoverWithUni(() -> Uni.createFrom().item("fallback"))
        .await().indefinitely();

// Retry at most twice
CompletableFuture<String> res4 = uni
        .onFailure().retry().atMost(2)
        .subscribeAsCompletionStage();

Multis have the same API letting you recover from failure easily:

CompletableFuture<String> res0 = multi.onFailure().apply(failure -> new MyBusinessException("oh no!"))
        .collectItems().first()
        .subscribeAsCompletionStage();

String res1 = multi
        .onFailure().recoverWithItem("hello")
        .collectItems().first()
        .await().indefinitely();

String res2 = multi
        .onFailure(IllegalArgumentException.class).recoverWithItem("bonjour")
        .onFailure(IOException.class).recoverWithItem("hello")
        .collectItems().first()
        .await().indefinitely();

String res3 = multi
        .onFailure().recoverWithMulti(() -> Multi.createFrom().items("a", "b", "c"))
        .collectItems().first()
        .await().indefinitely();

CompletableFuture<String> res4 = multi
        .onFailure().retry().atMost(2)
        .collectItems().first()
        .subscribeAsCompletionStage();

Note that with Multi, a failure cancels the subscription, meaning you will not receive any more items. The retry operator lets you re-subscribe and continue the reception.

2.17. How do I handle null?

null is only accepted in the Uni type. uni.onItem().ifNull() lets you decide what you want to do when the received item is null:

uni.onItem().ifNull().continueWith("hello");
uni.onItem().ifNull().switchTo(() -> Uni.createFrom().item("hello"));
uni.onItem().ifNull().failWith(() -> new Exception("Boom!"));

The symmetric operation is also available with ifNotNull which let you handle the case where the item is not null:

uni
        .onItem().ifNotNull().apply(String::toUpperCase)
        .onItem().ifNull().continueWith("yolo!");
While supported, emitting null should be avoided except for Uni<Void>.

2.18. How do I handle timeout?

Uni are often used to represent asynchronous operations, like an HTTP call. So, it’s not rare to need to add a timeout on this kind of operation, and recover if the timeout occurs.

This can be done using Uni.ifNoItem():

String item = uni
        .ifNoItem().after(Duration.ofMillis(100)).recoverWithItem("some fallback item")
        .await().indefinitely();

On timeout, you can:

  • fail

  • recover with a specific item

  • continue with another alternative Uni

2.19. How do I control the threads?

Except indicated otherwise, the next processor is invoked on the thread emitting the event from upstream. You can switch to another thread using the emitOn operator. The emitOn operator lets you switch the thread used to dispatch (upstream to downstream) events, so items, failure and completion.

String res0 = uni.emitOn(executor)
        .onItem()
        .invoke(s -> System.out.println("Received item `" + s + "` on thread: " + Thread.currentThread().getName()))
        .await().indefinitely();

String res1 = multi.emitOn(executor)
        .onItem()
        .invoke(s -> System.out.println("Received item `" + s + "` on thread: " + Thread.currentThread().getName()))
        .collectItems().first()
        .await().indefinitely();

2.20. How do I interact with CompletionStages?

CompletionStage and CompletableFuture are classes provided by Java to represent asynchronous actions. However, they are not lazy, and do not follow a subscription patterns.

Mutiny lets you create Uni and Multi from instances of these classes. Also, Uni offers a way to retrieve the item or the failure as a CompletableFuture.

CompletableFuture<String> future1 = Uni
        // Create from a Completion Stage
        .createFrom().completionStage(CompletableFuture.supplyAsync(() -> "hello"))
        .map(String::toUpperCase)
        .subscribeAsCompletionStage(); // Retrieve as a Completion Stage

CompletableFuture<List<String>> future2 = Multi
        .createFrom().completionStage(CompletableFuture.supplyAsync(() -> "hello"))
        .map(String::toUpperCase)
        .collectItems().asList() // Accumulate items in a list (return a Uni<List<T>>)
        .subscribeAsCompletionStage();// Retrieve the list as a Completion Stage

2.21. How do I interact with RX Java 2?

Mutiny allows converting Rx Java 2 types to/from Uni and Multi instances. To enable this feature, add the following dependency to your CLASSPATH:

<dependency>
    <groupId>io.smallrye.reactive</groupId>
    <artifactId>mutiny-rxjava</artifactId>
    <version>0.5.4</version>
</dependency>

Then, you would need to import:

import io.smallrye.mutiny.converters.multi.MultiRxConverters;
import io.smallrye.mutiny.converters.uni.UniRxConverters;

To create a Uni from RxJava 2 objects, use the following methods:

Completable completable = Completable.complete();
Single<String> single = Single.just("hello");
Maybe<String> maybe = Maybe.just("hello");
Maybe<String> emptyMaybe = Maybe.empty();
Observable<String> observable = Observable.fromArray("a", "b", "c");
Flowable<String> flowable = Flowable.fromArray("a", "b", "c");

Uni<Void> uniFromCompletable = Uni.createFrom().converter(UniRxConverters.fromCompletable(), completable);
Uni<String> uniFromSingle = Uni.createFrom().converter(UniRxConverters.fromSingle(), single);
Uni<String> uniFromMaybe = Uni.createFrom().converter(UniRxConverters.fromMaybe(), maybe);
Uni<String> uniFromEmptyMaybe = Uni.createFrom().converter(UniRxConverters.fromMaybe(), emptyMaybe);
Uni<String> uniFromObservable = Uni.createFrom().converter(UniRxConverters.fromObservable(), observable);
Uni<String> uniFromFlowable = Uni.createFrom().converter(UniRxConverters.fromFlowable(), flowable);
Uni<String> uniFromPublisher = Uni.createFrom().publisher(flowable);

When converting a Completable or an empty Maybe to a Uni, the Uni receives a null item. Use onItem().ifNull() to handle this case.

Note that, when converting an Observable or a Flowable to a Uni, only the first element is requested. Once received, the underlying subscription is cancelled.

Also note that you can create a Uni from a Reactive Streams Publisher using Uni.createFrom().publisher(…​).

To create Rx Java 2 types from a Uni, use the following methods:

Completable completable = uni.convert().with(UniRxConverters.toCompletable());
Single<Optional<String>> single = uni.convert().with(UniRxConverters.toSingle());
Single<String> single2 = uni.convert().with(UniRxConverters.toSingle().failOnNull());
Maybe<String> maybe = uni.convert().with(UniRxConverters.toMaybe());
Observable<String> observable = uni.convert().with(UniRxConverters.toObservable());
Flowable<String> flowable = uni.convert().with(UniRxConverters.toFlowable());

Because Uni can emit null, the default converter produces a Single<Optional<T>>. You can retrieve a Single<T> and fail instead using toSingle().failOnNull().

To create a Multi from RxJava 2 objects, use the following methods:

Completable completable = Completable.complete();
Single<String> single = Single.just("hello");
Maybe<String> maybe = Maybe.just("hello");
Maybe<String> emptyMaybe = Maybe.empty();
Observable<String> observable = Observable.fromArray("a", "b", "c");
Flowable<String> flowable = Flowable.fromArray("a", "b", "c");

Multi<Void> multiFromCompletable = Multi.createFrom()
        .converter(MultiRxConverters.fromCompletable(), completable);
Multi<String> multiFromSingle = Multi.createFrom().converter(MultiRxConverters.fromSingle(), single);
Multi<String> multiFromMaybe = Multi.createFrom().converter(MultiRxConverters.fromMaybe(), maybe);
Multi<String> multiFromEmptyMaybe = Multi.createFrom().converter(MultiRxConverters.fromMaybe(), emptyMaybe);
Multi<String> multiFromObservable = Multi.createFrom()
        .converter(MultiRxConverters.fromObservable(), observable);
Multi<String> multiFromFlowable = Multi.createFrom().converter(MultiRxConverters.fromFlowable(), flowable);
Multi<String> multiFromPublisher = Multi.createFrom().publisher(flowable);

As for Uni, you can create a Multi from a Reactive Streams Publisher using Multi.createFrom().publisher(…​).

You can also create RxJava 2 objects from a Multi:

Completable completable = multi.convert().with(MultiRxConverters.toCompletable());
Single<Optional<String>> single = multi.convert().with(MultiRxConverters.toSingle());
Single<String> single2 = multi.convert().with(MultiRxConverters
        .toSingle().onEmptyThrow(() -> new Exception("D'oh!")));
Maybe<String> maybe = multi.convert().with(MultiRxConverters.toMaybe());
Observable<String> observable = multi.convert().with(MultiRxConverters.toObservable());
Flowable<String> flowable = multi.convert().with(MultiRxConverters.toFlowable());

2.22. How do I interact with Reactor?

Mutiny allows converting Project Reactor types to/from Uni and Multi instances. To enable this feature, add the following dependency to your CLASSPATH:

<dependency>
    <groupId>io.smallrye.reactive</groupId>
    <artifactId>mutiny-reactor</artifactId>
    <version>0.5.4</version>
</dependency>

Then, you would need to import:

import io.smallrye.mutiny.converters.multi.MultiReactorConverters;
import io.smallrye.mutiny.converters.uni.UniReactorConverters;

To create a Uni from Reactor objects, use the following methods:

Mono<Void> empty = Mono.empty();
Mono<String> mono = Mono.just("hello");
Flux<String> flux = Flux.just("a", "b", "c");

Uni<Void> uniFromEmptyMono = Uni.createFrom().converter(UniReactorConverters.fromMono(), empty);
Uni<String> uniFromMono = Uni.createFrom().converter(UniReactorConverters.fromMono(), mono);
Uni<String> uniFromFlux = Uni.createFrom().converter(UniReactorConverters.fromFlux(), flux);
Uni<String> uniFromPublisher = Uni.createFrom().publisher(flux);

To create Reactor Flux and Mono instances from a Uni, use the following methods:

Mono<String> mono = uni.convert().with(UniReactorConverters.toMono());
Flux<String> flux = uni.convert().with(UniReactorConverters.toFlux());

To create a Multi from Reactor objects, use the following methods:

Mono<Void> empty = Mono.empty();
Mono<String> mono = Mono.just("hello");
Flux<String> flux = Flux.just("a", "b", "c");

Multi<Void> multiFromEmptyMono = Multi.createFrom()
        .converter(MultiReactorConverters.fromMono(), empty);
Multi<String> multiFromMono = Multi.createFrom().converter(MultiReactorConverters.fromMono(), mono);
Multi<String> multiFromFlux = Multi.createFrom().converter(MultiReactorConverters.fromFlux(), flux);
Multi<String> multiFromPublisher = Multi.createFrom().publisher(flux);

You can also create Reactor objects from a Multi:

Mono<String> mono = multi.convert().with(MultiReactorConverters.toMono());
Flux<String> flux = multi.convert().with(MultiReactorConverters.toFlux());

2.23. How do I control the back-pressure?

When a consumer cannot keep up with the pace of a publisher, there is an overflow condition. By default, it would fail and propagate an exception.

However, you can control how the overflow is handled using the multi.onOverflow() method, such as in:

String res1 = multi
        .emitOn(executor)
        .onOverflow().buffer(10)
        .collectItems().first()
        .await().indefinitely();

String res2 = multi
        .emitOn(executor)
        .onOverflow().dropPreviousItems()
        .collectItems().first()
        .await().indefinitely();

2.24. How do wrap blocking calls?

It is often the case that a source of data (web service, database…​) is synchronous and blocking. To deal with such sources in your applications, apply the following pattern:

Uni<String> blocking = Uni.createFrom().item(this::invokeRemoteServiceUsingBlockingIO)
        .runSubscriptionOn(Infrastructure.getDefaultWorkerPool());

Create an Uni that will supply the item using a blocking call, here the invokeRemoteServiceUsingBlockingIO method. To avoid blocking the subscriber, use runSubscriptionOn which switch the thread and so call invokeRemoteServiceUsingBlockingIO on another thread. Here we pass the default worker thread pool, but you can use your own executor.

Note that runSubscriptionOn does not subscribe to the Uni. It specifies the executor to use when a subscribe call happens.

Using runSubscriptionOn works when the blocking operation happens at subscription time. But, when you are dealing with Multi and need to execute a blocking operation for each item, you need emitOn.

While runSubscriptionOn runs the subscription on the given executor, emitOn configures the executor used to propagate downstream the item, failure and completion events:

Multi<String> multi = Multi.createFrom().items("john", "jack", "sue")
        .emitOn(Infrastructure.getDefaultWorkerPool())
        .onItem().apply(this::invokeRemoteServiceUsingBlockingIO);

2.25. emitOn vs. runSubscriptionOn

The emitOn and runSubscriptionOn are 2 operators influencing on which threads the event are dispatched. However, they target different types of events and different directions.

emitOn takes events coming from upstream (items, completion, failure) and replays them downstream on a thread from the given executor. Consequently, it affects where the subsequent operators execute (until another emitOn is used):

Multi.createFrom().items(this::retrieveItemsFromSource)
        .emitOn(executor)
        .onItem().apply(this::applySomeOperation)
        .subscribe().with(
        item -> System.out.println("Item: " + item),
        Throwable::printStackTrace,
        () -> completed.set(true)
);

The previous code produces the following sequence:

diag ce11714f53fedf6861bf5f43e7f2d847

runSubscriptionOn applies to the subscription process. It requests the upstream to run its subscription (call of the subscribe method on its own upstream) on a thread from the given executor:

Multi.createFrom().items(() -> {
    // called on a thread from the executor
    return retrieveItemsFromSource();
})
        .onItem().apply(this::applySomeOperation)
        .runSubscriptionOn(executor)
        .subscribe().with(
        item -> System.out.println("Item: " + item),
        Throwable::printStackTrace,
        () -> completed.set(true)
);

So, if we consider the previous code snippet, it produces the following sequence:

diag cbd1972d7b4ab697a798e3d1b5ede99e

2.26. How can I use a Unicast Processor

The UnicastProcessor is an implementation of Multi that lets you enqueue items in a queue. The items are then dispatched to the subscriber using the request protocol. While this pattern is against the idea of back-pressure, it lets you connect sources of data that do not support back-pressure with your subscriber.

In the following example, the UnicastProcessor is used by a thread emitting items. These items are enqueued in the processor and replayed when the subscriber is connected, following the request protocol.

UnicastProcessor<String> processor = UnicastProcessor.create();
Multi<String> multi = processor
        .onItem().apply(String::toUpperCase)
        .onFailure().recoverWithItem("d'oh");

// Create a source of items that does not follow the request protocol
new Thread(() -> {
    for (int i = 0; i < 1000; i++) {
        processor.onNext(Integer.toString(i));
    }
    processor.onComplete();
}).start();

By default the UnicastProcessor uses an unbounded queue. You can also pass a fixed size queue that would reject the items once full.

2.27. How can I create Uni/Multi from a callback-based API

When dealing with async API using callback, you can create Unis or Multis by using emitters:

Uni<String> uni = Uni.createFrom().emitter(emitter -> {
    client.execute(ar -> {
        if (ar.failed()) {
            emitter.fail(ar.cause());
        } else {
            emitter.complete(ar.value());
        }
    });
});

Multi<String> multi = Multi.createFrom().emitter(emitter -> {
    client.onMessage(e -> {
        if (e != null) {
            emitter.emit(e);
        } else {
            emitter.complete();
        }
    });
});

Note that the callback receiving the emitter is called for every subscription.

2.28. How do I create hot streams

In a cold stream, the stream is created when one subscriber subscribes to the stream. So, if no one subscribes, the actual stream is not created, saving resources (that would be wasted because nobody is interested in the items).

In a hot stream, the stream exists before subscribers subscribe. The stream emits items even if no subscribers observe the stream. If there are no subscribers, the items are just dropped. Subscribers only get items emitted after their subscription, meaning that any previous items would not be received.

To create a hot stream, you can use io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor that:

  • drops items if no subscribers are present

  • forwards items to the set of observing subscribers

BroadcastProcessor<String> processor = BroadcastProcessor.create();
Multi<String> multi = processor
        .onItem().apply(String::toUpperCase)
        .onFailure().recoverWithItem("d'oh");

new Thread(() -> {
    for (int i = 0; i < 1000; i++) {
        processor.onNext(Integer.toString(i));
    }
    processor.onComplete();
}).start();

// Subscribers can subscribe at any time.
// They will only receive items emitted after their subscription.
// If the source is already terminated (by a completion or a failure signal)
// the subscriber receives this signal.

Note that the BroadcastProcessor subscribes to the hot source aggressively and without back-pressure. However, the BroadcastProcessor enforces the back-pressure protocol per subscriber. If a subscriber is not ready to handle an item emitted by the hot source, an io.smallrye.mutiny.subscription.BackPressureFailure is forwarded to this subscriber.

2.29. How do I handle polling?

There are many poll-based API around us. Sometimes you need to use these APIs to generate a stream from the polled values. To do this, use the repeat() feature:

PollableDataSource source = new PollableDataSource();
// First creates a uni that emit the polled item. Because `poll` blocks, let's use a specific executor
Uni<String> pollItemFromSource = Uni.createFrom().item(source::poll).runSubscriptionOn(executor);
// To get the stream of items, just repeat the uni indefinitely
Multi<String> stream = pollItemFromSource.repeat().indefinitely();

Cancellable cancellable = stream.subscribe().with(item -> System.out.println("Polled item: " + item));
// ... later ..
// when you don't want the items anymore, cancel the subscription and close the source if needed.
cancellable.cancel();
source.close();

You can also stop the repetition using the repeat().until() method which will continue the repetition until the given predicate returns true, and/or directly create a Multi using Multi.createBy().repeating():

PollableDataSource source = new PollableDataSource();
Multi<String> stream = Multi.createBy().repeating()
            .supplier(source::poll)
            .until(s -> s == null)
        .runSubscriptionOn(executor);

stream.subscribe().with(item -> System.out.println("Polled item: " + item));

2.30. How do I handle pagination?

There are many REST / HTTP APIs using pagination, i.e. return only a subset of the results and you need to request the next page to get the next batch. Each batch contains a list of item(s). To use this kind of API and generate a continuous stream of items, you need to use the Multi.createBy().repeating() function. However, we need to pass a cursor / state to advance and avoid requesting again and again the same page. Fortunately, repeating provides methods to pass a shared state. So by combining these methods and disjoint you can generate streams from these pages:

PaginatedApi api = new PaginatedApi();

Multi<String> stream = Multi.createBy().repeating()
        .completionStage(
                () -> new AtomicInteger(),
                state -> api.getPage(state.getAndIncrement()))
        .until(list -> list.isEmpty())
        .onItem().disjoint();

First, you create a Multi containing the items emitted by the CompletionStage supplier and pass a state supplier to progress among the pages.

Then, use until to call the paginated API until we have all the items. At the point we have a stream of list of item such as ["a", "b", "c"], ["d", "e"], []. However, we want the following stream: "a", "b", "c", "d", "e". The disjoint method does exactly this. It gets the items from the lists and passes them downstream:

diag 1cddf88909ef82bdaa96ca9da9f4c028

Multi.createBy().repeating() lets you choose the number of iterations using:

  • atMost - exact number of repetitions (or failure happens before reaching that number)

  • until - the repetition is stopped if the item emitted by the Uni passes a test (predicate). It does not propagate the item that did pass the check, and it stops the repetition. The check verifies if the current item does not contain valid data.

  • whilst - the repetition is stopped if the item emitted by the Uni does not pass a test (predicate). It does propagate the item downstream even if the check does not pass. However, it stops the repetition. The test verifies if there is a next batch to be retrieved.

The following code illustrates the usage of whilst:

PaginatedApi api = new PaginatedApi();

Multi<Page> stream = Multi.createBy().repeating()
        .uni(
                () -> new AtomicInteger(),
                state -> api.retrieve(state.getAndIncrement()))
        .whilst(page -> page.hasNext());

2.31. How do I delay things?

You can delay item emission using the Uni.onItem().delayIt().by(…​) or Uni.onItem().delayIt().until(…​). The first variant delays the emission by a given duration:

String delayed = Uni.createFrom().item("hello")
        .onItem().delayIt().by(Duration.ofMillis(10))
        .map(s -> "Delayed " + s)
        .await().indefinitely();

You can also delay the emission based on an asynchronous result:

String delayed = Uni.createFrom().item("hello")
        // The write method returns a Uni completed
        // when the operation is done.
        .onItem().delayIt().until(this::write)
        .map(s -> "Written " + s)
        .await().indefinitely();

The item is propagated downstream when the Uni returned by the function emits an item (possibly null). If the function emits a failure (or throws an exception), this failure is propagated downstream.

To delay items from a Multi, you can use the previously mentioned methods:

List<Integer> delayed = Multi.createFrom().items(1, 2, 3, 4, 5)
        .onItem().produceUni(i -> Uni.createFrom().item(i).onItem().delayIt().by(Duration.ofMillis(10)))
        .concatenate()
        .collectItems().asList()
        .await().indefinitely();

Generally, you want to introduce random delays. You can implement this behavior as follows:

Random random = new Random();
List<Integer> delayed = Multi.createFrom().items(1, 2, 3, 4, 5)
        .onItem().produceUni(i -> Uni.createFrom().item(i).onItem().delayIt().by(Duration.ofMillis(random.nextInt(100) + 1)))
        .merge()
        .collectItems().asList()
        .await().indefinitely();

2.32. Where do I find the JavaDoc?

The JavaDoc is published there.

2.33. Default executor and integration

Sometimes, Mutiny needs to execute tasks on other threads, such as monitoring time or delaying actions. Most operators relying on such capacity let you pass either a ScheduledExecutorService or an ExecutorService.

Mutiny uses the fork-join pool as default executor. A ScheduledExecutorService is also created but delegates the execution of the delayed/scheduled tasks to the default executor.

In the case you want to integrate Mutiny with a thread pool managed by a platform, you can configure it using Infrastructure.setDefaultExecutor() method.