Class KafkaThrottledLatestProcessedCommit
- java.lang.Object
-
- io.smallrye.reactive.messaging.kafka.commit.ContextHolder
-
- io.smallrye.reactive.messaging.kafka.commit.KafkaThrottledLatestProcessedCommit
-
- All Implemented Interfaces:
KafkaCommitHandler
public class KafkaThrottledLatestProcessedCommit extends ContextHolder implements KafkaCommitHandler
Will keep track of received messages and commit to the next offset after the latest ACKed message in sequence. Will commit periodically as defined by `auto.commit.interval.ms` (default: 5000)This strategy mimics the behavior of the kafka consumer when `enable.auto.commit` is `true`.
The connector will be marked as unhealthy in the presence of any received record that has gone too long without being processed as defined by `throttled.unprocessed-record-max-age.ms` (default: 60000). If `throttled.unprocessed-record-max-age.ms` is set to less than or equal to 0 then will not perform any health check (this might lead to running out of memory).
This strategy guarantees at-least-once delivery even if the channel performs asynchronous processing.
To use set `commit-strategy` to `throttled`.
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description static class
KafkaThrottledLatestProcessedCommit.TooManyMessagesWithoutAckException
-
Nested classes/interfaces inherited from interface io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler
KafkaCommitHandler.Strategy
-
-
Field Summary
-
Fields inherited from class io.smallrye.reactive.messaging.kafka.commit.ContextHolder
context, vertx
-
-
Method Summary
All Methods Static Methods Instance Methods Concrete Methods Modifier and Type Method Description static void
clearCache()
static KafkaThrottledLatestProcessedCommit
create(io.vertx.mutiny.core.Vertx vertx, ReactiveKafkaConsumer<?,?> consumer, String groupId, KafkaConnectorIncomingConfiguration config, KafkaSource<?,?> source)
<K,V>
CompletionStage<Void>handle(IncomingKafkaRecord<K,V> record)
A message has been acknowledged.void
partitionsAssigned(Collection<org.apache.kafka.common.TopicPartition> partitions)
New partitions are assigned.void
partitionsRevoked(Collection<org.apache.kafka.common.TopicPartition> partitions)
Revoked partitions.<K,V>
io.smallrye.mutiny.Uni<IncomingKafkaRecord<K,V>>received(IncomingKafkaRecord<K,V> record)
Received a new record from Kafka.void
terminate(boolean graceful)
-
Methods inherited from class io.smallrye.reactive.messaging.kafka.commit.ContextHolder
capture, getContext, runOnContext, runOnContextAndAwait
-
-
-
-
Method Detail
-
clearCache
public static void clearCache()
-
create
public static KafkaThrottledLatestProcessedCommit create(io.vertx.mutiny.core.Vertx vertx, ReactiveKafkaConsumer<?,?> consumer, String groupId, KafkaConnectorIncomingConfiguration config, KafkaSource<?,?> source)
-
partitionsAssigned
public void partitionsAssigned(Collection<org.apache.kafka.common.TopicPartition> partitions)
New partitions are assigned. This method is called from the Kafka poll thread.- Specified by:
partitionsAssigned
in interfaceKafkaCommitHandler
- Parameters:
partitions
- the list of partitions that are now assigned to the consumer (may include partitions previously assigned to the consumer)
-
partitionsRevoked
public void partitionsRevoked(Collection<org.apache.kafka.common.TopicPartition> partitions)
Revoked partitions. This method is called from the Kafka pool thread.- Specified by:
partitionsRevoked
in interfaceKafkaCommitHandler
- Parameters:
partitions
- The list of partitions that were assigned to the consumer and now need to be revoked (may not include all currently assigned partitions).
-
received
public <K,V> io.smallrye.mutiny.Uni<IncomingKafkaRecord<K,V>> received(IncomingKafkaRecord<K,V> record)
Received a new record from Kafka. This method is called from a Vert.x event loop.- Specified by:
received
in interfaceKafkaCommitHandler
- Type Parameters:
K
- the keyV
- the value- Parameters:
record
- the record- Returns:
- the record emitted once everything has been done
-
handle
public <K,V> CompletionStage<Void> handle(IncomingKafkaRecord<K,V> record)
A message has been acknowledged. This method is NOT necessarily called on an event loop.- Specified by:
handle
in interfaceKafkaCommitHandler
- Type Parameters:
K
- the keyV
- the value- Parameters:
record
- the record- Returns:
- a completion stage indicating when the commit complete
-
terminate
public void terminate(boolean graceful)
- Specified by:
terminate
in interfaceKafkaCommitHandler
-
-