public interface KafkaConsumerRebalanceListener
When a bean implementing this interface is annotated with @Identifier and configured against an inbound connector, it will be applied as a consumer rebalance listener to that inbound connector's consumer.
To configure which listener you want to use, set the name in the inbound connector's consumer rebalance listener attribute. For example:

mp.messaging.incoming.example.consumer-rebalance-listener.name=ExampleConsumerRebalanceListener

@Identifier("ExampleConsumerRebalanceListener")
public class ExampleConsumerRebalanceListener implements KafkaConsumerRebalanceListener { ... }

Alternatively, name your listener (using the @Identifier annotation) to be the group id used by the connector. For example:

mp.messaging.incoming.example.group.id=my-group

@Identifier("my-group")
public class MyGroupConsumerRebalanceListener implements KafkaConsumerRebalanceListener { ... }
Setting the consumer rebalance listener name takes precedence over using the group id.
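The precedence rule can be pictured as a two-step lookup: use the configured listener name if one is set, otherwise fall back to the group id. The following is a minimal sketch of that order only, not SmallRye's actual implementation. The class `ListenerResolutionDemo`, its methods, and the `Map` standing in for the set of @Identifier-named beans are all hypothetical.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of the lookup order described above; not SmallRye's actual code.
final class ListenerResolutionDemo {

    // Returns the identifier to look up: the explicit listener name if set, otherwise the group id.
    static String resolveIdentifier(Optional<String> configuredName, String groupId) {
        return configuredName.orElse(groupId);
    }

    // Looks up a listener in a registry keyed by @Identifier value (modelled here as a plain Map).
    static <T> Optional<T> findListener(Map<String, T> byIdentifier,
                                        Optional<String> configuredName,
                                        String groupId) {
        return Optional.ofNullable(byIdentifier.get(resolveIdentifier(configuredName, groupId)));
    }

    public static void main(String[] args) {
        Map<String, String> registry = Map.of(
                "ExampleConsumerRebalanceListener", "listener selected by name",
                "my-group", "listener selected by group id");

        // An explicit name wins even though a listener named after the group id exists.
        System.out.println(findListener(registry, Optional.of("ExampleConsumerRebalanceListener"), "my-group"));
        // No explicit name: fall back to the group id.
        System.out.println(findListener(registry, Optional.empty(), "my-group"));
    }
}
```

What happens when a configured name matches no bean is outside the scope of this sketch; here the lookup simply comes back empty.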
See also: ConsumerRebalanceListener
Modifier and Type | Method and Description |
---|---|
default void | onPartitionsAssigned(org.apache.kafka.clients.consumer.Consumer<?,?> consumer, Collection<org.apache.kafka.common.TopicPartition> partitions): A callback method the user can implement to provide handling of customized offsets on completion of a successful partition re-assignment. |
default void | onPartitionsLost(org.apache.kafka.clients.consumer.Consumer<?,?> consumer, Collection<org.apache.kafka.common.TopicPartition> partitions): A callback method you can implement to provide handling of cleaning up resources for partitions that have already been reassigned to other consumers. |
default void | onPartitionsRevoked(org.apache.kafka.clients.consumer.Consumer<?,?> consumer, Collection<org.apache.kafka.common.TopicPartition> partitions): A callback method the user can implement to provide handling of offset commits to a customized store. |
default void onPartitionsAssigned(org.apache.kafka.clients.consumer.Consumer<?,?> consumer, Collection<org.apache.kafka.common.TopicPartition> partitions)
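A callback method the user can implement to provide handling of customized offsets on completion of a successful partition re-assignment. This method is called after the partition re-assignment completes and before the consumer starts fetching data, and only as the result of a poll(long) call.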
Under normal conditions, it is guaranteed that all processes in a consumer group execute their onPartitionsRevoked(Consumer, Collection) callback before any instance executes this callback. In exceptional scenarios, partitions may be migrated without the old owner being notified (i.e. its onPartitionsRevoked(Consumer, Collection) callback is not triggered); when the old owner later realizes this, its onPartitionsLost(Consumer, Collection) callback is triggered.
It is common for the assignment callback to use the consumer instance in order to query offsets. A WakeupException or InterruptException may be raised from one of these nested invocations. In this case, the exception is propagated to the current invocation of KafkaConsumer.poll(java.time.Duration) in which this callback is being executed. This means it is not necessary to catch these exceptions and re-attempt to wake up or interrupt the consumer thread.
IMPORTANT: The behavior must be blocking. This callback is invoked from the polling thread.
Parameters:
partitions - The list of partitions that are now assigned to the consumer (previously owned partitions will NOT be included, i.e. this list will only include newly added partitions)
Throws:
org.apache.kafka.common.errors.WakeupException - If raised from a nested call to KafkaConsumer
org.apache.kafka.common.errors.InterruptException - If raised from a nested call to KafkaConsumer
default void onPartitionsRevoked(org.apache.kafka.clients.consumer.Consumer<?,?> consumer, Collection<org.apache.kafka.common.TopicPartition> partitions)
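A callback method the user can implement to provide handling of offset commits to a customized store. This method is called during a rebalance operation when the consumer has to give up some partitions. It can also be called when the consumer is being closed (KafkaConsumer.close(Duration)) or is unsubscribing (KafkaConsumer.unsubscribe()).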
Committing offsets in this callback, to either Kafka or a custom offset store, is recommended to prevent duplicate data.
In eager rebalancing, it will always be called at the start of a rebalance and after the consumer stops fetching data.
In cooperative rebalancing, it will be called at the end of a rebalance on the set of partitions being revoked iff the
set is non-empty.
For usage examples of this API, see the Usage Examples section of KafkaConsumer.
It is common for the revocation callback to use the consumer instance in order to commit offsets. A WakeupException or InterruptException may be raised from one of these nested invocations. In this case, the exception is propagated to the current invocation of KafkaConsumer.poll(java.time.Duration) in which this callback is being executed. This means it is not necessary to catch these exceptions and re-attempt to wake up or interrupt the consumer thread.
IMPORTANT: The behavior must be blocking. This callback is invoked from the polling thread.
Parameters:
partitions - The list of partitions that were assigned to the consumer and now need to be revoked (may not include all currently assigned partitions, i.e. there may still be some partitions left)
Throws:
org.apache.kafka.common.errors.WakeupException - If raised from a nested call to KafkaConsumer
org.apache.kafka.common.errors.InterruptException - If raised from a nested call to KafkaConsumer
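The pairing of the two callbacks with a custom offset store (save positions on revocation, restore them on assignment) can be sketched without any Kafka dependency. This is a simplified model under stated assumptions, not the Kafka or SmallRye API: `OffsetStoreSketch` and its methods are hypothetical, partitions are plain strings such as "orders-0" rather than TopicPartition objects, and the store is an in-memory map where a real one would be durable.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model; partition names stand in for TopicPartition.
final class OffsetStoreSketch {
    private final Map<String, Long> stored = new HashMap<>();    // offsets saved on revocation
    private final Map<String, Long> positions = new HashMap<>(); // positions of currently owned partitions

    // Mirrors onPartitionsRevoked: persist the position of each partition being given up.
    void onPartitionsRevoked(Collection<String> partitions) {
        for (String partition : partitions) {
            Long position = positions.remove(partition);
            if (position != null) {
                stored.put(partition, position);
            }
        }
    }

    // Mirrors onPartitionsAssigned: resume each newly assigned partition
    // from its stored offset, or from 0 if nothing was stored.
    void onPartitionsAssigned(Collection<String> partitions) {
        for (String partition : partitions) {
            positions.put(partition, stored.getOrDefault(partition, 0L));
        }
    }

    // Simulates consuming `count` records from an owned partition; ignored if not owned.
    void advance(String partition, long count) {
        positions.computeIfPresent(partition, (key, value) -> value + count);
    }

    // Current position, or -1 if the partition is not owned.
    long position(String partition) {
        return positions.getOrDefault(partition, -1L);
    }
}
```

In a real listener the same two steps would go through the consumer instance passed to each callback, and both would need to complete before the callback returns, since the callbacks run on the polling thread.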
default void onPartitionsLost(org.apache.kafka.clients.consumer.Consumer<?,?> consumer, Collection<org.apache.kafka.common.TopicPartition> partitions)
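A callback method you can implement to provide handling of cleaning up resources for partitions that have already been reassigned to other consumers. This method is not called during normal execution, because owned partitions are first revoked by calling onPartitionsRevoked(org.apache.kafka.clients.consumer.Consumer<?, ?>, java.util.Collection<org.apache.kafka.common.TopicPartition>) before being reassigned to other consumers during a rebalance event. However, in exceptional scenarios where the consumer realizes that it no longer owns a partition, i.e. the partition was not revoked via a normal rebalance event, this method is invoked.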
For example, this function is called if a consumer's session timeout has expired, or if a fatal error has been received indicating the consumer is no longer part of the group.
By default, it simply calls onPartitionsRevoked(org.apache.kafka.clients.consumer.Consumer<?, ?>, java.util.Collection<org.apache.kafka.common.TopicPartition>); users who want to handle revoked and lost partitions differently can override the default implementation.
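The delegation from the lost callback to the revoked callback, and the option to override it, can be illustrated with a reduced stand-in interface. Everything below is hypothetical and only models the default-method pattern: `RebalanceCallbacks`, `DefaultHandling`, and `DistinctHandling` are not part of SmallRye or Kafka, and the choice to skip the commit for lost partitions is one possible design, shown for contrast.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the listener interface, reduced to the two callbacks discussed here.
interface RebalanceCallbacks {
    void onPartitionsRevoked(List<String> partitions);

    // Default behavior described above: treat lost partitions like revoked ones.
    default void onPartitionsLost(List<String> partitions) {
        onPartitionsRevoked(partitions);
    }
}

// Keeps the default: lost partitions go through the revocation path.
final class DefaultHandling implements RebalanceCallbacks {
    final List<String> events = new ArrayList<>();

    @Override
    public void onPartitionsRevoked(List<String> partitions) {
        events.add("commit+release " + partitions);
    }
}

// Overrides the default: in this sketch, lost partitions only release resources.
final class DistinctHandling implements RebalanceCallbacks {
    final List<String> events = new ArrayList<>();

    @Override
    public void onPartitionsRevoked(List<String> partitions) {
        events.add("commit+release " + partitions);
    }

    @Override
    public void onPartitionsLost(List<String> partitions) {
        events.add("release-only " + partitions);
    }
}
```

Calling `onPartitionsLost` on `DefaultHandling` records a "commit+release" event, while the same call on `DistinctHandling` records "release-only".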
A WakeupException or InterruptException may be raised from a nested invocation on the consumer within this callback. In this case, the exception is propagated to the current invocation of KafkaConsumer.poll(java.time.Duration) in which this callback is being executed. This means it is not necessary to catch these exceptions and re-attempt to wake up or interrupt the consumer thread.
Parameters:
partitions - The list of partitions that were assigned to the consumer and now have been reassigned to other consumers. With the current protocol this will always include all of the consumer's previously assigned partitions, but this may change in future protocols (i.e. there would still be some partitions left)
Throws:
org.apache.kafka.common.errors.WakeupException - If raised from a nested call to KafkaConsumer
org.apache.kafka.common.errors.InterruptException - If raised from a nested call to KafkaConsumer
Copyright © 2018–2021 SmallRye. All rights reserved.