The previous examples illustrated how to transform each item from a stream into another item. Yet, there are cases where we need to go beyond this, for example:
Transforming an item into a
Uni- any asynchronous processing such as calling a remote service, interacting with a database, etc
Transforming an item into a
Multi- producing a multi-items stream based on the incoming item, filtering out items, etc
Having the possibility to transform an item into a stream gives us many opportunities.
To implement such transformations, we use
onItem().transformToUni(Function<T, Uni<O>> and
Imagine you have a
Uni<String>, and you want to call a remote service.
Calling a remote service is an asynchronous action represented by a
Uni, as in:
Uni<String> invokeRemoteGreetingService(String name);
To call this service, you need to transform the item received from the first
Uni into the
Uni returned by the service:
Uni<String> result = uni .onItem().transformToUni(name -> invokeRemoteGreetingService(name));
This snippet chains the first
Uni with another one.
result) emits the result from the remote service or a failure if anything wrong happened:
Uni<String> uni = Uni.createFrom().item("Cameron"); uni .onItem().transformToUni(name -> invokeRemoteGreetingService(name)) .subscribe().with( item -> System.out.println(item), // Print "Hello Cameron", fail -> fail.printStackTrace()); // Print the failure stack trace
The previous example produced a single item.
You may want to transform the received item into a stream which is… a
Multi<String> result = uni .onItem().transformToMulti(item -> Multi.createFrom().items(item, item));
This code creates a stream of two elements, duplicating the received item.
uni .onItem().transformToMulti(item -> Multi.createFrom().items(item, item)) .subscribe().with( item -> System.out.println(item)); // Called twice
Multi objects can of course be more complicated than that and emit items in an asynchronous fashion.
When transforming items emitted by an upstream
Multi, we need to answer the following question: how are we going to merge the produced items back?
Let’s take an example.
Multi emitting the
Donna items (in order), and you want to call the
invokeRemoteGreetingService from above.
It thus calls
The service does not have a constant response time (because of network latency or the load), which means that responses can be interleaved.
Indeed, you may receive
"Hello Donna" before
Now, how do you want to handle this case?
Do you need to preserve the order and ensure that the downstream subscriber will always get
"Hello Cameron" first, or do you accept interleaved responses?
When transforming items from
Multi into streams, you need to decide in which order the items emitted by the produced stream are going to be received by the downstream subscriber.
Mutiny offers two possibilities:
merge - it does not preserve the order and emits the items from the produced streams as they come,
concatenate - it maintains and concatenates the streams produced for each item.
To implement the scenario from the last section, you will use
onItem().transformToUniAndConcatenate() depending on your ordering choice:
Multi<String> merged = multi .onItem().transformToUniAndMerge(name -> invokeRemoteGreetingService(name)); Multi<String> concat = multi .onItem().transformToUniAndConcatenate(name -> invokeRemoteGreetingService(name));
onItem().transformToMultiAndConcatenate transform incoming items into
Multi objects are either merged or concatenated:
Multi<String> merged = multi .onItem().transformToMultiAndMerge(item -> someMulti(item)); Multi<String> concat = multi .onItem().transformToMultiAndConcatenate(item -> someMulti(item));