Consumer Rebalance Listener

To handle offset commits and assigned partitions yourself, you can provide a consumer rebalance listener. To do so, implement the io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener interface, make the implementing class a bean, and add the @Identifier qualifier. A typical use case is storing offsets in a separate data store to implement exactly-once semantics, or starting the processing at a specific offset.

The listener is invoked every time the consumer topic/partition assignment changes. For example, when the application starts, the onPartitionsAssigned callback is invoked with the initial set of topics/partitions associated with the consumer. If this set changes later, the onPartitionsRevoked and onPartitionsAssigned callbacks are invoked again, so you can implement custom logic.

Note that the rebalance listener methods are called from the Kafka polling thread and must block the caller thread until completion. That’s because the rebalance protocol has synchronization barriers, and asynchronous code started in a rebalance listener may run after the synchronization barrier has passed.
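As an illustration of the blocking requirement, here is a minimal sketch that uses only the JDK. It assumes the work done in a callback is asynchronous by nature; saveOffsetsAsync is a hypothetical stand-in for such a call and is not part of SmallRye or Kafka. The point is only that the calling thread waits for the result before returning.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class BlockingCallbackSketch {

    // Stands in for state written by the asynchronous operation.
    static final AtomicBoolean SAVED = new AtomicBoolean(false);

    // Hypothetical asynchronous store call; not part of any real API.
    static CompletableFuture<Void> saveOffsetsAsync() {
        return CompletableFuture.runAsync(() -> SAVED.set(true));
    }

    // What a callback body could do: start the async work, then wait for it.
    static void saveOffsetsBlocking() {
        saveOffsetsAsync()
                .orTimeout(5, TimeUnit.SECONDS) // fail instead of waiting forever
                .join();                        // block the calling thread until done
    }
}
```

Calling saveOffsetsAsync() alone and returning immediately would let the rebalance proceed before the work has finished, which is the situation the note above warns about.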

When topics/partitions are assigned to or revoked from a consumer, message delivery is paused and resumes once the rebalance completes.

If the rebalance listener handles offset commits on behalf of the user (using the ignore commit strategy), the rebalance listener must commit the offsets synchronously in the onPartitionsRevoked callback. We also recommend applying the same logic when the application stops.
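The bookkeeping behind such a commit can be sketched without any Kafka dependency. OffsetTracker below is a hypothetical helper, not part of SmallRye: it identifies partitions by a plain string instead of a TopicPartition, remembers the highest processed offset per partition, and returns the offsets to commit for the partitions being revoked. It relies on the Kafka convention that the committed offset is the offset of the next record to read, that is, the last processed offset plus one.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical helper that tracks the last processed offset per partition.
 * Partitions are identified by a "topic-partition" string to keep the sketch
 * free of Kafka types.
 */
public class OffsetTracker {

    // Written by the processing code, read from the polling thread.
    private final Map<String, Long> lastProcessed = new ConcurrentHashMap<>();

    /** Call once a record has been fully processed. */
    public void recordProcessed(String partition, long offset) {
        // Keep the highest offset seen for the partition.
        lastProcessed.merge(partition, offset, Math::max);
    }

    /**
     * Returns the offsets to commit for the revoked partitions and forgets them.
     * Partitions without any processed record are left out of the result.
     */
    public Map<String, Long> drainOffsetsToCommit(Collection<String> revoked) {
        Map<String, Long> toCommit = new HashMap<>();
        for (String partition : revoked) {
            Long last = lastProcessed.remove(partition);
            if (last != null) {
                toCommit.put(partition, last + 1); // next record to read
            }
        }
        return toCommit;
    }
}
```

In a real listener, the result would be converted to a Map of TopicPartition to OffsetAndMetadata and passed to consumer.commitSync(...) inside onPartitionsRevoked, so the commit completes before the callback returns.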

Unlike the ConsumerRebalanceListener from Apache Kafka, the io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener methods pass the Kafka Consumer and the set of topics/partitions.

Example

In this example, we set up a consumer that always starts from messages that are at most 10 minutes old (or from offset 0 when no offset matches that timestamp). First, we provide a bean that implements the io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener interface and is annotated with @Identifier. We then configure our inbound connector to use this named bean.

package kafka.inbound;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

import javax.enterprise.context.ApplicationScoped;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener;

@ApplicationScoped
@Identifier("rebalanced-example.rebalancer")
public class KafkaRebalancedConsumerRebalanceListener implements KafkaConsumerRebalanceListener {

    private static final Logger LOGGER = Logger.getLogger(KafkaRebalancedConsumerRebalanceListener.class.getName());

    /**
     * When receiving a list of partitions will search for the earliest offset within 10 minutes
     * and seek the consumer to it.
     *
     * @param consumer underlying consumer
     * @param partitions set of assigned topic partitions
     */
    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer,
            Collection<org.apache.kafka.common.TopicPartition> partitions) {
        long now = System.currentTimeMillis();
        long shouldStartAt = now - 600_000L; // 10 minutes ago

        Map<org.apache.kafka.common.TopicPartition, Long> request = new HashMap<>();
        for (org.apache.kafka.common.TopicPartition partition : partitions) {
            LOGGER.info("Assigned " + partition);
            request.put(partition, shouldStartAt);
        }
        Map<org.apache.kafka.common.TopicPartition, OffsetAndTimestamp> offsets = consumer
                .offsetsForTimes(request);
        for (Map.Entry<org.apache.kafka.common.TopicPartition, OffsetAndTimestamp> position : offsets.entrySet()) {
            long target = position.getValue() == null ? 0L : position.getValue().offset();
            LOGGER.info("Seeking position " + target + " for " + position.getKey());
            consumer.seek(position.getKey(), target);
        }
    }

}

package kafka.inbound;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;

@ApplicationScoped
public class KafkaRebalancedConsumer {

    @Incoming("rebalanced-example")
    @Acknowledgment(Acknowledgment.Strategy.NONE)
    public CompletionStage<Void> consume(IncomingKafkaRecord<Integer, String> message) {
        // We don't need to ACK messages because in this example we set offset during consumer re-balance
        return CompletableFuture.completedFuture(null);
    }

}

To configure the inbound connector to use the provided listener, either set the consumer rebalance listener’s name:

  • mp.messaging.incoming.rebalanced-example.consumer-rebalance-listener.name=rebalanced-example.rebalancer

Or have the listener’s name be the same as the group id:

  • mp.messaging.incoming.rebalanced-example.group.id=rebalanced-example.rebalancer

Setting the consumer rebalance listener’s name takes precedence over using the group id.
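
Putting this together, a channel configuration for the example might look like the fragment below. This is a sketch under assumptions: the topic name is hypothetical, the deserializers are chosen to match the Integer key and String value of the consumer above, and the ignore commit strategy is selected because the example never acknowledges messages.

```properties
# Bind the channel to the Kafka connector
mp.messaging.incoming.rebalanced-example.connector=smallrye-kafka
# Hypothetical topic name
mp.messaging.incoming.rebalanced-example.topic=rebalanced-example-topic
# Deserializers matching IncomingKafkaRecord<Integer, String>
mp.messaging.incoming.rebalanced-example.key.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
mp.messaging.incoming.rebalanced-example.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# The connector does not commit offsets itself
mp.messaging.incoming.rebalanced-example.commit-strategy=ignore
# Identifier of the rebalance listener bean
mp.messaging.incoming.rebalanced-example.consumer-rebalance-listener.name=rebalanced-example.rebalancer
```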