Writing Kafka Records
The Kafka Connector can write Reactive Messaging Messages as Kafka Records.
Example
Let’s imagine you have a Kafka broker running and accessible using the kafka:9092 address (by default it would use localhost:9092).
Configure your application to write the messages from the prices channel into a Kafka topic. The configuration:
- points the connector to the broker at kafka:9092,
- configures the connector to manage the prices channel,
- sets the (Kafka) serializer to encode the message payload into the record’s value,
- makes sure the topic name is prices (and not the default prices-out).
Then, your application must send Message<Double> to the prices channel. It can use double payloads as in the following snippet:
Or, you can send Message<Double>:
Serialization
The serialization is handled by the underlying Kafka Client. You need to configure:
- mp.messaging.outgoing.[channel-name].value.serializer to configure the value serializer (mandatory)
- mp.messaging.outgoing.[channel-name].key.serializer to configure the key serializer (optional, defaults to org.apache.kafka.common.serialization.StringSerializer)
If you want to use a custom serializer, add it to your CLASSPATH and configure the associated attribute.
By default, the written record contains:
- the Message payload as value
- no key, or the key configured using the key attribute, or the key passed in the metadata attached to the Message
- the timestamp computed from the system clock (now), or the timestamp passed in the metadata attached to the Message
Sending key/value pairs
In the Kafka world, it’s often necessary to send records, i.e. a key/value pair. The connector provides the io.smallrye.reactive.messaging.kafka.Record class that you can use to send a pair:
When the connector receives a message with a Record payload, it extracts the key and value from it. The configured serializers for the key and the value must be compatible with the record’s key and value.
Note that the key and the value can be null. It is also possible to create a record with a null key AND a null value.
If you need more control over the written records, use OutgoingKafkaRecordMetadata.
Outbound Metadata
When sending Messages, you can add an instance of OutgoingKafkaRecordMetadata to influence how the message is going to be written to Kafka. For example, you can add Kafka headers, configure the record key…
Propagating Record Key
When processing messages, you can propagate the incoming record key to the outgoing record. Consider a method that consumes messages from the channel in, transforms the payload, and writes the result to the channel out.
When enabled with the mp.messaging.outgoing.[channel-name].propagate-record-key=true configuration, record key propagation produces the outgoing record with the same key as the incoming record.
If the outgoing record already contains a key, it won’t be overridden by the incoming record key. If the incoming record has a null key, the mp.messaging.outgoing.[channel-name].key property is used.
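The precedence between the outgoing key, the incoming key and the configured key attribute can be summarized with a small plain-Java model. It is a sketch, not the connector's implementation: keys are assumed to be strings, and KeyPropagation and resolveKey are hypothetical names.

```java
import java.util.Optional;

// Hypothetical model of the key-selection rules described above.
// It only illustrates the precedence; it is not the connector's implementation.
final class KeyPropagation {

    private KeyPropagation() {
    }

    /**
     * @param outgoingKey   key already set on the outgoing record (may be null)
     * @param incomingKey   key of the incoming record (may be null)
     * @param configuredKey value of mp.messaging.outgoing.[channel-name].key (may be null)
     * @param propagate     value of propagate-record-key
     */
    static Optional<String> resolveKey(String outgoingKey, String incomingKey,
                                       String configuredKey, boolean propagate) {
        if (outgoingKey != null) {
            // A key set on the outgoing record is never overridden.
            return Optional.of(outgoingKey);
        }
        if (propagate && incomingKey != null) {
            // Propagation enabled: reuse the key of the incoming record.
            return Optional.of(incomingKey);
        }
        // Otherwise use the channel-level key attribute, or no key at all.
        return Optional.ofNullable(configuredKey);
    }
}
```

For example, resolveKey(null, "order-42", "default-key", true) yields order-42, while resolveKey(null, null, "default-key", true) falls back to default-key.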
Propagating Record headers
You can also propagate incoming record headers to the outgoing record by specifying the list of headers to be considered:
mp.messaging.outgoing.[channel-name].propagate-headers=Authorization,Proxy-Authorization
If the outgoing record already defines a header with the same key, it won't be overridden by the incoming header.
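A plain-Java sketch of the header rule: only the listed headers are copied, and a header that the outgoing record already defines wins. Headers are modelled as a Map<String, byte[]> for simplicity (real Kafka headers allow repeated keys), and HeaderPropagation, parse and propagate are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of propagate-headers. Headers are simplified to a
// Map<String, byte[]>; real Kafka headers may contain repeated keys.
final class HeaderPropagation {

    private HeaderPropagation() {
    }

    // "Authorization,Proxy-Authorization" -> [Authorization, Proxy-Authorization]
    static List<String> parse(String propagateHeaders) {
        List<String> names = new ArrayList<>();
        for (String name : propagateHeaders.split(",")) {
            if (!name.trim().isEmpty()) {
                names.add(name.trim());
            }
        }
        return names;
    }

    static Map<String, byte[]> propagate(Map<String, byte[]> incoming,
                                         Map<String, byte[]> outgoing,
                                         List<String> headersToPropagate) {
        Map<String, byte[]> result = new LinkedHashMap<>(outgoing);
        for (String name : headersToPropagate) {
            byte[] value = incoming.get(name);
            if (value != null) {
                // putIfAbsent keeps any header the outgoing record already defines.
                result.putIfAbsent(name, value);
            }
        }
        return result;
    }
}
```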
Dynamic topic names
Sometimes it is desirable to select the destination of a message dynamically. In this case, you should not configure the topic inside your application configuration file, but instead, use the outbound metadata to set the name of the topic.
For example, you can route to a dynamic topic based on the incoming message:
Acknowledgement
Kafka acknowledgement can take time depending on the configuration. Also, the records that cannot be written are stored in memory.
By default, the connector waits for Kafka to acknowledge the record before continuing the processing (acknowledging the received Message). You can disable this by setting the waitForWriteCompletion attribute to false.
Note that the acks attribute has a huge impact on the record acknowledgement.
If a record cannot be written, the message is nacked.
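The effect of waitForWriteCompletion on when the received Message is acknowledged can be sketched with CompletableFuture. This is a simplified model under stated assumptions: write stands for the asynchronous Kafka send, a future completing exceptionally stands for a nack, and AckModel is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical model of waitForWriteCompletion. "write" stands for the
// asynchronous Kafka send; the returned future stands for the acknowledgement
// of the received Message (completing exceptionally = nack).
final class AckModel {

    private AckModel() {
    }

    static CompletableFuture<Void> acknowledgement(CompletableFuture<Void> write,
                                                   boolean waitForWriteCompletion) {
        if (!waitForWriteCompletion) {
            // Acknowledge immediately; in this model a later write failure
            // is not reported back to the received Message.
            return CompletableFuture.completedFuture(null);
        }
        // Wait for the broker: success acks, failure nacks.
        return write.thenApply(ignored -> null);
    }
}
```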
Back-pressure and inflight records
The Kafka outbound connector handles back-pressure by monitoring the number of in-flight messages waiting to be written to the Kafka broker. The number of in-flight messages is configured using the max-inflight-messages attribute and defaults to 1024.
The connector only sends that number of messages concurrently. No other messages are sent until at least one in-flight message gets acknowledged by the broker. Then, the connector writes a new message to Kafka each time one of the in-flight messages gets acknowledged by the broker. Be sure to configure Kafka’s batch.size and linger.ms accordingly.
You can also remove the limit of in-flight messages by setting max-inflight-messages to 0. However, note that the Kafka Producer may block if the number of requests reaches max.in.flight.requests.per.connection.
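The in-flight limit behaves like a counting semaphore: a slot is taken before each write and given back when the broker acknowledges (or rejects) the record. The following JDK-only sketch models that rule. InflightLimiter is a hypothetical class, the send function stands for the asynchronous Kafka write, and the sketch blocks the calling thread for simplicity, whereas a reactive pipeline would rather stop requesting new items.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

// Hypothetical model of max-inflight-messages; not the connector's code.
// "send" stands for the asynchronous Kafka write, its future for the broker ack.
final class InflightLimiter<T> {

    private final Semaphore slots; // null when max-inflight-messages is 0 (no limit)
    private final Function<T, CompletableFuture<Void>> send;

    InflightLimiter(int maxInflightMessages, Function<T, CompletableFuture<Void>> send) {
        this.slots = maxInflightMessages > 0 ? new Semaphore(maxInflightMessages) : null;
        this.send = send;
    }

    // Blocks (for simplicity) while maxInflightMessages writes await the broker.
    CompletableFuture<Void> write(T message) {
        if (slots == null) {
            return send.apply(message);
        }
        slots.acquireUninterruptibly();
        CompletableFuture<Void> brokerAck;
        try {
            brokerAck = send.apply(message);
        } catch (RuntimeException e) {
            slots.release(); // the write never started: give the slot back
            throw e;
        }
        // Free the slot once the broker acknowledges (or rejects) the record.
        return brokerAck.whenComplete((ok, failure) -> slots.release());
    }

    int freeSlots() {
        return slots == null ? Integer.MAX_VALUE : slots.availablePermits();
    }
}
```

With a limit of 2, two pending writes take both slots; the third write blocks until one of the broker acknowledgements completes.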
Handling serialization failures
For the Kafka producer client, serialization failures are not recoverable, so the message dispatch is not retried. However, when using a schema registry, the serializer may intermittently fail to contact the registry. In these cases, you may need to apply a failure strategy for the serializer. To achieve this, create a CDI bean implementing the SerializationFailureHandler interface.
The bean must be exposed with the @Identifier qualifier specifying the name of the bean. Then, in the connector configuration, specify the following attributes:
- mp.messaging.outgoing.$channel.key-serialization-failure-handler: name of the bean handling serialization failures happening for the record’s key
- mp.messaging.outgoing.$channel.value-serialization-failure-handler: name of the bean handling serialization failures happening for the record’s value
The handler is called with the serialization action as a Uni, the record’s topic, a boolean indicating whether the failure happened on a key, the class name of the serializer that throws the exception, the data, the exception, and the record headers. Failure strategies like retry, providing a fallback value, or applying a timeout can be implemented. Note that the method must await the result and return the serialized byte array. Alternatively, the handler can implement the decorateSerialization method, which can return a fallback value.
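As an illustration of a retry strategy with a fallback value, the following plain-Java sketch retries a serialization action a fixed number of times. It deliberately avoids the SmallRye and Mutiny types: a Supplier<byte[]> stands in for the Uni-based serialization action, and RetryingSerialization and serializeWithRetry are hypothetical names. In a real handler, equivalent logic could be expressed on the Uni it receives.

```java
import java.util.function.Supplier;

// Hypothetical retry-then-fallback strategy, similar in spirit to what a
// SerializationFailureHandler could implement. A Supplier<byte[]> stands in
// for the serialization action that the connector passes as a Uni.
final class RetryingSerialization {

    private RetryingSerialization() {
    }

    static byte[] serializeWithRetry(Supplier<byte[]> serialization, int maxAttempts,
                                     byte[] fallback) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                // e.g. a serializer call that needs to reach a schema registry
                return serialization.get();
            } catch (RuntimeException e) {
                lastFailure = e; // possibly transient: try again
            }
        }
        if (fallback != null) {
            return fallback; // every attempt failed: use the fallback value
        }
        throw lastFailure; // no fallback: surface the last failure
    }
}
```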
Sending Cloud Events
The Kafka connector supports Cloud Events. The connector sends the outbound record as Cloud Events if:
- the message metadata contains an io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata instance,
- the channel configuration defines the cloud-events-type and cloud-events-source attributes.
You can create io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata instances using:
If the metadata does not contain an id, the connector generates one (random UUID). The type and source can be configured per message or at the channel level using the cloud-events-type and cloud-events-source attributes. Other attributes are also configurable.
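The way id, type, and source are resolved can be modelled in a few lines of plain Java. This is a sketch, not the connector's code: message-level values take precedence, the channel-level cloud-events-type and cloud-events-source values fill the gaps, and a missing id becomes a random UUID. The CloudEvents specification requires both type and source, so the sketch rejects an event where either is missing. CloudEventDefaults and Attributes are hypothetical names.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical model of id/type/source resolution for an outgoing Cloud Event.
final class CloudEventDefaults {

    static final class Attributes {
        final String id;
        final String type;
        final String source;

        Attributes(String id, String type, String source) {
            this.id = id;
            this.type = type;
            this.source = source;
        }
    }

    private CloudEventDefaults() {
    }

    static Attributes resolve(Attributes fromMessage, String channelType, String channelSource) {
        // A missing id is replaced by a random UUID.
        String id = fromMessage.id != null ? fromMessage.id : UUID.randomUUID().toString();
        // Message-level values win over the cloud-events-type / cloud-events-source defaults.
        String type = fromMessage.type != null ? fromMessage.type : channelType;
        String source = fromMessage.source != null ? fromMessage.source : channelSource;
        // The CloudEvents specification requires both type and source.
        return new Attributes(id,
                Objects.requireNonNull(type, "type is required"),
                Objects.requireNonNull(source, "source is required"));
    }
}
```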
The metadata can be contributed by multiple methods; however, you must always retrieve the already existing metadata to avoid overriding the values:
By default, the connector sends the Cloud Events using the binary format. You can write structured Cloud Events by setting the cloud-events-mode to structured. Only JSON is supported, so the created records have their content-type header set to application/cloudevents+json; charset=UTF-8. When using the structured mode, the value serializer must be set to org.apache.kafka.common.serialization.StringSerializer; otherwise, the connector reports an error. In addition, in structured mode, the connector maps the message’s payload to JSON, except for String payloads, which are passed directly.
The record’s key can be set in the channel configuration (key attribute), in the OutgoingKafkaRecordMetadata, or using the partitionkey Cloud Event attribute.
Note
You can disable the Cloud Event support by setting the cloud-events attribute to false.
Using ProducerRecord
The Kafka built-in type ProducerRecord<K, V> can also be used for producing messages.
Warning
This is an advanced feature. The ProducerRecord is sent to Kafka as is. Any metadata attached through Message<ProducerRecord<K, V>> is ignored and lost.
Configuration Reference
Attribute (alias) | Description | Type | Mandatory | Default |
---|---|---|---|---|
acks | The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. Accepted values are: 0, 1, all | string | false | 1 |
bootstrap.servers (kafka.bootstrap.servers) | A comma-separated list of host:port to use for establishing the initial connection to the Kafka cluster. | string | false | localhost:9092 |
buffer.memory | The total bytes of memory the producer can use to buffer records waiting to be sent to the server. | long | false | 33554432 |
close-timeout | The amount of milliseconds waiting for a graceful shutdown of the Kafka producer | int | false | 10000 |
cloud-events | Enables (default) or disables the Cloud Event support. If enabled on an incoming channel, the connector analyzes the incoming records and tries to create Cloud Event metadata. If enabled on an outgoing channel, the connector sends the outgoing messages as Cloud Events if the message includes Cloud Event metadata. | boolean | false | true |
cloud-events-data-content-type (cloud-events-default-data-content-type) | Configure the default datacontenttype attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the datacontenttype attribute itself | string | false | |
cloud-events-data-schema (cloud-events-default-data-schema) | Configure the default dataschema attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the dataschema attribute itself | string | false | |
cloud-events-insert-timestamp (cloud-events-default-timestamp) | Whether or not the connector should automatically insert the time attribute into the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the time attribute itself | boolean | false | true |
cloud-events-mode | The Cloud Event mode (structured or binary (default)). Indicates how the Cloud Events are written in the outgoing record | string | false | binary |
cloud-events-source (cloud-events-default-source) | Configure the default source attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the source attribute itself | string | false | |
cloud-events-subject (cloud-events-default-subject) | Configure the default subject attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the subject attribute itself | string | false | |
cloud-events-type (cloud-events-default-type) | Configure the default type attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the type attribute itself | string | false | |
health-enabled | Whether health reporting is enabled (default) or disabled | boolean | false | true |
health-readiness-enabled | Whether readiness health reporting is enabled (default) or disabled | boolean | false | true |
health-readiness-timeout | deprecated - During the readiness health check, the connector connects to the broker and retrieves the list of topics. This attribute specifies the maximum duration (in ms) for the retrieval. If exceeded, the channel is considered not-ready. Deprecated: Use 'health-topic-verification-timeout' instead. | long | false | |
health-readiness-topic-verification | deprecated - Whether the readiness check should verify that topics exist on the broker. Defaults to false. Enabling it requires an admin connection. Deprecated: Use 'health-topic-verification-enabled' instead. | boolean | false | |
health-topic-verification-enabled | Whether the startup and readiness check should verify that topics exist on the broker. Defaults to false. Enabling it requires an admin client connection. | boolean | false | false |
health-topic-verification-timeout | During the startup and readiness health check, the connector connects to the broker and retrieves the list of topics. This attribute specifies the maximum duration (in ms) for the retrieval. If exceeded, the channel is considered not-ready. | long | false | 2000 |
kafka-configuration | Identifier of a CDI bean that provides the default Kafka consumer/producer configuration for this channel. The channel configuration can still override any attribute. The bean must have a type of Map | string | false | |
key | A key to use when writing the record | string | false | |
key-serialization-failure-handler | The name set in @Identifier of a bean that implements io.smallrye.reactive.messaging.kafka.SerializationFailureHandler. If set, serialization failures happening when serializing keys are delegated to this handler, which may provide a fallback value. | string | false | |
key.serializer | The serializer classname used to serialize the record's key | string | false | org.apache.kafka.common.serialization.StringSerializer |
max-inflight-messages | The maximum number of messages to be written to Kafka concurrently. It limits the number of messages waiting to be written and acknowledged by the broker. You can set this attribute to 0 to remove the limit | long | false | 1024 |
merge | Whether the connector should allow multiple upstreams | boolean | false | false |
partition | The target partition id. -1 to let the client determine the partition | int | false | -1 |
propagate-headers | A comma-separated list of incoming record headers to be propagated to the outgoing record | string | false | |
propagate-record-key | Propagate incoming record key to the outgoing record | boolean | false | false |
retries | If set to a positive number, the connector will try to resend any record that was not delivered successfully (with a potentially transient error) until the number of retries is reached. If set to 0, retries are disabled. If not set, the connector tries to resend any record that failed to be delivered (because of a potentially transient error) during an amount of time configured by delivery.timeout.ms. | long | false | 2147483647 |
topic | The consumed / populated Kafka topic. If neither this property nor the topics properties are set, the channel name is used | string | false | |
tracing-enabled | Whether tracing is enabled (default) or disabled | boolean | false | true |
value-serialization-failure-handler | The name set in @Identifier of a bean that implements io.smallrye.reactive.messaging.kafka.SerializationFailureHandler. If set, serialization failures happening when serializing values are delegated to this handler, which may provide a fallback value. | string | false | |
value.serializer | The serializer classname used to serialize the payload | string | true | |
waitForWriteCompletion | Whether the client waits for Kafka to acknowledge the written record before acknowledging the message | boolean | false | true |
You can also pass any property supported by the underlying Kafka producer. For example, to configure the batch.size property, use:
Some producer client properties are configured to sensible default values:
- If not set, reconnect.backoff.max.ms is set to 10000 to avoid high load on disconnection.
- If not set, key.serializer is set to org.apache.kafka.common.serialization.StringSerializer.
- If not set, producer client.id is generated as kafka-producer-[channel].
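The "if not set" rules above amount to putIfAbsent on the producer configuration. The sketch below models them, together with the kafka-configuration behaviour described in the table (the bean provides defaults, the channel configuration overrides them). It is a plain-Java illustration under those assumptions: ProducerDefaults, merge and withDefaults are hypothetical names, and the channel name is assumed to replace [channel] in the generated client.id.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of how the producer configuration could be assembled.
final class ProducerDefaults {

    private ProducerDefaults() {
    }

    // kafka-configuration bean values first, channel attributes override them.
    static Map<String, Object> merge(Map<String, Object> fromBean, Map<String, Object> fromChannel) {
        Map<String, Object> merged = new HashMap<>(fromBean);
        merged.putAll(fromChannel);
        return merged;
    }

    // The "if not set" defaults: explicitly configured values are kept.
    static Map<String, Object> withDefaults(String channel, Map<String, Object> configured) {
        Map<String, Object> config = new HashMap<>(configured);
        config.putIfAbsent("reconnect.backoff.max.ms", 10000);
        config.putIfAbsent("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        config.putIfAbsent("client.id", "kafka-producer-" + channel);
        return config;
    }
}
```

For a channel named prices with no explicit client.id, this model produces kafka-producer-prices.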