Skip to content

Multiple Outgoing Channels

Experimental

Multiple @Outgoings is an experimental feature.

The @Outgoing annotation is repeatable. It means that the method dispatches outgoing messages to multiple listed channels:

1
2
3
4
5
6
7
@Incoming("in")
@Outgoing("out1")
@Outgoing("out2")
public String process(String s) {
    // send messages from channel-in to both channel-out1 and channel-out2
    return s.toUpperCase();
}

The default behaviour is same as the @Broadcast annotation, meaning that outbound messages are dispatched to all listed outgoing channels.

However, different dispatching mechanism can be employed:

Selectively dispatching messages using Targeted messages

You can selectively dispatch messages to multiple outgoings by returning Targeted :

@Incoming("in")
@Outgoing("out1")
@Outgoing("out2")
@Outgoing("out3")
public Targeted process(double price) {
    // send messages from channel-in to both channel-out1 and channel-out2
    Targeted targeted = Targeted.of("out1", "Price: " + price,
            "out2", "Quote: " + price);
    if (price > 90.0) {
        return targeted.with("out3", price);
    }
    return targeted;
}

In this example, three outgoing channels are declared on the process method but in some condition channel out3 does not receive any messages.

Coordinated acknowledgements

Targeted return types coordinate acknowledgements between outgoing messages and the incoming message, therefore the incoming message will be ack'ed only when all outgoing messages are ack'ed.

In cases where you need to consume Message and handle metadata propagation more finely you can use TargetedMessages which is a Message type:

@Incoming("channel-in")
@Outgoing("channel-out1")
@Outgoing("channel-out2")
@Outgoing("channel-out3")
public TargetedMessages processMessage(Message<String> msg) {
    // send messages from channel-in to both channel-out1 and channel-out2
    return Messages.chain(msg)
            .with(Map.of("channel-out1", msg.withPayload(msg.getPayload().toUpperCase()),
                    "channel-out2", msg.withPayload(msg.getPayload().toLowerCase())));
}

Note that in this case coordinated acknowledgements is handled explicitly using Messages utility.

Branching outgoing channels with MultiSplitter

In stream transformer processors it can be useful to branch out an incoming stream into different sub-streams, based on some conditions.

When consuming a Multi, you can use the Multi.split (see Mutiny documentation) operation to define multiple branches. The stream transformer method with multiple outgoings must return a MultiSplitter.

enum Caps {
    ALL_CAPS,
    ALL_LOW,
    MIXED
}

@Incoming("in")
@Outgoing("sink1")
@Outgoing("sink2")
@Outgoing("sink3")
public MultiSplitter<String, Caps> reshape(Multi<String> in) {
    return in.split(Caps.class, s -> {
        if (Objects.equals(s, s.toLowerCase())) {
            return Caps.ALL_LOW;
        } else if (Objects.equals(s, s.toUpperCase())) {
            return Caps.ALL_CAPS;
        } else {
            return Caps.MIXED;
        }
    });
}

In this case the number of outgoing channels must match the number of branches given to split operation. Outgoing channels will be tried to be matched to branch identifier enum toString ignoring case. If not all branches are matched, it will fall back to one-by-one matching depending on the order of outgoing channel declarations and enum ordinals.