Receiving Kafka Records
The Kafka Connector retrieves Kafka Records from Kafka Brokers and maps each of them to a Reactive Messaging Message.
Let’s imagine you have a Kafka broker running and accessible using the kafka:9092 address (by default it would use localhost:9092).
Configure your application to receive Kafka records from a Kafka topic on the prices channel as follows:
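A minimal configuration sketch matching the callouts below (the prices channel name and the Double deserializer follow this guide’s running example; adjust to your application):

```properties
# 1. Broker location, configured globally
kafka.bootstrap.servers=kafka:9092
# 2. The connector managing the channel
mp.messaging.incoming.prices.connector=smallrye-kafka
# 3. The deserializer for the record's value
mp.messaging.incoming.prices.value.deserializer=org.apache.kafka.common.serialization.DoubleDeserializer
# 4. Allow more than one consumer on the channel
mp.messaging.incoming.prices.broadcast=true
```

No topic attribute is set: the channel name (prices) is used as the topic by default.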
Configure the broker location. You can configure it globally or per channel.
Configure the connector to manage the prices channel.
Sets the (Kafka) deserializer to read the record’s value.
Make sure that we can receive from more than one consumer (see the broadcast attribute).
You don’t need to set the Kafka topic. By default, it uses the channel name (prices). You can configure the topic attribute to override it.
Then, your application receives Message<Double>. You can consume the payload directly, or you can retrieve the Message<Double>.
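Both styles can be sketched as follows (class and method names are illustrative; jakarta.* imports assume a recent stack, older versions use javax.*):

```java
import java.util.concurrent.CompletionStage;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

@ApplicationScoped
public class PriceConsumers {

    // Consume the payload directly
    @Incoming("prices")
    public void consumePayload(double price) {
        // process the price
    }

    // Or retrieve the whole Message to access metadata and control acknowledgment
    @Incoming("prices")
    public CompletionStage<Void> consumeMessage(Message<Double> msg) {
        // process msg.getPayload()
        return msg.ack();
    }
}
```

Having two methods on the same channel is why the broadcast attribute is enabled in the configuration above.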
The deserialization is handled by the underlying Kafka Client. You need to configure the:
mp.messaging.incoming.[channel-name].value.deserializer to configure the value deserializer (mandatory)
mp.messaging.incoming.[channel-name].key.deserializer to configure the key deserializer (optional, defaults to org.apache.kafka.common.serialization.StringDeserializer)
If you want to use a custom deserializer, add it to your CLASSPATH and configure the associated attribute.
In addition, the Kafka Connector also provides a set of message converters. So you can receive payloads representing records from Kafka using:
- Record - a pair key/value
- ConsumerRecord - a structure representing the record with all its metadata
Messages coming from Kafka contain an instance of IncomingKafkaRecordMetadata in the metadata. It provides the key, topic, partition, headers and so on:
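For instance (a sketch; the key and value types are assumptions):

```java
import java.util.concurrent.CompletionStage;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata;

public class MetadataConsumer {

    @Incoming("prices")
    public CompletionStage<Void> consume(Message<Double> msg) {
        msg.getMetadata(IncomingKafkaRecordMetadata.class).ifPresent(metadata -> {
            Object key = metadata.getKey();
            String topic = metadata.getTopic();
            int partition = metadata.getPartition();
            long offset = metadata.getOffset();
            // headers are available via metadata.getHeaders()
        });
        return msg.ack();
    }
}
```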
When a message produced from a Kafka record is acknowledged, the connector invokes a commit strategy. These strategies decide when the consumer offset for a specific topic/partition is committed. Committing an offset indicates that all previous records have been processed. It is also the position where the application would restart the processing after a crash recovery or a restart.
Committing every offset has performance penalties as Kafka offset management can be slow. However, not committing the offset often enough may lead to message duplication if the application crashes between two commits.
The Kafka connector supports three strategies:
throttled keeps track of received messages and commits the next offset after the latest acked message in sequence. This strategy guarantees at-least-once delivery even if the channel performs asynchronous processing. The connector tracks the received records and periodically (period specified by auto.commit.interval.ms, default: 5000) commits the highest consecutive offset. The connector will be marked as unhealthy if a message associated with a record is not acknowledged in throttled.unprocessed-record-max-age.ms (default: 60000). Indeed, this strategy cannot commit the offset as soon as a single record processing fails (see failure-strategy to configure what happens on failing processing). If throttled.unprocessed-record-max-age.ms is set to less than or equal to 0, it does not perform any health check verification. Such a setting might lead to running out of memory if there are poison pill messages. This strategy is the default if enable.auto.commit is not explicitly set to true.
checkpoint allows persisting consumer offsets on a "state store", instead of committing them back to the Kafka broker. Using the CheckpointMetadata API, consumer code can persist a processing state with the offset to mark the progress of a consumer. When the processing continues from a previously persisted offset, it seeks the Kafka consumer to that offset and also restores the persisted state, continuing the stateful processing from where it left off. The checkpoint strategy holds locally the processing state associated with the latest offset, and persists it periodically to the state store (period specified by auto.commit.interval.ms, default: 5000). The connector will be marked as unhealthy if no processing state is persisted to the state store in checkpoint.unsynced-state-max-age.ms (default: 10000). Using the CheckpointMetadata API, the user code can force persisting the state on message ack. If checkpoint.unsynced-state-max-age.ms is set to less than or equal to 0, it does not perform any health check verification. For more information, see Stateful processing with Checkpointing.
latest commits the record offset received by the Kafka consumer as soon as the associated message is acknowledged (if the offset is higher than the previously committed offset). This strategy provides at-least-once delivery if the channel processes the message without performing any asynchronous processing. This strategy should not be used under high load as offset commit is expensive. However, it reduces the risk of duplicates.
ignore performs no commit. This strategy is the default strategy when the consumer is explicitly configured with enable.auto.commit=true. It delegates the offset commit to the Kafka client. When enable.auto.commit is true, this strategy DOES NOT guarantee at-least-once delivery. However, if the processing failed between two commits, messages received after the commit and before the failure will be re-processed.
The Kafka connector disables the Kafka auto commit if not explicitly enabled. This behavior differs from the traditional Kafka consumer.
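The bookkeeping behind the throttled strategy can be illustrated with a small, framework-free sketch. This is not the connector’s code, just the idea: only the highest consecutive acknowledged offset is committable, so a single unacked record holds back the commit for everything after it:

```java
import java.util.TreeSet;

/** Illustrative sketch of throttled-style offset tracking (not connector code). */
class ThrottledOffsetTracker {
    private final TreeSet<Long> acked = new TreeSet<>();
    private long committed = -1; // last committed offset, -1 means nothing committed yet

    /** Record that the message at this offset was acknowledged. */
    void ack(long offset) {
        acked.add(offset);
    }

    /** Advance to the highest offset o such that every offset in (committed, o] is acked. */
    long highestCommittable() {
        while (acked.contains(committed + 1)) {
            committed++;
            acked.remove(committed); // no longer needed once counted as consecutive
        }
        return committed;
    }

    public static void main(String[] args) {
        ThrottledOffsetTracker t = new ThrottledOffsetTracker();
        t.ack(0); t.ack(1); t.ack(3); // offset 2 not yet acked
        System.out.println(t.highestCommittable()); // 1: cannot skip over the gap at 2
        t.ack(2);
        System.out.println(t.highestCommittable()); // 3: the gap is filled
    }
}
```

If offset 2 is never acknowledged, the tracker stays at 1 forever; this is exactly the situation throttled.unprocessed-record-max-age.ms detects to mark the channel unhealthy.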
If high-throughput is important for you, and not limited by the downstream, we recommend to either:
- Use the throttled policy,
- or set enable.auto.commit to true and annotate the consuming method with @Acknowledgment(Acknowledgment.Strategy.NONE).
If a message produced from a Kafka record is nacked, a failure strategy is applied. The Kafka connector supports three strategies:
fail - fail the application, no more records will be processed (default). The offset of the record that has not been processed correctly is not committed.
ignore - the failure is logged, but the processing continues. The offset of the record that has not been processed correctly is committed.
dead-letter-queue - the offset of the record that has not been processed correctly is committed, but the record is written to a (Kafka) dead letter queue topic.
The strategy is selected using the failure-strategy attribute.
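For example, assuming a channel named prices:

```properties
mp.messaging.incoming.prices.failure-strategy=dead-letter-queue
# optional override (the topic name below is an illustrative assumption)
mp.messaging.incoming.prices.dead-letter-queue.topic=dead-prices
```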
In the case of dead-letter-queue, you can configure the following attributes:
dead-letter-queue.topic: the topic to use to write the records not processed correctly, default is dead-letter-topic-$channel, with $channel being the name of the channel.
dead-letter-queue.producer-client-id: the client id used by the Kafka producer when sending records to the dead letter queue topic. If not specified, it defaults to kafka-dead-letter-topic-producer-$client-id, with $client-id being the value obtained from the consumer client id.
dead-letter-queue.key.serializer: the serializer used to write the record key on the dead letter queue. By default, it deduces the serializer from the key deserializer.
dead-letter-queue.value.serializer: the serializer used to write the record value on the dead letter queue. By default, it deduces the serializer from the value deserializer.
The record written on the dead letter topic contains the original record’s headers, as well as a set of additional headers about the original record:
dead-letter-reason - the reason of the failure (the message of the Throwable passed to nack())
dead-letter-cause - the cause of the failure (the cause of the Throwable passed to nack()), if any
dead-letter-topic - the original topic of the record
dead-letter-partition - the original partition of the record (integer mapped to String)
dead-letter-offset - the original offset of the record (long mapped to String)
When using dead-letter-queue, it is also possible to change some metadata of the record that is sent to the dead letter topic. To do that, use the Message.nack(Throwable, Metadata) method:
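A sketch of overriding the dead letter record’s key and topic on nack (the class name, key/value types and topic name are assumptions):

```java
import java.util.concurrent.CompletionStage;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Metadata;
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;

public class DeadLetterCustomizer {

    @Incoming("prices")
    public CompletionStage<Void> consume(KafkaRecord<String, Double> record) {
        // nack with metadata overriding the key and the dead letter topic
        return record.nack(new IllegalArgumentException("invalid price"),
                Metadata.of(OutgoingKafkaRecordMetadata.<String>builder()
                        .withKey("failed-" + record.getKey())
                        .withTopic("custom-dead-letter")
                        .build()));
    }
}
```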
Metadata may contain an instance of OutgoingKafkaRecordMetadata. If the instance is present, the following properties will be used:
key; if not present, the original record’s key will be used
topic; if not present, the configured dead letter topic will be used
partition; if not present, partition will be assigned automatically
headers; combined with the original record’s headers, as well as the dead-letter-* headers described above
Custom commit and failure strategies
In addition to provided strategies, it is possible to implement custom commit and failure strategies and configure Kafka channels with them.
For example, for a custom commit strategy, implement the KafkaCommitHandler interface, and provide a managed bean implementing the KafkaCommitHandler.Factory interface, qualified with @Identifier. Finally, to use the custom commit strategy, set the commit-strategy attribute to the identifier of the commit handler factory:
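For example, if the factory bean is identified as my-strategy (an assumed name):

```properties
mp.messaging.incoming.prices.commit-strategy=my-strategy
```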
Similarly, custom failure strategies can be configured using the failure-strategy attribute.
If the custom strategy implementation extends the ContextHolder class, it can access the Vert.x event-loop context created for the Kafka consumer.
You can combine Reactive Messaging with SmallRye Fault Tolerance, and retry processing when it fails:
You can configure the delay, the number of retries, the jitter...
If your method returns a Uni, you need to add the @NonBlocking annotation:
The incoming messages are acknowledged only once the processing completes successfully. So, it commits the offset after the successful processing. If after the retries the processing still failed, the message is nacked and the failure strategy is applied.
You can also use
@Retry on methods only consuming incoming messages:
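A sketch combining @Incoming with @Retry (the retry values and the process method are illustrative):

```java
import java.time.temporal.ChronoUnit;
import org.eclipse.microprofile.faulttolerance.Retry;
import org.eclipse.microprofile.reactive.messaging.Incoming;

public class RetryingConsumer {

    @Incoming("prices")
    @Retry(maxRetries = 5, delay = 1, delayUnit = ChronoUnit.SECONDS)
    public void consume(double price) {
        // may throw: the call is retried before the message is nacked
        process(price);
    }

    private void process(double price) {
        // hypothetical processing logic
    }
}
```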
Handling deserialization failures
Because deserialization happens before creating a Message, the failure strategy presented above cannot be applied. However, when a deserialization failure occurs, you can intercept it and provide a fallback value. To achieve this, create a CDI bean implementing the DeserializationFailureHandler<T> interface.
The bean must be exposed with the @Identifier qualifier specifying the name of the bean. Then, in the connector configuration, specify the following attribute:
mp.messaging.incoming.$channel.key-deserialization-failure-handler: name of the bean handling deserialization failures happening for the record’s key
mp.messaging.incoming.$channel.value-deserialization-failure-handler: name of the bean handling deserialization failures happening for the record’s value.
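A sketch of a fallback handler for values (the class name and identifier are assumptions; check the DeserializationFailureHandler Javadoc for the exact callback signature of your version):

```java
import jakarta.enterprise.context.ApplicationScoped;
import org.apache.kafka.common.header.Headers;
import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler;

@ApplicationScoped
@Identifier("fallback-double") // referenced from the channel configuration
public class DoubleFallbackHandler implements DeserializationFailureHandler<Double> {

    @Override
    public Double handleDeserializationFailure(String topic, boolean isKey,
            String deserializer, byte[] data, Exception exception, Headers headers) {
        // return a fallback value instead of propagating the failure; may be null
        return -1.0d;
    }
}
```

It would then be referenced with mp.messaging.incoming.prices.value-deserialization-failure-handler=fallback-double.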
The handler is called with the deserialization action as a Uni<T>, the record’s topic, a boolean indicating whether the failure happened on a key, the class name of the deserializer that throws the exception, the corrupted data, the exception, and the record’s headers augmented with headers describing the failure (which eases writing to a dead letter topic).
On the deserialization Uni, failure strategies like retry, providing a fallback value or applying a timeout can be implemented. Note that the method must await the result and return the deserialized object.
Alternatively, the handler can implement only the handleDeserializationFailure method and provide a fallback value, which may be null.
If you don’t configure a deserialization failure handler and a deserialization failure happens, the application is marked unhealthy. You can also ignore the failure, which will log the exception and produce a null value. To enable this behavior, set the mp.messaging.incoming.$channel.fail-on-deserialization-failure attribute to false.
Receiving Cloud Events
The Kafka connector supports Cloud Events.
When the connector detects a structured or binary Cloud Event, it adds an IncomingKafkaCloudEventMetadata in the metadata of the incoming Message.
IncomingKafkaCloudEventMetadata contains the various (mandatory and optional) Cloud Event attributes.
If the connector cannot extract the Cloud Event metadata, it sends the Message without the metadata.
Binary Cloud Events
With binary Cloud Events, all mandatory Cloud Event attributes must be set in the record headers, prefixed by ce_ (as mandated by the Kafka protocol binding specification).
The connector considers headers starting with the ce_ prefix but not listed in the specification as extensions. You can access them using the getExtension method from IncomingKafkaCloudEventMetadata. You can retrieve them as String.
The datacontenttype attribute is mapped to the content-type header of the record. The partitionkey attribute is mapped to the record’s key, if any.
Note that all headers are read as UTF-8.
With binary Cloud Events, the record’s key and value can use any deserializer.
Structured Cloud Events
With structured Cloud Events, the event is encoded in the record’s value. Only JSON is supported, so your event must be encoded as JSON in the record’s value.
A structured Cloud Event must set the content-type header of the record to application/cloudevents or prefix the value with application/cloudevents, such as: application/cloudevents+json; charset=UTF-8.
To receive structured Cloud Events, your value deserializer must be a StringDeserializer, a ByteArrayDeserializer, or a JsonObjectDeserializer.
As mentioned previously, the value must be a valid JSON object containing at least all the mandatory Cloud Events attributes.
If the record is a structured Cloud Event, the created Message’s payload is the Cloud Event data.
The partitionkey attribute is mapped to the record’s key, if any.
Consumer Rebalance Listener
To handle offset commit and assigned partitions yourself, you can provide a consumer rebalance listener. To achieve this, implement the KafkaConsumerRebalanceListener interface, make the implementing class a bean, and add the @Identifier qualifier. A usual use case is to store offsets in a separate data store to implement exactly-once semantics, or to start the processing at a specific offset.
The listener is invoked every time the consumer topic/partition assignment changes. For example, when the application starts, it invokes the partitionsAssigned callback with the initial set of topics/partitions associated with the consumer. If, later, this set changes, it calls the partitionsRevoked and partitionsAssigned callbacks again, so you can implement custom logic.
Note that the rebalance listener methods are called from the Kafka polling thread and must block the caller thread until completion. That’s because the rebalance protocol has synchronization barriers, and using asynchronous code in a rebalance listener may be executed after the synchronization barrier.
When topics/partitions are assigned or revoked from a consumer, it pauses the message delivery and restarts once the rebalance completes.
If the rebalance listener handles offset commit on behalf of the user (using the ignore commit strategy), the rebalance listener must commit the offset synchronously in the partitionsRevoked callback. We also recommend applying the same logic when the application stops.
Unlike the ConsumerRebalanceListener from Apache Kafka, the KafkaConsumerRebalanceListener methods pass the Kafka Consumer and the set of topics/partitions.
In this example we set up a consumer that always starts on messages from at most 10 minutes ago (or offset 0). First we need to provide a bean that implements the KafkaConsumerRebalanceListener interface and is annotated with @Identifier. We then must configure our inbound connector to use this named bean.
To configure the inbound connector to use the provided listener we either set the consumer rebalance listener’s name:
Or have the listener’s name be the same as the group id:
Setting the consumer rebalance listener’s name takes precedence over using the group id.
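Both options as configuration sketches (the identifier my-listener is an assumed name):

```properties
# Option 1: reference the listener bean by name
mp.messaging.incoming.prices.consumer-rebalance-listener.name=my-listener

# Option 2: name the listener bean after the consumer group id
mp.messaging.incoming.prices.group.id=my-listener
```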
Receiving Kafka Records in Batches
By default, incoming methods receive each Kafka record individually. Under the hood, Kafka consumer clients poll the broker constantly and receive records in batches, presented inside the ConsumerRecords container.
In batch mode, your application can receive all the records returned by the consumer poll in one go.
To achieve this you need to set
mp.messaging.incoming.$channel.batch=true and specify a compatible
container type to receive all the data:
The incoming method can also receive ConsumerRecords<Key, Payload> types. They give access to record details such as offset or timestamp:
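A batch-mode sketch, assuming mp.messaging.incoming.prices.batch=true and Double values:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.eclipse.microprofile.reactive.messaging.Incoming;

public class BatchConsumer {

    @Incoming("prices")
    public void consume(ConsumerRecords<String, Double> records) {
        for (ConsumerRecord<String, Double> record : records) {
            // each record of the polled batch exposes its details
            long offset = record.offset();
            long timestamp = record.timestamp();
            double price = record.value();
        }
    }
}
```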
Note that the successful processing of the incoming record batch will commit the latest offsets for each partition received inside the batch. The configured commit strategy will be applied for these records only.
Conversely, if the processing throws an exception, all messages are nacked, applying the failure strategy for all the records inside the batch.
Stateful processing with Checkpointing
Checkpointing is experimental, and APIs and features are subject to change in the future.
The checkpoint commit strategy allows a Kafka incoming channel to manage topic-partition offsets, not by committing on the Kafka broker, but by persisting consumers’ advancement on a state store.
In addition to that, if the consumer builds an internal state as a result of consumed records, the topic-partition offset persisted to the state store can be associated with a processing state, saving the local state to the persistent store. When a consumer restarts or consumer group instances scale, i.e. when new partitions get assigned to the consumer, the checkpointing works by resuming the processing from the latest offset and its saved state.
@Incoming channel consumer code can manipulate the processing state through the CheckpointMetadata API. The transform method allows applying a transformation function to the current state, producing a changed state and registering it locally for checkpointing. By default, the local state is synced (persisted) to the state store periodically, period specified by auto.commit.interval.ms (default: 5000). If persistOnAck is given, the latest state is persisted to the state store eagerly on message acknowledgment. The setNext method works similarly, directly setting the latest state.
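A checkpointing sketch, assuming an integer counting state and the transform(initialState, function, persistOnAck) variant of the API:

```java
import java.util.concurrent.CompletionStage;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import io.smallrye.reactive.messaging.kafka.commit.CheckpointMetadata;

public class CheckpointingConsumer {

    @Incoming("prices")
    public CompletionStage<Void> consume(Message<Double> msg) {
        CheckpointMetadata<Integer> checkpoint = CheckpointMetadata.fromMessage(msg);
        if (checkpoint != null) {
            // start from 0 and increment the locally-held state;
            // 'true' persists the state eagerly on message ack
            checkpoint.transform(0, count -> count + 1, true);
        }
        return msg.ack();
    }
}
```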
The checkpoint commit strategy tracks when a processing state was last persisted for each topic-partition. If an outstanding state change cannot be persisted for checkpoint.unsynced-state-max-age.ms (default: 10000), the channel is marked unhealthy.
Where and how processing states are persisted is decided by the state store implementation. This can be configured on the incoming channel with the checkpoint.state-store configuration property, using the state store factory identifier name.
The serialization of state objects depends on the state store implementation. Instructing state stores about serialization can require configuring the type name of state objects with the checkpoint.state-type property.
In order to keep SmallRye Reactive Messaging free of persistence-related dependencies, this library includes only a default state store. It is based on the Vert.x Filesystem API and stores the processing state in JSON-formatted files, in a local directory configured by the checkpoint.file.state-dir property. State files follow a naming scheme based on the consumer group id, topic and partition.
Implementing State Stores
State store implementations are required to implement the CheckpointStateStore interface, and provide a managed bean implementing CheckpointStateStore.Factory, identified with the @Identifier qualifier indicating the name of the state store.
The factory bean identifier indicates the name to configure on the checkpoint.state-store config property.
The factory is discovered as a CDI managed bean and state store is
created once per channel:
The checkpoint commit strategy calls the state store on the following events:
fetchProcessingState: on partitions assigned, to seek the consumer to the latest offset.
persistProcessingState: on partitions revoked, to persist the state of the last processed record.
persistProcessingState: on message acknowledgement, if a new state is set during the processing and the persistOnAck flag is set.
persistProcessingState: at auto.commit.interval.ms intervals, if a new state is set during processing.
persistProcessingState: on channel shutdown.
close: on channel shutdown.
|auto.offset.reset||What to do when there is no initial offset in Kafka. Accepted values are earliest, latest and none||string||false||latest
|batch||Whether the Kafka records are consumed in batch. The channel injection point must consume a compatible type, such as List<Payload> or ConsumerRecords||boolean||false||false
|bootstrap.servers (kafka.bootstrap.servers)||A comma-separated list of host:port to use for establishing the initial connection to the Kafka cluster.||string||false||
|broadcast||Whether the Kafka records should be dispatched to multiple consumer||boolean||false||
|checkpoint.state-store||While using the checkpoint commit-strategy, the name set in @Identifier of a bean that implements CheckpointStateStore.Factory to specify the state store implementation.||string||false||
|checkpoint.state-type||While using the checkpoint commit-strategy, the fully qualified type name of the processing state object. The provided type is used by the state store implementation for serialization.||string||false||
|checkpoint.unsynced-state-max-age.ms||While using the checkpoint commit-strategy, specify the max age in milliseconds that the processing state must be persisted before the connector is marked as unhealthy. Setting this attribute to 0 disables this monitoring.||int||false||10000
|client-id-prefix||Prefix for Kafka client client.id attribute. If defined, the configured or generated client.id will be prefixed with the given value.||string||false||
|cloud-events||Enables (default) or disables the Cloud Event support. If enabled on an incoming channel, the connector analyzes the incoming records and try to create Cloud Event metadata. If enabled on an outgoing, the connector sends the outgoing messages as Cloud Event if the message includes Cloud Event Metadata.||boolean||false||
|commit-strategy||Specify the commit strategy to apply when a message produced from a record is acknowledged. Values can be latest, ignore or throttled. If enable.auto.commit is true then the default is ignore, otherwise it is throttled||string||false||
|consumer-rebalance-listener.name||The name set in @Identifier of a bean that implements io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener. If set, this rebalance listener is applied to the consumer.||string||false||
|enable.auto.commit||If enabled, consumer's offset will be periodically committed in the background by the underlying Kafka client, ignoring the actual processing outcome of the records. It is recommended to NOT enable this setting and let Reactive Messaging handle the commit.||boolean||false||false
|fail-on-deserialization-failure||When no deserialization failure handler is set and a deserialization failure happens, report the failure and mark the application as unhealthy. If set to false and a deserialization failure happens, a null value is forwarded.||boolean||false||true
|failure-strategy||Specify the failure strategy to apply when a message produced from a record is acknowledged negatively (nack). Values can be fail (default), ignore or dead-letter-queue||string||false||fail
|fetch.min.bytes||The minimum amount of data the server should return for a fetch request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive.||int||false||
|graceful-shutdown||Whether or not a graceful shutdown should be attempted when the application terminates.||boolean||false||
|group.id||A unique string that identifies the consumer group the application belongs to. If not set, a unique, generated id is used||string||false|
|health-enabled||Whether health reporting is enabled (default) or disabled||boolean||false||
|health-readiness-enabled||Whether readiness health reporting is enabled (default) or disabled||boolean||false||
|health-readiness-timeout||deprecated - During the readiness health check, the connector connects to the broker and retrieves the list of topics. This attribute specifies the maximum duration (in ms) for the retrieval. If exceeded, the channel is considered not-ready. Deprecated: Use 'health-topic-verification-timeout' instead.||long||false|
|health-readiness-topic-verification||deprecated - Whether the readiness check should verify that topics exist on the broker. Default to false. Enabling it requires an admin connection. Deprecated: Use 'health-topic-verification-enabled' instead.||boolean||false|
|health-topic-verification-enabled||Whether the startup and readiness check should verify that topics exist on the broker. Default to false. Enabling it requires an admin client connection.||boolean||false||
|health-topic-verification-timeout||During the startup and readiness health check, the connector connects to the broker and retrieves the list of topics. This attribute specifies the maximum duration (in ms) for the retrieval. If exceeded, the channel is considered not-ready.||long||false||
|kafka-configuration||Identifier of a CDI bean that provides the default Kafka consumer/producer configuration for this channel. The channel configuration can still override any attribute. The bean must have a type of Map<String, Object>.||string||false||
|key-deserialization-failure-handler||The name set in @Identifier of a bean that implements DeserializationFailureHandler. If set, deserialization failures happening when deserializing keys are delegated to this handler, which may provide a fallback value.||string||false||
|key.deserializer||The deserializer classname used to deserialize the record's key||string||false||
|lazy-client||Whether Kafka client is created lazily or eagerly.||boolean||false||
|max-queue-size-factor||Multiplier factor to determine maximum number of records queued for processing, using max.poll.records * max-queue-size-factor. In batch mode max.poll.records is considered as 1.||int||false||2
|partitions||The number of partitions to be consumed concurrently. The connector creates the specified amount of Kafka consumers. It should match the number of partition of the targeted topic||int||false||
|pattern||Indicate that the topic property is a regular expression. Must be used with the topic property. Cannot be used with the topics property.||boolean||false||false
|pause-if-no-requests||Whether the polling must be paused when the application does not request items and resume when it does. This allows implementing back-pressure based on the application capacity. Note that polling is not stopped, but will not retrieve any records when paused.||boolean||false||
|poll-timeout||The polling timeout in milliseconds. When polling records, the poll will wait at most that duration before returning records. Default is 1000ms||int||false||
|retry||Whether or not the connection to the broker is re-attempted in case of failure||boolean||false||
|retry-attempts||The maximum number of reconnection before failing. -1 means infinite retry||int||false||
|retry-max-wait||The max delay (in seconds) between 2 reconnects||int||false||
|throttled.unprocessed-record-max-age.ms||While using the throttled commit-strategy, specify the max age in milliseconds that an unprocessed message can be before the connector is marked as unhealthy. Setting this attribute to 0 disables this monitoring.||long||false||60000
|topic||The consumed / populated Kafka topic. If neither this property nor the topics property is set, the channel name is used||string||false||
|topics||A comma-separated list of topics to be consumed. Cannot be used with the topic or pattern properties||string||false||
|tracing-enabled||Whether tracing is enabled (default) or disabled||boolean||false||
|value-deserialization-failure-handler||The name set in @Identifier of a bean that implements DeserializationFailureHandler. If set, deserialization failures happening when deserializing values are delegated to this handler, which may provide a fallback value.||string||false||
|value.deserializer||The deserializer classname used to deserialize the record's value||string||true|
You can also pass any property supported by the underlying Kafka consumer.
For example, to configure the
max.poll.records property, use:
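For instance (the value is illustrative):

```properties
mp.messaging.incoming.prices.max.poll.records=1000
```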
Some consumer client properties are configured to sensible default values:
If not set, reconnect.backoff.max.ms is set to 10000 to avoid high load on disconnection.
If not set, key.deserializer is set to org.apache.kafka.common.serialization.StringDeserializer.
client.id is configured according to the number of clients to create using the partitions property:
If client.id is provided, it is used as-is, or suffixed with the client index if the partitions property is set.
If client.id is not provided, it is generated from the channel name, prefixed with client-id-prefix if set.