Acknowledgment

Acknowledgment is an essential concept in messaging. A message is acknowledged when its processing or reception has been successful. It allows the broker to move to the next message.

How acknowledgment is used, and the exact behavior in terms of retry and resilience, depend on the broker. For example, with Kafka, acknowledging a message commits the offset. With AMQP, it informs the broker that the message has been accepted.

Reactive Messaging supports acknowledgment. The default acknowledgment strategy depends on the method signature, and it can be configured using the @Acknowledgment annotation.

Chain of acknowledgment

If we reuse this example:

@Outgoing("source")
public Multi<String> generate() {
    return Multi.createFrom().items("Hello", "from", "reactive", "messaging");
}

@Incoming("source")
@Outgoing("sink")
public String process(String in) {
    return in.toUpperCase();
}

@Incoming("sink")
public void consume(String processed) {
    System.out.println(processed);
}

The framework automatically acknowledges the message received from the sink channel when the consume method returns. As a consequence, the message received by the process method is acknowledged, and so on. In other words, it creates a chain of acknowledgment, running from the outbound channel back to the inbound channel.

When using connectors to receive and consume messages, the outbound connector acknowledges the messages when they are dispatched successfully to the broker. The acknowledgment chain then reaches the inbound connector, which can in turn send an acknowledgment to the broker.

This chain of acknowledgment is automatically implemented when processing payloads.
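
To make the chain concrete, here is a minimal plain-Java sketch that models it without the framework. The Msg and AckChainSketch classes are hypothetical names introduced for illustration only; they are not part of Reactive Messaging. The sketch assumes that each derived message reuses the acknowledgment function of the message it was created from, so acknowledging the last message in the pipeline acknowledges the source message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

// Hypothetical stand-in for a message: a payload plus an acknowledgment function.
final class Msg<T> {
    private final T payload;
    private final Supplier<CompletionStage<Void>> ackFn;

    Msg(T payload, Supplier<CompletionStage<Void>> ackFn) {
        this.payload = payload;
        this.ackFn = ackFn;
    }

    T payload() {
        return payload;
    }

    // Derives a message with a new payload that shares the same acknowledgment function.
    <R> Msg<R> withPayload(R newPayload) {
        return new Msg<>(newPayload, ackFn);
    }

    CompletionStage<Void> ack() {
        return ackFn.get();
    }
}

class AckChainSketch {

    // Runs source -> process -> sink and returns the payloads acknowledged at the source.
    static List<String> run(List<String> items) {
        List<String> ackedAtSource = new ArrayList<>();
        for (String item : items) {
            // "source": acknowledging this message records it, much as an
            // inbound connector would notify its broker.
            Msg<String> in = new Msg<>(item, () -> {
                ackedAtSource.add(item);
                return CompletableFuture.completedFuture(null);
            });
            // "process": transforms the payload and keeps the acknowledgment function.
            Msg<String> out = in.withPayload(in.payload().toUpperCase());
            // "sink": consumes the payload, then acknowledges; the ack reaches the source.
            System.out.println(out.payload());
            out.ack();
        }
        return ackedAtSource;
    }
}
```

Calling AckChainSketch.run with a list of payloads returns the original payloads in order, because every acknowledgment issued at the end of the pipeline is routed back to the function attached at the source.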

Acknowledgment when using Messages

When using Messages, the user controls the acknowledgment, so the chain is not formed automatically. This gives you more flexibility over when and how the incoming messages are acknowledged.

If you create a Message using a with method such as withPayload, it copies the acknowledgment function from the incoming message:

@Incoming("in")
@Outgoing("out")
public Message<Integer> process(Message<Integer> in) {
    // The acknowledgement is forwarded, when the consumer
    // acknowledges the message, `in` will be acknowledged
    return in.withPayload(in.getPayload() + 1);
}

To have more control over the acknowledgment, you can create a brand new Message and pass the acknowledgment function:

Message<String> message = Message.of("hello", () -> {
    // called when the consumer acknowledges the message

    // return a CompletionStage completed when the
    // acknowledgment of the created message is
    // completed.
    // For immediate ack use:
    return CompletableFuture.completedFuture(null);

});

However, you may need to create the acknowledgment chain, to acknowledge the incoming message:

@Incoming("in")
@Outgoing("out")
public Message<Integer> processAndProduceNewMessage(Message<Integer> in) {
    // The acknowledgment is chained manually: when the consumer
    // acknowledges the produced message, `in` is acknowledged too
    return Message.of(in.getPayload() + 1,
        () -> {
            // Called when the consumer acknowledges the message
            // ...
            // Don't forget to acknowledge the incoming message:
            return in.ack();
        });
}

To trigger the acknowledgment of the incoming message, use the ack() method. It returns a CompletionStage that completes with null once the acknowledgment has completed.

Acknowledgment when using streams

When transforming streams of Message, the acknowledgment is delegated to the user. It is up to the user to acknowledge the incoming messages:

@Incoming("in")
@Outgoing("out")
public Publisher<Message<String>> transform(Multi<Message<String>> stream) {
    return stream
        .map(message ->
            message.withPayload(message.getPayload().toUpperCase())
        );
}

In the previous example, we generate a single message per incoming message, so we can use the withPayload method. It becomes more involved when grouping incoming messages or when each incoming message produces multiple messages.
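
One possible way to handle the case where an incoming message produces several messages is to acknowledge the incoming message only after every message derived from it has been acknowledged. The plain-Java sketch below illustrates that idea with a countdown. FanOutAck and its split method are hypothetical names, not framework API, and the sketch deliberately ignores failures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class FanOutAck {

    // Builds `count` acknowledgment functions, one per produced message.
    // The incoming acknowledgment runs exactly once, when the last of them is called.
    static List<Supplier<CompletionStage<Void>>> split(
            int count, Supplier<CompletionStage<Void>> incomingAck) {
        AtomicInteger remaining = new AtomicInteger(count);
        List<Supplier<CompletionStage<Void>>> acks = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            acks.add(() -> {
                if (remaining.decrementAndGet() == 0) {
                    // Last produced message acknowledged: acknowledge the incoming one.
                    return incomingAck.get();
                }
                // Other produced messages are still pending: nothing to forward yet.
                return CompletableFuture.completedFuture(null);
            });
        }
        return acks;
    }
}
```

In a real processing method, each produced message could be created with Message.of(payload, ackFunction) using one of these functions, with in::ack passed as the incoming acknowledgment.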

In the case of a stream of payloads, the default strategy acknowledges the incoming messages before they are processed by the method (regardless of the outcome).

@Incoming("in")
@Outgoing("out")
public Publisher<String> transformPayload(Multi<String> stream) {
    return stream
        // The incoming messages are already acknowledged
        .map(String::toUpperCase);
}

Controlling acknowledgment

The @Acknowledgment annotation lets you customize the default strategy presented in the previous sections. It takes a strategy as its parameter. Reactive Messaging offers four strategies:

  • POST_PROCESSING - the acknowledgement of the incoming message is executed once the produced message is acknowledged.

  • PRE_PROCESSING - the acknowledgement of the incoming message is executed before the message is processed by the method.

  • MANUAL - the acknowledgement is done by the user.

  • NONE - no acknowledgment is performed, neither manually nor automatically.

It is recommended to use POST_PROCESSING, as it guarantees that the full processing has completed before the incoming message is acknowledged. However, it is not always possible: this strategy is not available if you manipulate streams of Messages or payloads.

The PRE_PROCESSING strategy can be useful to acknowledge a message early in the process:

@Incoming("in")
@Outgoing("out")
@Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
public String process(String input) {
    // The message wrapping the payload is already acknowledged.
    // The default would have waited for the produced message to be
    // acknowledged
    return input.toUpperCase();
}

It cuts the acknowledgment chain, meaning that the rest of the processing is not linked to the incoming message anymore. This strategy is the default strategy when manipulating streams of payloads.
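
The difference in ordering between PRE_PROCESSING and POST_PROCESSING can be illustrated with a small plain-Java model. This is a simplified sketch, not the framework's implementation: StrategySketch and dispatch are hypothetical names, the produced message is assumed to be acknowledged immediately, and a failure under POST_PROCESSING is recorded as a negative acknowledgment (nack).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class StrategySketch {

    enum Strategy { PRE_PROCESSING, POST_PROCESSING }

    // Invokes `method` on the payload and returns the ordered list of events.
    // Simplification: the produced message is treated as acknowledged immediately.
    static List<String> dispatch(Strategy strategy, String payload,
                                 Function<String, String> method) {
        List<String> events = new ArrayList<>();
        if (strategy == Strategy.PRE_PROCESSING) {
            // Acknowledged up front: the outcome of the method no longer matters.
            events.add("ack");
        }
        try {
            events.add("processed " + method.apply(payload));
            if (strategy == Strategy.POST_PROCESSING) {
                // Acknowledged only once processing has completed.
                events.add("ack");
            }
        } catch (RuntimeException e) {
            events.add("failed");
            if (strategy == Strategy.POST_PROCESSING) {
                // The incoming message is still linked to the processing, so it is nacked.
                events.add("nack");
            }
        }
        return events;
    }
}
```

Under these assumptions, PRE_PROCESSING records the ack before the processing event and keeps it even when the method fails, while POST_PROCESSING records the ack after successful processing and a nack after a failure.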

Refer to the signature list to determine which strategies are available for a specific method signature and what the default strategy is.

Negative acknowledgment

Messages can also be nacked, which indicates that the message was not processed correctly. The Message.nack method signals that processing failed (and supplies the reason). As with successful acknowledgment, the nack is propagated through the chain of messages.

If the message has been produced by a connector, this connector implements specific behavior when receiving a nack. It can fail (the default), ignore the failure, or implement a dead-letter queue mechanism. Refer to the connector documentation for further details about the available strategies.

If the message is sent by an emitter using the send(P) method, the returned CompletionStage is completed exceptionally with the nack reason.

@Inject @Channel("data") Emitter<String> emitter;

public void emitPayload() {
    CompletionStage<Void> completionStage = emitter.send("hello");
    completionStage.whenComplete((acked, nacked) -> {
        if (nacked != null) {
            // the processing has failed
        }
    });
}

Negative acknowledgment can be manual or automatic. If your method handles instances of Message and the acknowledgment strategy is MANUAL, you can nack a message explicitly. You must indicate the reason (an exception) when calling the nack method. As for successful acknowledgment, the nack returns a CompletionStage completed when the nack has been processed.
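
The following plain-Java sketch models how a nack reason travels back to the sender. It does not use the framework: NackSketch.send is a hypothetical stand-in that loosely imitates sending a payload through an emitter, and the isValid predicate is an assumed validation rule chosen by the caller.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
import java.util.function.Predicate;

class NackSketch {

    // Hypothetical stand-in for sending a payload: the returned stage completes
    // normally when the consumer acks and exceptionally when it nacks.
    static CompletionStage<Void> send(String payload, Predicate<String> isValid) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        // Acknowledgment and negative-acknowledgment functions attached to the message.
        Runnable ack = () -> result.complete(null);
        Consumer<Throwable> nack = result::completeExceptionally;

        // Consumer side, as with the MANUAL strategy: choose explicitly between ack and nack.
        if (isValid.test(payload)) {
            ack.run();
        } else {
            // A nack always carries a reason, expressed as an exception.
            nack.accept(new IllegalArgumentException("invalid payload: " + payload));
        }
        return result;
    }
}
```

A caller can attach whenComplete to the returned stage, as in the emitter example above, to observe either the successful acknowledgment or the nack reason.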

If your method uses the POST_PROCESSING acknowledgment strategy, and the method fails (either by throwing an exception or by producing a failure), the message is automatically nacked with the caught exception:

@Incoming("data")
@Outgoing("out")
public String process(String s) {
    if (s.equalsIgnoreCase("b")) {
        // Throwing an exception triggers a nack
        throw new IllegalArgumentException("b");
    }

    if (s.equalsIgnoreCase("e")) {
        // Returning null would skip the message (it will be acked)
        return null;
    }

    return s.toUpperCase();
}

@Incoming("data")
@Outgoing("out")
public Uni<String> processAsync(String s) {
    if (s.equalsIgnoreCase("a")) {
        // Returning a failing Uni triggers a nack
        return Uni.createFrom().failure(new Exception("a"));
    }

    if (s.equalsIgnoreCase("b")) {
        // Throwing an exception triggers a nack
        throw new IllegalArgumentException("b");
    }

    if (s.equalsIgnoreCase("e")) {
        // Emitting a null item skips the message (it is acked, not nacked)
        return Uni.createFrom().nullItem();
    }

    if (s.equalsIgnoreCase("f")) {
        // Returning `null` is invalid for methods returning a Uni; the message is nacked
        return null;
    }

    return Uni.createFrom().item(s.toUpperCase());
}