Transforming items asynchronously#
The previous examples illustrated how to transform each item from a stream into another item. Yet, there are cases where we need to go beyond this, for example:
- Transforming an item into a
Uni– any asynchronous processing such as calling a remote service, interacting with a database, etc
- Transforming an item into a
Multi– producing a multi-items stream based on the incoming item, filtering out items, etc
Having the possibility to transform an item into a stream gives us many opportunities.
To implement such transformations, we use
onItem().transformToUni(Function<T, Uni<O>>) and
Uni - Transforming an item into a Uni#
sequenceDiagram autonumber participant M as Uni participant O as Transformer participant U as Uni(item) participant D as Subscriber M->>O: onItem(item) O--)U: subscribe(...) U--)O: onSubscribe(sub) U--)O: onItem(result) O->>D: onItem(result)
Imagine that you have a
Uni<String>, and you want to call a remote service.
Calling a remote service is an asynchronous action represented by a
Uni, as in:
To call this service, you need to transform the item received from the first
Uni into the
Uni returned by the service:
This snippet chains the first
Uni with another one.
result) emits the result from the remote service or a failure if anything wrong happened:
Uni - Transforming an item into a Multi#
The previous example produced a single item.
You may want to transform the received item into a stream which is… a
This code creates a stream of two elements, duplicating the received item.
sequenceDiagram autonumber participant M as Uni participant O as Transformer participant U as Multi(item) participant D as Subscriber M->>O: onItem(item) O--)U: subscribe(...) U--)O: onSubscribe(sub) U--)O: onItem(item) O->>D: onItem(item) U--)O: onItem(item) O->>D: onItem(item)
Multi objects can of course be more complicated than that and emit items in an asynchronous fashion.
Transforming items from Multi - the merge vs concatenate dilemma#
When transforming items emitted by an upstream
Multi, we need to answer the following question: how are we going to merge the produced items back?
Let’s take an example.
Multi emitting the
Donna items (in order), and you want to call the
invokeRemoteGreetingService from above.
It thus calls
The service does not have a constant response time (because of network latency or the load), which means that responses can be interleaved.
Indeed, you may receive
"Hello Donna" before
Now, how do you want to handle this case?
Do you need to preserve the order and ensure that the downstream subscriber will always get
"Hello Cameron" first, or do you accept interleaved responses?
When transforming items from
Multi into streams, you need to decide in which order the items emitted by the produced stream are going to be received by the downstream subscriber.
Mutiny offers two possibilities:
- Merging – it does not preserve the order and emits the items from the produced streams as they come, or
- Concatenating – it maintains and concatenates the streams produced for each item.
Multi - Transforming an item into a Uni#
To implement the scenario from the last section, you will use
onItem().transformToUniAndConcatenate() depending on your ordering choice:
- When merging: items from the source
Multimay be processed concurrently depending on the concurrency level that has been set, if any.
- When concatenating: items from the source
Multiare processed in order, waiting for each
Unito complete before moving on to the next item.
Multi - Transforming an item into a Multi#
onItem().transformToMultiAndConcatenate transform incoming items into
Multi objects are either merged or concatenated: