SmallRye Reactive Stream Operators is an implementation of the Eclipse MicroProfile Reactive Stream Operators specification (version 1.0).
IMPORTANT: Project in maintenance mode
This repository is in maintenance mode. No new features will be implemented.
Another implementation of MicroProfile Reactive Streams Operators is available in Mutiny. It is strongly recommended to switch to this implementation.
Reactive Converters have been migrated to https://github.com/smallrye/smallrye-reactive-utils.
If you have any questions, send a message to https://groups.google.com/forum/#!forum/smallrye.
The MicroProfile Reactive Stream Operators specification define a set of operators for Reactive Streams. You can:
-
Create Reactive Streams
-
Process the data transiting in the streams
-
Accumulate results
The idea behind the specification is to provide the equivalent of java.util.stream
however, for Reactive Stream, so, inherently asynchronous, supporting back-pressure and with error and completion signals propagation.
The following code snippet shows how close the API is:
// java.util.stream version:
Stream.of("hello", "world")
.filter(word -> word.length() <= 5)
.map(String::toUpperCase)
.findFirst()
.ifPresent(s -> System.out.println("Regular Java stream result: " + s));
// reactive stream operator version:
ReactiveStreams.of("hello", "world")
.filter(word -> word.length() <= 5)
.map(String::toUpperCase)
.findFirst()
.run() // Run the stream (start publishing)
// Retrieve the result asynchronously, using a CompletionStage
.thenAccept(res -> res
.ifPresent(s -> System.out.println("Reactive Stream result: " + s)));
The SmallRye implementation is based on RX Java 2.
The code is released under the Apache Software License 2.0 and is available on Github.
1. Getting Started
1.1. Quickstart
The easiest to start using SmallRye Reactive Stream Operators is to start it directly in a main class. You only need to put smallrye-reactive-streams-operators
in your CLASSPATH
to use it.
Creates a Maven project, and include the following dependency in your pom.xml:
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-streams-operators</artifactId>
<version>1.0.8-SNAPSHOT</version>
</dependency>
Once created, create a class file with a public static void main(String… args)
method:
package io.smallrye.reactive.operators.quickstart;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
public class QuickStart {
public static void main(String[] args) {
// Create a stream of words
ReactiveStreams.of("hello", "from", "smallrye", "reactive", "stream", "operators")
.map(String::toUpperCase) // Transform the words
.filter(s -> s.length() > 4) // Filter items
.forEach(word -> System.out.println(">> " + word)) // Terminal operation
.run(); // Run it (create the streams, subscribe to it...)
}
}
Once everything is set up, you should be able to run the application using:
mvn compile exec:java -Dexec.mainClass=io.smallrye.reactive.operators.quickstart.QuickStart
Running the previous example should give the following output:
>> HELLO
>> SMALLRYE
>> REACTIVE
>> STREAM
>> OPERATORS
The Reactive Streams Operator is intended to be used in other software and not as a standalone api. However, to give you a better overview the 2 following quickstart explains how to use it in Eclipse Vert.x applications and Apache Camel applications.
1.2. Using Reactive Streams Operators in a Vert.x application
Eclipse Vert.x is a toolkit to create reactive and distributed systems. In addition to the bare Vert.x API, you can also use Vert.x using RX Java 2. As a consequence, you can wrap Vert.x streams and use Reactive Stream Operators to manipulate them:
package io.smallrye.reactive.operators.quickstart;
import io.vertx.core.Future;
import io.vertx.core.json.JsonObject;
import io.vertx.reactivex.core.AbstractVerticle;
import io.vertx.reactivex.core.eventbus.Message;
import io.vertx.reactivex.core.eventbus.MessageConsumer;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
public class DataProcessor extends AbstractVerticle {
private static final int PORT = 8080;
@Override
public void start(Future<Void> done) {
vertx.createHttpServer()
.requestHandler(request -> {
// Consume messages from the Vert.x event bus
MessageConsumer<JsonObject> consumer = vertx.eventBus().consumer("data");
// Wrap the stream and manipulate the data
ReactiveStreams.fromPublisher(consumer.toFlowable())
.limit(5) // Take only 5 messages
.map(Message::body) // Extract the body
.map(json -> json.getInteger("value")) // Extract the value
.peek(i -> System.out.println("Got value: " + i)) // Print it
.reduce(0, (acc, value) -> acc + value)
.run() // Begin to receive items
.whenComplete((res, err) -> {
// When the 5 items has been consumed, write the result to the
// HTTP response:
if (err != null) {
request.response().setStatusCode(500).end(err.getMessage());
} else {
request.response().end("Result is: " + res);
}
});
})
.listen(PORT, ar -> done.handle(ar.mapEmpty()));
}
}
This example creates an HTTP server and for each request to collect 5 messages sent by another component on the Vert.x event bus. It computes the sum of these 5 elements and writes the result to the HTTP response. It’s important to notice that the messages coming from the event bus are sent asynchronously. So, it would not be possible to write the previous code using java.util.streams.
When used in a Vert.x application, Reactive Stream Operators can be used to processed data and compute an asynchronous result.
1.3. Using Reactive Streams Operators in a Camel application
Apache Camel is a toolkit to define routing and mediation rules, mainly used to integrate systems, using enterprise integration patterns. Apache Camel provides more than 200+ components so that it can integrate virtually with anything.
You can combine Reactive Stream Operators and Apache Camel thanks to the Camel Reactive Stream Component.
package io.smallrye.reactive.operators.quickstart;
import org.apache.camel.CamelContext;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
import org.apache.camel.impl.DefaultCamelContext;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.reactivestreams.Subscriber;
import java.io.File;
import java.nio.file.Files;
public class QuickStart {
public static void main(String[] args) throws Exception {
CamelContext context = new DefaultCamelContext();
CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);
Subscriber<String> subscriber = camel
.subscriber("file:./target?fileName=values.txt&fileExist=append", String.class);
ReactiveStreams.of("hello", "from", "smallrye", "reactive", "stream", "operators",
"using", "Apache", "Camel")
.map(String::toUpperCase) // Transform the words
.filter(s -> s.length() > 4) // Filter items
.peek(System.out::println)
.map(s -> s + " ")
.to(subscriber)
.run();
context.start();
// Just wait until it's done.
Thread.sleep(1000);
Files.readAllLines(new File("target/values.txt").toPath())
.forEach(s -> System.out.println("File >> " + s));
}
}
You can also use Camel to create Reactive Streams Publisher
and transform the items using Reactive Streams Operators.
2. Operators
As mentioned before, the Reactive Streams API is an asynchronous version of java.util.stream
for Reactive Streams. This section list the operators that are provided.
The Reactive Streams Operators introduce a set of types to allow creating Reactive Streams:
Reactive Streams | Reactive Stream Operators | Termination |
---|---|---|
|
|
|
|
|
|
|
|
|
Besides, the Reactive Streams Operators introduce CompletionRunner
that triggers the emission of the items and provides a way to retrieve the asynchronously computed result.
2.1. Creating streams
The first part of the API allows to create PublisherBuilder
. A Reactive Streams Publisher
can be created from the builder using the .buildRS
method.
2.1.1. Creating empty streams
-
Operator:
empty
-
Description: Creates an empty stream.
-
Example:
PublisherBuilder<T> empty = ReactiveStreams.empty();
2.1.2. Creating streams from elements
-
Operator:
of
,ofNullable
-
Description: Creates a stream of 0, 1 or n elements.
-
Example:
PublisherBuilder<T> streamOfOne = ReactiveStreams.of(t1); PublisherBuilder<T> streamOfThree = ReactiveStreams.of(t1, t2, t3); PublisherBuilder<T> streamOfOneOrEmpty = ReactiveStreams.ofNullable(maybeNull);
2.1.3. Creating failing streams
-
Operator:
failed
-
Description: Creates a failed stream.
-
Example:
PublisherBuilder<T> failed = ReactiveStreams.failed(new Exception("BOOM!"));
2.1.4. Creating streams from CompletionStage
-
Operator:
fromCompletionStage
,fromCompletionStageNullable
-
Description: Creates a stream of 0 or 1 element emitted when the passed
CompletionStage
is completed.
-
Example:
PublisherBuilder<T> streamOfOne = ReactiveStreams.fromCompletionStage(cs); // If the redeemed value is `null`, an error is propagated in the stream. PublisherBuilder<T> streamOfOne = ReactiveStreams.fromCompletionStageNullable(cs); // If the redeemed value is `null`, the stream is completed immediately.
2.1.5. Creating streams from collections
-
Operator:
fromIterable
-
Description: Creates a stream emitting the elements from the passed iterable and then send the completion signal.
-
Example:
PublisherBuilder<T> stream = ReactiveStreams.fromIterable(iterable); // If the iterable does not contain elements, the resulting stream is empty.
2.1.6. Wrapping a Reactive Stream Publisher
-
Operator:
fromPublisher
-
Description: Creates a stream emitting the elements from the passed
Publisher
. -
Example:
PublisherBuilder<T> stream = ReactiveStreams.fromPublisher(publisher); // If the publisher does not emit elements, the resulting stream is empty.
2.1.7. Generating infinite streams
-
Operator:
generate
,iterate
-
Description: Creates a stream using the generator method. The number of generated elements depends on the request.
-
Example:
AtomicInteger counter = new AtomicInteger(); PublisherBuilder<Integer> stream = ReactiveStreams .generate(() -> counter.getAndIncrement()); // The resulting stream is an infinite stream. PublisherBuilder<Integer> stream = ReactiveStreams .iterate(0, last -> last + 1); // The resulting stream is an infinite stream.
2.2. Processing streams
These operators transform the items transiting on the streams.
2.2.1. Creating a processor
A processor is a Reactive Streams component that is both a Publisher
and a Subscriber
. It consumes and emits elements.
-
Example:
ProcessorBuilder<I, O> builder = ReactiveStreams.<I>builder() .map(i -> (O) i); // Emit element of type O
2.2.2. Filtering elements
-
Operator:
filter
,distinct
,dropWhile
,skip
,limit
,takeWhile
-
Description: These operators filter items transiting on the stream:
-
filter - select the element using a predicate
-
distinct - remove similar element (Attention: do not use on large or unbounded streams)
-
dropWhile - drop elements until the predicate returns
true
-
skip - ignore x elements
-
takeWhile - forward elements until the predicate returns
true
-
limit - pick x elements
-
-
Example:
ReactiveStreams.of(1, 2, 3) .filter(i -> i > 2); // (1, 2) ReactiveStreams.of(2, 2, 3, 3, 2, 1) .distinct(); // (2, 3, 1) ReactiveStreams.of(2, 2, 3, 3, 2, 1) .dropWhile(i -> i == 2); // (3, 3, 2, 1) ReactiveStreams.of(2, 2, 3, 3, 2, 1) .skip(3); // (3, 2, 1) ReactiveStreams.of(2, 2, 3, 3, 2, 1) .limit(3); // (2, 2, 3) ReactiveStreams.of(2, 2, 3, 3, 2, 1) .takeWhile(i -> i == 2); // (2, 2)
2.2.3. Composing asynchronous actions
-
Operator:
flatMap
,flatMapCompletionStage
,flatMapIterable
,flatMapRsPublisher
-
Description: Produces a stream for each element of the stream. The return stream is flatten (serialized) in the returned stream
-
flatMap - Returns a
PublisherBuilder
and serialize the elements in the returned stream. -
flatMapCompletionStage - Produces a
CompletionStage
. When completed, the result is passed to the returned stream. -
flatMapIterable - Produces an
Iterable
and flatten the element into the returned stream. ThisflatMap
method is not asynchronous. -
flatMapRSPublisher - Like
flatMap
but return a Reactive StreamsPublisher
-
-
Example:
ReactiveStreams.of(1, 2) .flatMap(i -> ReactiveStreams.of(i, i)); // (1, 1, 2, 2) ReactiveStreams.of(1, 2) .flatMapIterable(i -> Arrays.asList(i, i)); // (1, 1, 2, 2) ReactiveStreams.of(1, 2) .flatMap(i -> ReactiveStreams.of(i, i)); // (1, 1, 2, 2) ReactiveStreams.of(1, 2) .flatMapCompletionStage(i -> invokeAsyncService(i));
The produced value can be emitted asynchronously, except for flatMapIterable .
|
The CompletionStage returned by flatMapCompletionStage must not redeem null , as null is an invalid value for Reactive Streams. So, you cannot use CompletionStage<Void> .
|
2.2.4. Transforming items
-
Operator:
map
-
Description: Produces a value synchronously
-
Example:
ReactiveStreams.of(1, 2, 3) .map(i -> i + 1); // (2, 3, 4)
2.2.5. Combining a Processor
-
Operator:
via
-
Description: Forward the items to a
Processor
orProcessorBuilder
-
Example:
ProcessorBuilder<Integer, String> processor = ReactiveStreams .<Integer>builder().map(i -> Integer.toString(i)); ReactiveStreams.of(1, 2) .via(processor); // ("1", "2")
2.3. Action operators
These operators give you the ability to react to the different events happening in the streams.
-
Operator:
peek
,onComplete
,onTerminate
,onError
-
Description: These operators let you react to various events such as when an element is received, an error is propagated or when the stream completes.
-
peek - called for each element
-
onComplete - called when the stream completes
-
onError - called when an error is propagated in the stream
-
onTerminate - called either when an error is propagated or when the stream completes
-
2.4. Error management operators
These operators allow recovering after a failure. Because you handle asynchronous streams of data, you can’t use
try/catch
, so these operators provide a similar feature.
-
Operator:
onErroResume
,onErrorResumeWith
,onErrorResumeWithRsPublisher
-
Description: These operators let you react to various events such as when a element is received, an error is propagated or when the stream completes.
2.4.1. Terminal operator and computing asynchronous result
These operators act as subscribers and produce a result. As the result is computed asynchronously, you retrieve a
CompletionStage
object.
2.4.2. Cancelling a stream
-
Operator:
cancel
-
Description: Cancel the subscription to a stream. No more items will be received.
2.4.3. Ignoring elements
-
Operator:
ignore
-
Description: ignore the elements transiting on the streams. The elements are still emitted but ignored.
-
Example:
ReactiveStreams.of(1, 2, 3) .peek(i -> System.out.println("Receiving: " + i)) .ignore() .run() .thenAccept(x -> System.out.println("Done!"));
2.4.4. Collecting results
-
Operator:
collect
,reduce
,toList
-
Description: These operators allows accumulating items or intermediary results to compute a final value.
-
Example:
ReactiveStreams.of(1, 2, 3) .collect(Collectors.summingInt(i -> i)) .run() // Produce 6 .thenAccept(res -> System.out.println("Result is: " + res)); ReactiveStreams.of(1, 2, 3) .collect(() -> new AtomicInteger(1), AtomicInteger::addAndGet) .run() // Produce 7 .thenAccept(res -> System.out.println("Result is: " + res)); ReactiveStreams.of(1, 2, 3) .reduce((acc, item) -> acc + item) .run() // Produce Optional(6) .thenAccept(res -> res.ifPresent(sum -> System.out.println("Result is: " + sum))); ReactiveStreams.of(1, 2, 3) .toList() .run() // Produce [1, 2, 3] .thenAccept(res -> System.out.println("Result is: " + res));
2.4.5. Get the first item of a stream
-
Operator:
findFirst
-
Description: Retrieves the first item of a stream if any
-
Example:
ReactiveStreams.of(1, 2, 3) .findFirst() .run() // Produce Optional[1] .thenAccept(maybe -> System.out.println(maybe));
2.4.6. Execute a method for each element
-
Operator:
forEach
-
Description: Execute a method for each element of a stream. Unlike
peek
, this is a terminal operation.
-
Example:
ReactiveStreams.of(1, 2, 3) .forEach( i -> System.out.println("Receiving " + i) ) .run();
2.4.7. Pass to a Reactive Streams Subscriber
-
Operator:
to
-
Description: Forward the elements of a stream to a given
Subscriber
orSubscriberBuilder
. -
Example:
SubscriberBuilder<Integer, Optional<Integer>> subscriber = ReactiveStreams.<Integer>builder() .map(i -> i + 1) .findFirst(); ReactiveStreams.of(1, 2, 3) .to(subscriber) .run() // Produce Optional[2] .thenAccept(optional -> optional.ifPresent(i -> System.out.println("Result: " + i)));
3. Execution Model
SmallRye Reactive Stream Operators provides a way to control on which thread are the different callbacks invoked. By default it uses the caller thread.
If you are building a Vert.x application, add the following dependency to your project so enforce the Vert.x execution model:
<dependency>
<groupId>io.smallrye</groupId>
<artifactId>smallrye-reactive-streams-vertx-execution-model</artifactId>
<version>1.0.8-SNAPSHOT</version>
</dependency>
With this dependency, if you are calling ReactiveStreams.x
from a Vert.x thread, the same thread is used to call the different callbacks and pass the result.
4. Reactive Type Converters
The reactive type converters are a set of modules not directly related to MicroProfile Reactive Streams Operators. These converters adapts reactive types from different reactive programming libraries. The main interface is:
public interface ReactiveTypeConverter<T> {
<X> CompletionStage<X> toCompletionStage(T instance);
<X> Publisher<X> toRSPublisher(T instance);
<X> T fromCompletionStage(CompletionStage<X> cs);
<X> T fromPublisher(Publisher<X> publisher);
// ...
You can use converters to convert types provided by different reactive programming libraries to Publisher
and
CompletionStage
, and the opposite:
To use the converter you need:
-
Add the converter api dependency:
<dependency>
<groupId>io.smallrye</groupId>
<artifactId>smallrye-reactive-converter-api</artifactId>
<version>1.0.8-SNAPSHOT</version>
</dependency>
-
For each reactive programming, add the associated dependency:
<dependency>
<groupId>io.smallrye</groupId>
<artifactId>smallrye-reactive-converter-reactive-streams-operators</artifactId>
<version>1.0.8-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.smallrye</groupId>
<artifactId>smallrye-reactive-converter-reactor</artifactId>
<version>1.0.8-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.smallrye</groupId>
<artifactId>smallrye-reactive-converter-rxjava1</artifactId>
<version>1.0.8-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.smallrye</groupId>
<artifactId>smallrye-reactive-converter-rxjava2</artifactId>
<version>1.0.8-SNAPSHOT</version>
</dependency>
-
In your code, lookup for a converter and apply the conversion. For instance:
CompletionStage cs = ...
ReactiveTypeConverter<Completable> converter = Registry.lookup(Completable.class)
.orElseThrow(() -> new AssertionError("Completable converter should be found"));
Completable converted = converter.fromCompletionStage(cs);
The conversion rules are detailed in the javadoc.