Skip to content

Quick start

Getting Mutiny Zero

You can get Mutiny Zero through the following Maven coordinates:

  • groupId: io.smallrye.reactive
  • artifactId: mutiny-zero

Mutiny Zero exposes publishers from the reactive streams library.

If you are interested in exposing JDK Flow publishers then you can use the jdk-flow classifier which is free from any dependency to the reactive streams library.

Creating publishers

Your main entry point in the Mutiny Zero API is the mutiny.zero.ZeroPublisher interface that exposes factory methods.

Creating from known values

If you already know the values to be emitted (or a failure), then you can use following factory methods:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package docsamples;

import static mutiny.zero.ZeroPublisher.*;

import java.io.IOException;
import java.util.Arrays;

import org.reactivestreams.Publisher;

public class FromKnownValues {

    public static void main(String[] args) {

        // From values
        Publisher<Integer> pub1 = fromItems(1, 2, 3);

        // From an iterable
        Publisher<Integer> pub2 = fromIterable(Arrays.asList(1, 2, 3));

        // From a failure
        Publisher<?> pub3 = fromFailure(new IOException("Broken pipe"));
    }
}

Creating from CompletionStage

CompletionStage is the Java SDK API for asynchronous operations that emit either a result or a failure.

Mutiny Zero can create a Publisher from a CompletionStage that emits exactly 1 item or a failure, then a completion signal:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
package docsamples;

import static java.util.concurrent.CompletableFuture.supplyAsync;
import static mutiny.zero.ZeroPublisher.fromCompletionStage;

import org.reactivestreams.Publisher;

public class FromCompletionStage {

    public static void main(String[] args) {

        Publisher<Long> pun = fromCompletionStage(() -> supplyAsync(() -> 58L));
    }
}

Creating using the general-purpose Tube API

For all other cases you should use the mutiny.zero.Tube API.

A Tube represents an object you can interact with to emit values, error and completion. It is aware of cancellation and items requests made by the subscriber.

A Tube is a good abstraction if you want to pass events from an existing asynchronous I/O source as they arrive. Here is a not so fictional example where SampleAsyncSource (an asynchronous I/O API) has to be adapted to a Publisher:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package docsamples;

import static mutiny.zero.ZeroPublisher.create;

import org.reactivestreams.Publisher;

import mutiny.zero.BackpressureStrategy;
import mutiny.zero.TubeConfiguration;

public class FromTube {

    public static void main(String[] args) {

        SampleAsyncSource source = new SampleAsyncSource();

        TubeConfiguration configuration = new TubeConfiguration()
                .withBackpressureStrategy(BackpressureStrategy.BUFFER)
                .withBufferSize(256);

        Publisher<String> pub = create(configuration, tube -> {

            // Start
            source.start();

            // Allow items to be received
            tube.whenRequested(n -> source.resume());

            // Termination cases
            tube.whenCancelled(source::close);
            tube.whenTerminates(() -> System.out.println("Done"));

            // Emit items, pause the source if need be
            source.onItem(str -> {
                tube.send(str);
                if (tube.outstandingRequests() == 0L) {
                    source.pause();
                }
            });

            // Error
            source.onError(tube::fail);

            // Completion
            source.onEnd(tube::complete);

        });
    }
}

Since SampleAsyncSource does not support reactive streams but can be paused and resumed, the Tube API is used not just to send items but also to control SampleAsyncSource. The Tube also has a buffer of 256 items in case of overflow to cope with the fact that pausing SampleAsyncSource may not be immediate.

Several back-pressure strategies are offered by Tube:

  • buffer items (with or without a bound),
  • drop items,
  • signal an error,
  • ignore back-pressure and still send items,
  • keep only the last values in a fixed-size buffer.

Tube enforces the reactive streams protocol. For instance if you have several threads competing to send items, then items will still be emitted serially by one of the threads rather than concurrently.

Helpers

Mutiny Zero offers a few helpers for commonly-needed tasks, but without the intention of becoming a full-fledge reactive programming API.

Introducing new helpers will always be done carefully by observing real world implementation patterns.

CompletionStage helpers

The AsyncHelpers class offers a few helpers to simplify developing against the CompletionStage, especially before Java 11.

  • failedFuture creates a CompletionStage that has already failed.
  • applyExceptionally applies a function to map a failure Throwable to another Throwable.
  • composeExceptionally applies a function to compose a failure Throwable to another CompletionStage.

Publisher helpers

More often than not we need a little help when dealing with a Publisher.

  • collectToList assembles all items from a Publisher to a CompletionStage<List>.
  • map returns a Publisher that applies a function to all items from the original Publisher.