Skip to content

Using KeyedMulti

Experimental

KeyedMulti is an experimental feature.

When implementing a data streaming application, it's common to handle messages partitioned using a key. In this case, your stream manipulation often has to 1) group by key and 2) do the manipulation. Reactive Messaging can do the first step for you and reduce the code complexity. To do this, it injects io.smallrye.reactive.messaging.keyed.KeyedMulti in your method instead of a bare Multi.

For example, imagine the following stream, represented as key:value: "a:1", "b:1", "a:2", "b:2", "b:3"... Then, let's consider the following method:

@Incoming("in")
@Outgoing("out")
public Multi<String> reshape(KeyedMulti<String, String> multi) {
    // Called once per key and receive the stream of value for that specific key
    String key = multi.key();
    return multi.onItem().scan(AtomicInteger::new, (i, s) -> {
        i.incrementAndGet();
        return i;
    })
            .map(i -> Integer.toString(i.get()));
}

Reactive Messaging automatically extracts the key and value from the incoming stream and invokes the method for each key. The received KeyedMulti represent the stream for each key. The key() method returns the extracted key.

The key and value can be extracted from the payload but also (and often) from the message's metadata.

When using Kafka, it automatically extracts the key/value from the Kafka records. In the other cases, or if you need custom extraction, you can implement your own io.smallrye.reactive.messaging.keyed.KeyValueExtractor. Implementations are exposed ApplicationScoped beans, and are used to extract the key and value. The following implementation extracts the key and value from payloads structured as "key:value":

import io.smallrye.reactive.messaging.keyed.KeyValueExtractor;

@ApplicationScoped
public class KeyValueExtractorFromPayload implements KeyValueExtractor {

    @Override
    public boolean canExtract(Message<?> msg, Type keyType, Type valueType) {
        // Called for the first message of the stream to select the extractor.
        // Here we only check for the type, but the logic can be more complex
        return keyType.equals(String.class) && valueType.equals(String.class);
    }

    @Override
    public String extractKey(Message<?> message, Type keyType) {
        String string = message.getPayload().toString();
        return string.substring(0, string.indexOf(":"));
    }

    @Override
    public String extractValue(Message<?> message, Type valueType) {
        String string = message.getPayload().toString();
        return string.substring(string.indexOf(":") + 1);
    }

}

The extractor selection uses the canExtract method. When multiple extractors are available, you can implement the getPriority() method to give a lower priority. Default extractors have the priority 100. So, if you have a custom extractor with the priority 99, it will be used (if it replies true to the canExtract call). In addition, you can use the io.smallrye.reactive.messaging.keyed.Keyed annotation to indicate the class of the extractor to use. The extractor must still be a CDI bean, but the canExtract method is not called, and priority does not matter:

1
2
3
4
5
6
7
8
9
@Incoming("in")
@Outgoing("out")
public Multi<String> reshape(
        @Keyed(KeyValueExtractorFromPayload.class) KeyedMulti<String, String> multi) {
    return multi.onItem().scan(AtomicInteger::new, (i, s) -> {
        i.incrementAndGet();
        return i;
    }).map(i -> Integer.toString(i.get()));
}