Apache Kafka
The Kafka connector adds support for Kafka to Reactive Messaging.
With it, you can receive Kafka Records as well as write messages to Kafka.
The Kafka Connector is based on the Vert.x Kafka Client.
Introduction
Apache Kafka is a popular distributed streaming platform. It lets you:
- Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
- Store streams of records in a fault-tolerant durable way.
- Process streams of records as they occur.
The Kafka cluster stores streams of records in categories called topics. Each record consists of a key, a value, and a timestamp.
For more details about Kafka, check the documentation.
Using the Kafka Connector
To use the Kafka Connector, add the following dependency to your project:
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-kafka</artifactId>
<version>3.6.0</version>
</dependency>
The connector name is: smallrye-kafka.
So, to indicate that a channel is managed by this connector you need:
# Inbound
mp.messaging.incoming.[channel-name].connector=smallrye-kafka
# Outbound
mp.messaging.outgoing.[channel-name].connector=smallrye-kafka
Receiving Kafka Records
The Kafka Connector retrieves Kafka Records from Kafka Brokers and maps each of them to Reactive Messaging Messages.
Example
Let’s imagine you have a Kafka broker running, and accessible using the kafka:9092 address (by default it would use localhost:9092).
Configure your application to receive Kafka records from a Kafka topic on the prices channel as follows:
kafka.bootstrap.servers=kafka:9092 (1)
mp.messaging.incoming.prices.connector=smallrye-kafka (2)
mp.messaging.incoming.prices.value.deserializer=org.apache.kafka.common.serialization.DoubleDeserializer (3)
mp.messaging.incoming.prices.broadcast=true (4)
(1) Configure the broker location. You can configure it globally or per channel.
(2) Configure the connector to manage the prices channel.
(3) Set the (Kafka) deserializer to read the record’s value.
(4) Make sure that the channel can be consumed by more than one consumer (see KafkaPriceConsumer and KafkaPriceMessageConsumer below).
You don’t need to set the Kafka topic. By default, it uses the channel name (prices). You can configure the topic attribute to override it.
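As a minimal sketch of that override, the fragment below points the prices channel at a differently named topic. The topic name market-prices is a hypothetical placeholder, not something defined elsewhere in this guide:

```properties
# Read from the "market-prices" topic (hypothetical name) instead of the channel name "prices"
mp.messaging.incoming.prices.topic=market-prices
```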
Then, your application receives Message<Double>.
You can consume the payload directly:
package inbound;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import javax.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class KafkaPriceConsumer {
@Incoming("prices")
public void consume(double price) {
// process your price.
}
}
Or, you can retrieve the Message<Double>:
package inbound;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import javax.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletionStage;
@ApplicationScoped
public class KafkaPriceMessageConsumer {
@Incoming("prices")
public CompletionStage<Void> consume(Message<Double> price) {
// process your price.
// Acknowledge the incoming message (commit the offset)
return price.ack();
}
}
Deserialization
Deserialization is handled by the underlying Kafka Client. You need to configure:
- mp.messaging.incoming.[channel-name].value.deserializer to configure the value deserializer (mandatory)
- mp.messaging.incoming.[channel-name].key.deserializer to configure the key deserializer (optional, defaults to String)
If you want to use a custom deserializer, add it to your CLASSPATH and configure the associated attribute.
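For instance, assuming the prices channel from the example above carries String keys and Double values, the two attributes could be set as follows (both classes ship with the Kafka client):

```properties
mp.messaging.incoming.prices.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.prices.value.deserializer=org.apache.kafka.common.serialization.DoubleDeserializer
```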
In addition, the Kafka Connector also provides a set of message converters. So you can receive payloads representing records from Kafka using:
- Record<K,V> - a key/value pair
- ConsumerRecord<K,V> - a structure representing the record with all its metadata
@Incoming("topic-a")
public void consume(Record<String, String> record) {
String key = record.key(); // Can be `null` if the incoming record has no key
String value = record.value(); // Can be `null` if the incoming record has no value
}
@Incoming("topic-b")
public void consume(ConsumerRecord<String, String> record) {
String key = record.key(); // Can be `null` if the incoming record has no key
String value = record.value(); // Can be `null` if the incoming record has no value
String topic = record.topic();
int partition = record.partition();
// ...
}
Inbound Metadata
Messages coming from Kafka contain an instance of IncomingKafkaRecordMetadata<K, T> in the metadata.
K is the type of the record’s key.
T is the type of the record’s value.
It provides the key, topic, partitions, headers and so on:
IncomingKafkaRecordMetadata<String, Double> metadata = incoming.getMetadata(IncomingKafkaRecordMetadata.class)
.orElse(null);
if (metadata != null) {
// The topic
String topic = metadata.getTopic();
// The key
String key = metadata.getKey();
// The timestamp
Instant timestamp = metadata.getTimestamp();
// The underlying record
ConsumerRecord<String, Double> record = metadata.getRecord();
// ...
}
Acknowledgement
When a message produced from a Kafka record is acknowledged, the connector invokes a commit strategy. These strategies decide when the consumer offset for a specific topic/partition is committed. Committing an offset indicates that all previous records have been processed. It is also the position where the application would restart the processing after a crash recovery or a restart.
Committing every offset has performance penalties as Kafka offset management can be slow. However, not committing the offset often enough may lead to message duplication if the application crashes between two commits.
The Kafka connector supports three strategies:
- throttled keeps track of received messages and commits the offset following the latest acked message in sequence. This strategy guarantees at-least-once delivery even if the channel performs asynchronous processing. The connector tracks the received records and periodically (period specified by auto.commit.interval.ms, default: 5000) commits the highest consecutive offset. The connector is marked as unhealthy if a message associated with a record is not acknowledged within throttled.unprocessed-record-max-age.ms (default: 60000). Indeed, this strategy cannot commit the offset as soon as a single record processing fails (see failure-strategy to configure what happens on failing processing). If throttled.unprocessed-record-max-age.ms is set to less than or equal to 0, it does not perform any health check verification. Such a setting might lead to running out of memory if there are poison pill messages. This strategy is the default if enable.auto.commit is not explicitly set to true.
- latest commits the record offset received by the Kafka consumer as soon as the associated message is acknowledged (if the offset is higher than the previously committed offset). This strategy provides at-least-once delivery if the channel processes the message without performing any asynchronous processing. This strategy should not be used under high load, as offset commit is expensive. However, it reduces the risk of duplicates.
- ignore performs no commit. This strategy is the default strategy when the consumer is explicitly configured with enable.auto.commit set to true. It delegates the offset commit to the Kafka client. This strategy provides at-least-once delivery if the channel processes the message without performing any asynchronous operations and when enable.auto.commit is set to true. However, if the processing fails between two commits, messages received after the commit and before the failure will be re-processed.
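The bookkeeping behind the throttled strategy can be illustrated with a small, self-contained sketch. It is not the connector's implementation: the ThrottledOffsetTracker class and its method names are hypothetical, and it only models a single topic/partition. It records offsets in the order they were received and reports the position that could be committed, that is, the offset following the latest acknowledged record in an unbroken sequence.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration of "commit the highest consecutive acked offset".
// Models one topic/partition only; not the connector's actual code.
class ThrottledOffsetTracker {
    private final Deque<Long> received = new ArrayDeque<>(); // offsets in arrival order
    private final Set<Long> acked = new HashSet<>();         // acked, but not yet committable
    private long nextToCommit = -1;                          // -1 means nothing to commit yet

    synchronized void received(long offset) {
        received.addLast(offset);
    }

    synchronized void acknowledged(long offset) {
        acked.add(offset);
    }

    // Meant to be called periodically: drains the head of the queue while it is
    // acknowledged and returns the position to commit (last processed offset + 1),
    // or -1 if no record can be committed yet.
    synchronized long offsetToCommit() {
        while (!received.isEmpty() && acked.remove(received.peekFirst())) {
            nextToCommit = received.pollFirst() + 1;
        }
        return nextToCommit;
    }
}
```

With offsets 0, 1 and 2 received and only 0 and 2 acknowledged, the sketch reports 1: offset 2 cannot be committed until offset 1 is acknowledged. This is why a record that is never acknowledged blocks the commit and eventually causes the connector to be reported as unhealthy.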
The Kafka connector disables the Kafka auto commit if it is not explicitly enabled. This behavior differs from the traditional Kafka consumer.
If high throughput is important for you, and you are not limited by the downstream, we recommend either:
- using the throttled policy,
- or setting enable.auto.commit to true and annotating the consuming method with @Acknowledgment(Acknowledgment.Strategy.NONE).
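As a configuration sketch, the two options could look as follows for the prices channel used earlier. The values shown for the throttled strategy are the defaults quoted above; the second option is commented out because only one of the two should be active:

```properties
# Option 1: throttled commit strategy (the default when enable.auto.commit is not set to true)
mp.messaging.incoming.prices.commit-strategy=throttled
mp.messaging.incoming.prices.auto.commit.interval.ms=5000
mp.messaging.incoming.prices.throttled.unprocessed-record-max-age.ms=60000

# Option 2: delegate commits to the Kafka client
# (combine with @Acknowledgment(Acknowledgment.Strategy.NONE) on the consuming method)
# mp.messaging.incoming.prices.enable.auto.commit=true
```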
Failure Management
If a message produced from a Kafka record is nacked, a failure strategy is applied. The Kafka connector supports 3 strategies:
- fail - fail the application; no more records will be processed (default). The offset of the record that has not been processed correctly is not committed.
- ignore - the failure is logged, but the processing continues. The offset of the record that has not been processed correctly is committed.
- dead-letter-queue - the offset of the record that has not been processed correctly is committed, but the record is written to a (Kafka) dead letter queue topic.
The strategy is selected using the failure-strategy attribute.
In the case of dead-letter-queue, you can configure the following attributes:
- dead-letter-queue.topic: the topic to use to write the records not processed correctly, default is dead-letter-topic-$channel, with $channel being the name of the channel.
- dead-letter-queue.key.serializer: the serializer used to write the record key on the dead letter queue. By default, it deduces the serializer from the key deserializer.
- dead-letter-queue.value.serializer: the serializer used to write the record value on the dead letter queue. By default, it deduces the serializer from the value deserializer.
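A possible configuration for the prices channel is sketched below. The topic name prices-dlq is a hypothetical placeholder; leaving the topic unset would fall back to dead-letter-topic-prices, and the serializer line is only needed when the deduced serializer is not the one you want:

```properties
mp.messaging.incoming.prices.failure-strategy=dead-letter-queue
# Hypothetical topic name; omit to use the default dead-letter-topic-prices
mp.messaging.incoming.prices.dead-letter-queue.topic=prices-dlq
# Optional; by default deduced from the value deserializer
mp.messaging.incoming.prices.dead-letter-queue.value.serializer=org.apache.kafka.common.serialization.DoubleSerializer
```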
The record written on the dead letter topic contains the original record’s headers, as well as a set of additional headers about the original record:
- dead-letter-reason - the reason of the failure (the Throwable passed to nack())
- dead-letter-cause - the cause of the failure (the getCause() of the Throwable passed to nack()), if any
- dead-letter-topic - the original topic of the record
- dead-letter-partition - the original partition of the record (integer mapped to String)
- dead-letter-offset - the original offset of the record (long mapped to String)
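To make the shape of these headers concrete, here is a stdlib-only sketch that builds the same set of entries as a plain map. The DeadLetterHeaders helper is hypothetical and is not part of the connector. It also assumes that the reason and cause are rendered with Throwable.getMessage(), which the text above does not specify.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper mirroring the dead-letter-* headers listed above.
// Assumption: reason and cause are rendered with Throwable.getMessage().
class DeadLetterHeaders {
    static Map<String, String> of(Throwable failure, String topic, int partition, long offset) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("dead-letter-reason", String.valueOf(failure.getMessage()));
        if (failure.getCause() != null) {
            // Only present when the Throwable passed to nack() has a cause
            headers.put("dead-letter-cause", String.valueOf(failure.getCause().getMessage()));
        }
        headers.put("dead-letter-topic", topic);
        headers.put("dead-letter-partition", Integer.toString(partition)); // integer mapped to String
        headers.put("dead-letter-offset", Long.toString(offset));          // long mapped to String
        return headers;
    }
}
```

A consumer of the dead letter topic can then parse dead-letter-partition and dead-letter-offset back into numbers to locate the original record.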
When using dead-letter-queue, it is also possible to change some metadata of the record that is sent to the dead letter topic.
To do that, use the Message.nack(Throwable, Metadata) method:
@Incoming("in")
public CompletionStage<Void> consume(KafkaRecord<String, String> message) {
return message.nack(new Exception("Failed!"), Metadata.of(
OutgoingKafkaRecordMetadata.builder()
.withKey("failed-record")
.withHeaders(new RecordHeaders()
.add("my-header", "my-header-value".getBytes(StandardCharsets.UTF_8))
).build()
));
}
The Metadata may contain an instance of OutgoingKafkaRecordMetadata.
If the instance is present, the following properties will be used:
- key; if not present, the original record’s key will be used
- topic; if not present, the configured dead letter topic will be used
- partition; if not present, partition will be assigned automatically
- headers; combined with the original record’s headers, as well as the dead-letter-* headers described above
Handling deserialization failures
Because deserialization happens before creating a Message, the failure strategy presented above cannot be applied.
However, when a deserialization failure occurs, you can intercept it and provide a fallback value.
If you don’t, null will be used as fallback value.
To achieve this, create a CDI bean implementing the DeserializationFailureHandler<T> interface:
@ApplicationScoped
@Identifier("failure-fallback") // Set the name of the failure handler
public class MyDeserializationFailureHandler
implements DeserializationFailureHandler<JsonObject> { // Specify the expected type
@Override
public JsonObject handleDeserializationFailure(String topic, boolean isKey,
String deserializer, byte[] data,
Exception exception, Headers headers) {
return fallback;
}
}
The bean must be exposed with the @Identifier qualifier specifying the name of the bean.
Then, in the connector configuration, specify the following attributes:
- mp.messaging.incoming.$channel.key-deserialization-failure-handler: name of the bean handling deserialization failures happening for the record’s key
- mp.messaging.incoming.$channel.value-deserialization-failure-handler: name of the bean handling deserialization failures happening for the record’s value
The handler is called with the record’s topic, a boolean indicating whether the failure happened on a key, the class name of the deserializer that threw the exception, the corrupted data, the exception, and the record’s headers augmented with headers describing the failure (which eases writing to a dead letter topic).
The handler can return null (which would be as if there were no handlers).
However, if the handler throws an exception, the application would be marked unhealthy.
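As a sketch, wiring the failure-fallback handler shown above to a channel could look like this. The channel name json-events is a hypothetical placeholder chosen to match the JsonObject type of the handler:

```properties
mp.messaging.incoming.json-events.connector=smallrye-kafka
mp.messaging.incoming.json-events.value.deserializer=io.vertx.kafka.client.serialization.JsonObjectDeserializer
# Name given in the @Identifier qualifier of the handler bean
mp.messaging.incoming.json-events.value-deserialization-failure-handler=failure-fallback
```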
Receiving Cloud Events
The Kafka connector supports Cloud Events.
When the connector detects a structured or binary Cloud Event, it adds an IncomingKafkaCloudEventMetadata<K, T> to the metadata of the Message.
IncomingKafkaCloudEventMetadata contains the various (mandatory and optional) Cloud Event attributes.
If the connector cannot extract the Cloud Event metadata, it sends the Message without the metadata.
Binary Cloud Events
For binary Cloud Events, all mandatory Cloud Event attributes must be set in the record header, prefixed by ce_ (as mandated by the protocol binding).
The connector considers headers starting with the ce_ prefix but not listed in the specification as extensions.
You can access them using the getExtension method from IncomingKafkaCloudEventMetadata.
You can retrieve them as String.
The datacontenttype attribute is mapped to the content-type header of the record.
The partitionkey attribute is mapped to the record’s key, if any.
Note that all headers are read as UTF-8.
With binary Cloud Events, the record’s key and value can use any deserializer.
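The header handling described in this section can be sketched with plain Java collections. The BinaryCloudEventHeaders class below is hypothetical and is not the connector's implementation. It decodes ce_-prefixed headers as UTF-8 and separates the context attributes defined by the Cloud Events 1.0 specification from extensions.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: splits ce_-prefixed headers into specification
// attributes and extensions. Not the connector's implementation.
class BinaryCloudEventHeaders {
    // Cloud Events 1.0 context attributes carried as ce_ headers
    // (datacontenttype travels in the content-type header instead).
    private static final Set<String> SPEC_ATTRIBUTES =
            Set.of("id", "source", "specversion", "type", "dataschema", "subject", "time");

    final Map<String, String> attributes = new HashMap<>();
    final Map<String, String> extensions = new HashMap<>();

    BinaryCloudEventHeaders(Map<String, byte[]> headers) {
        for (Map.Entry<String, byte[]> header : headers.entrySet()) {
            String name = header.getKey();
            if (!name.startsWith("ce_")) {
                continue; // not a Cloud Event header
            }
            String attribute = name.substring("ce_".length());
            // All headers are read as UTF-8
            String value = new String(header.getValue(), StandardCharsets.UTF_8);
            if (SPEC_ATTRIBUTES.contains(attribute)) {
                attributes.put(attribute, value);
            } else {
                extensions.put(attribute, value);
            }
        }
    }
}
```

For example, a record carrying ce_id and a custom ce_tenant header would end up with id among the attributes and tenant among the extensions, while content-type is left for the datacontenttype mapping.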
Structured Cloud Events
For structured Cloud Events, the event is encoded in the record’s value.
Only JSON is supported, so your event must be encoded as JSON in the record’s value.
A structured Cloud Event must set the content-type header of the record to application/cloudevents, or to a value prefixed with application/cloudevents, such as: application/cloudevents+json; charset=UTF-8.
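The content-type rule can be expressed as a simple prefix check. The helper below is a hypothetical sketch, not the connector's code, and it assumes that the comparison ignores case and surrounding whitespace.

```java
import java.util.Locale;

// Hypothetical sketch of the content-type rule for structured Cloud Events.
// Assumption: the comparison is case-insensitive and ignores surrounding spaces.
class StructuredCloudEvents {
    static boolean isStructured(String contentType) {
        return contentType != null
                && contentType.trim().toLowerCase(Locale.ROOT).startsWith("application/cloudevents");
    }
}
```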
To receive structured Cloud Events, your value deserializer must be:
- org.apache.kafka.common.serialization.StringDeserializer
- org.apache.kafka.common.serialization.ByteArrayDeserializer
- io.vertx.kafka.client.serialization.JsonObjectDeserializer
As mentioned previously, the value must be a valid JSON object containing at least all the mandatory Cloud Events attributes.
If the record is a structured Cloud Event, the created Message’s payload is the Cloud Event data.
The partitionkey attribute is mapped to the record’s key if any.
Configuration Reference
| Attribute (alias) | Description | Mandatory | Default |
|---|---|---|---|
| bootstrap.servers (kafka.bootstrap.servers) | A comma-separated list of host:port to use for establishing the initial connection to the Kafka cluster. Type: string | false | |
| topic | The consumed / populated Kafka topic. If neither this property nor the Type: string | false | |
| health-enabled | Whether health reporting is enabled (default) or disabled. Type: boolean | false | |
| health-readiness-enabled | Whether readiness health reporting is enabled (default) or disabled. Type: boolean | false | |
| health-readiness-topic-verification | Whether the readiness check should verify that topics exist on the broker. Defaults to false. Enabling it requires an admin connection. Type: boolean | false | |
| health-readiness-timeout | During the readiness health check, the connector connects to the broker and retrieves the list of topics. This attribute specifies the maximum duration (in ms) for the retrieval. If exceeded, the channel is considered not-ready. Type: long | false | |
| tracing-enabled | Whether tracing is enabled (default) or disabled. Type: boolean | false | |
| cloud-events | Enables (default) or disables the Cloud Event support. If enabled on an incoming channel, the connector analyzes the incoming records and tries to create Cloud Event metadata. If enabled on an outgoing channel, the connector sends the outgoing messages as Cloud Events if the message includes Cloud Event Metadata. Type: boolean | false | |
| topics | A comma-separated list of topics to be consumed. Cannot be used with the Type: string | false | |
| pattern | Indicate that the Type: boolean | false | |
| key.deserializer | The deserializer classname used to deserialize the record’s key. Type: string | false | |
| value.deserializer | The deserializer classname used to deserialize the record’s value. Type: string | true | |
| fetch.min.bytes | The minimum amount of data the server should return for a fetch request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. Type: int | false | |
| group.id | A unique string that identifies the consumer group the application belongs to. If not set, a unique, generated id is used. Type: string | false | |
| enable.auto.commit | If enabled, the consumer’s offset will be periodically committed in the background by the underlying Kafka client, ignoring the actual processing outcome of the records. It is recommended NOT to enable this setting and to let Reactive Messaging handle the commit. Type: boolean | false | |
| retry | Whether or not the connection to the broker is re-attempted in case of failure. Type: boolean | false | |
| retry-attempts | The maximum number of reconnection attempts before failing. -1 means infinite retry. Type: int | false | |
| retry-max-wait | The max delay (in seconds) between 2 reconnects. Type: int | false | |
| broadcast | Whether the Kafka records should be dispatched to multiple consumers. Type: boolean | false | |
| auto.offset.reset | What to do when there is no initial offset in Kafka. Accepted values are earliest, latest and none. Type: string | false | |
| failure-strategy | Specify the failure strategy to apply when a message produced from a record is acknowledged negatively (nack). Values can be Type: string | false | |
| commit-strategy | Specify the commit strategy to apply when a message produced from a record is acknowledged. Values can be Type: string | false | |
| throttled.unprocessed-record-max-age.ms | While using the Type: int | false | |
| dead-letter-queue.topic | When the Type: string | false | |
| dead-letter-queue.key.serializer | When the Type: string | false | |
| dead-letter-queue.value.serializer | When the Type: string | false | |
| partitions | The number of partitions to be consumed concurrently. The connector creates the specified number of Kafka consumers. It should match the number of partitions of the targeted topic. Type: int | false | |
| consumer-rebalance-listener.name | The name set in Type: string | false | |
| key-deserialization-failure-handler | The name set in Type: string | false | |
| value-deserialization-failure-handler | The name set in Type: string | false | |
| graceful-shutdown | Whether or not a graceful shutdown should be attempted when the application terminates. Type: boolean | false | |
| poll-timeout | The polling timeout in milliseconds. When polling records, the poll will wait at most that duration before returning records. Default is 1000ms. Type: int | false | |
| pause-if-no-requests | Whether the polling must be paused when the application does not request items and resumed when it does. This allows implementing back-pressure based on the application capacity. Note that polling is not stopped, but will not retrieve any records when paused. Type: boolean | false | |
You can also pass any property supported by the Vert.x Kafka client as attribute.
Consumer Rebalance Listener
To handle offset commit and assigned partitions yourself, you can provide a consumer rebalance listener.
To achieve this, implement the io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener interface, make the implementing class a bean, and add the @Identifier qualifier.
A common use case is to store offsets in a separate data store to implement exactly-once semantics, or to start the processing at a specific offset.
The listener is invoked every time the consumer topic/partition assignment changes.
For example, when the application starts, it invokes the onPartitionsAssigned callback with the initial set of topics/partitions associated with the consumer.
If, later, this set changes, it calls the onPartitionsRevoked and onPartitionsAssigned callbacks again, so you can implement custom logic.
Note that the rebalance listener methods are called from the Kafka polling thread and must block the caller thread until completion. That’s because the rebalance protocol has synchronization barriers, and asynchronous code in a rebalance listener may be executed after the synchronization barrier.
When topics/partitions are assigned or revoked from a consumer, it pauses the message delivery and restarts once the rebalance completes.
If the rebalance listener handles offset commit on behalf of the user (using the ignore commit strategy), the rebalance listener must commit the offset synchronously in the onPartitionsRevoked callback.
We also recommend applying the same logic when the application stops.
Unlike the ConsumerRebalanceListener from Apache Kafka, the io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener methods pass the Kafka Consumer and the set of topics/partitions.
Example
In this example, we set up a consumer that always starts on messages from at most 10 minutes ago (or offset 0). First we need to provide
a bean that implements the io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener interface and is annotated with
@Identifier. We then must configure our inbound connector to use this named bean.
package inbound;
import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import javax.enterprise.context.ApplicationScoped;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;
@ApplicationScoped
@Identifier("rebalanced-example.rebalancer")
public class KafkaRebalancedConsumerRebalanceListener implements KafkaConsumerRebalanceListener {
private static final Logger LOGGER = Logger.getLogger(KafkaRebalancedConsumerRebalanceListener.class.getName());
/**
* When receiving a list of partitions will search for the earliest offset within 10 minutes
* and seek the consumer to it.
*
* @param consumer underlying consumer
* @param partitions set of assigned topic partitions
*/
@Override
public void onPartitionsAssigned(Consumer<?, ?> consumer,
Collection<org.apache.kafka.common.TopicPartition> partitions) {
long now = System.currentTimeMillis();
long shouldStartAt = now - 600_000L; // 10 minutes ago
Map<org.apache.kafka.common.TopicPartition, Long> request = new HashMap<>();
for (org.apache.kafka.common.TopicPartition partition : partitions) {
LOGGER.info("Assigned " + partition);
request.put(partition, shouldStartAt);
}
Map<org.apache.kafka.common.TopicPartition, OffsetAndTimestamp> offsets = consumer
.offsetsForTimes(request);
for (Map.Entry<org.apache.kafka.common.TopicPartition, OffsetAndTimestamp> position : offsets.entrySet()) {
long target = position.getValue() == null ? 0L : position.getValue().offset();
LOGGER.info("Seeking position " + target + " for " + position.getKey());
consumer.seek(position.getKey(), target);
}
}
}
package inbound;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import javax.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
@ApplicationScoped
public class KafkaRebalancedConsumer {
@Incoming("rebalanced-example")
@Acknowledgment(Acknowledgment.Strategy.NONE)
public CompletionStage<Void> consume(IncomingKafkaRecord<Integer, String> message) {
// We don't need to ACK messages because in this example we set offset during consumer re-balance
return CompletableFuture.completedFuture(null);
}
}
To configure the inbound connector to use the provided listener, we either set the consumer rebalance listener’s name:
- mp.messaging.incoming.rebalanced-example.consumer-rebalance-listener.name=rebalanced-example.rebalancer
Or have the listener’s name be the same as the group id:
- mp.messaging.incoming.rebalanced-example.group.id=rebalanced-example.rebalancer
Setting the consumer rebalance listener’s name takes precedence over using the group id.
Writing Kafka Records
The Kafka Connector can write Reactive Messaging Messages as Kafka Records.
Example
Let’s imagine you have a Kafka broker running, and accessible using the kafka:9092 address (by default it would use localhost:9092).
Configure your application to write the messages from the prices-out channel into a Kafka topic as follows:
kafka.bootstrap.servers=kafka:9092 (1)
mp.messaging.outgoing.prices-out.connector=smallrye-kafka (2)
mp.messaging.outgoing.prices-out.value.serializer=org.apache.kafka.common.serialization.DoubleSerializer (3)
mp.messaging.outgoing.prices-out.topic=prices (4)
(1) Configure the broker location. You can configure it globally or per channel.
(2) Configure the connector to manage the prices-out channel.
(3) Set the (Kafka) serializer to encode the message payload into the record’s value.
(4) Make sure the topic name is prices (and not the default prices-out).
Then, your application must send Message<Double> to the prices-out channel.
It can use double payloads as in the following snippet:
package outbound;
import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import javax.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;
@ApplicationScoped
public class KafkaPriceProducer {
private final Random random = new Random();
@Outgoing("prices-out")
public Multi<Double> generate() {
// Build an infinite stream of random prices
// It emits a price every second
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
.map(x -> random.nextDouble());
}
}
Or, you can send Message<Double>:
package outbound;
import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import javax.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;
@ApplicationScoped
public class KafkaPriceMessageProducer {
private final Random random = new Random();
@Outgoing("prices-out")
public Multi<Message<Double>> generate() {
// Build an infinite stream of random prices
// It emits a price every second
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
.map(x -> Message.of(random.nextDouble()));
}
}
Serialization
Serialization is handled by the underlying Kafka Client. You need to configure:
- mp.messaging.outgoing.[channel-name].value.serializer to configure the value serializer (mandatory)
- mp.messaging.outgoing.[channel-name].key.serializer to configure the key serializer (optional, defaults to String)
If you want to use a custom serializer, add it to your CLASSPATH and configure the associated attribute.
By default, the written record contains:
- the Message payload as value
- no key, or the key configured using the key attribute or the key passed in the metadata attached to the Message
- the timestamp computed from the system clock (now) or the timestamp passed in the metadata attached to the Message
Sending key/value pairs
In the Kafka world, it’s often necessary to send records, i.e. a key/value pair.
The connector provides the io.smallrye.reactive.messaging.kafka.Record class that you can use to send a pair:
@Incoming("in")
@Outgoing("out")
public Record<String, String> process(String in) {
return Record.of("my-key", in);
}
When the connector receives a message with a Record payload, it extracts the key and value from it.
The configured serializers for the key and the value must be compatible with the record’s key and value.
Note that the key and the value can be null.
It is also possible to create a record with a null key AND a null value.
If you need more control on the written records, use OutgoingKafkaRecordMetadata.
Outbound Metadata
When sending Messages, you can add an instance of OutgoingKafkaRecordMetadata to influence how the message is going to be written to Kafka.
For example, you can add Kafka headers, configure the record key…
// Creates an OutgoingKafkaRecordMetadata
// The type parameter is the type of the record's key
OutgoingKafkaRecordMetadata<String> metadata = OutgoingKafkaRecordMetadata.<String>builder()
.withKey("my-key")
.withHeaders(new RecordHeaders().add("my-header", "value".getBytes()))
.build();
// Create a new message from the `incoming` message
// Add `metadata` to the metadata from the `incoming` message.
return incoming.addMetadata(metadata);
Dynamic topic names
Sometimes it is desirable to select the destination of a message dynamically. In this case, you should not configure the topic inside your application configuration file, but instead, use the outbound metadata to set the name of the topic.
For example, you can route to a dynamic topic based on the incoming message:
String topicName = selectTopicFromIncomingMessage(incoming);
OutgoingKafkaRecordMetadata<String> metadata = OutgoingKafkaRecordMetadata.<String>builder()
.withTopic(topicName)
.build();
// Create a new message from the `incoming` message
// Add `metadata` to the metadata from the `incoming` message.
return incoming.addMetadata(metadata);
Acknowledgement
Kafka acknowledgement can take time, depending on the configuration. Also, records that cannot be written are kept in memory.
By default, the connector does wait for Kafka to acknowledge the record to continue the processing (acknowledging the received Message).
You can disable this by setting the waitForWriteCompletion attribute to false.
Note that the acks attribute has a huge impact on the record acknowledgement.
If a record cannot be written, the message is nacked.
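As a configuration sketch for the prices-out channel used earlier, the fragment below stops waiting for the broker acknowledgement and relaxes the producer's acks setting. The chosen values are only an illustration of the trade-off (lower latency, weaker delivery guarantees), not a recommendation:

```properties
# Do not wait for Kafka to acknowledge the record before acknowledging the Message
mp.messaging.outgoing.prices-out.waitForWriteCompletion=false
# Kafka producer setting: 0, 1 or all
mp.messaging.outgoing.prices-out.acks=1
```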
Back-pressure and inflight records
The Kafka outbound connector handles back-pressure by monitoring the number of in-flight messages waiting to be written to the Kafka broker.
The number of in-flight messages is configured using the max-inflight-messages attribute and defaults to 1024.
The connector only sends that amount of messages concurrently.
No other messages will be sent until at least one in-flight message gets acknowledged by the broker.
Then, the connector writes a new message to Kafka each time one of the in-flight messages gets acknowledged by the broker.
Be sure to configure Kafka’s batch.size and linger.ms accordingly.
You can also remove the limit of inflight messages by setting max-inflight-messages to 0.
However, note that the Kafka Producer may block if the number of requests reaches max.in.flight.requests.per.connection.
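The idea of capping in-flight writes can be illustrated with a permit-based sketch. The InflightLimiter class below is hypothetical and does not reflect how the connector is implemented. It accepts a message only while fewer than the configured number of writes are pending, and frees a slot when a pending write completes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

// Hypothetical sketch of bounding in-flight writes with permits.
// Not the connector's implementation.
class InflightLimiter<T> {
    private final Semaphore permits;
    // Asynchronous write; the returned future completes when the write is acknowledged
    private final Function<T, CompletableFuture<Void>> writer;

    InflightLimiter(int maxInflight, Function<T, CompletableFuture<Void>> writer) {
        this.permits = new Semaphore(maxInflight);
        this.writer = writer;
    }

    // Returns true if the message was handed to the writer,
    // false if the in-flight limit is currently reached.
    boolean trySend(T message) {
        if (!permits.tryAcquire()) {
            return false;
        }
        // Release the slot once the write completes, successfully or not
        writer.apply(message).whenComplete((ok, failure) -> permits.release());
        return true;
    }
}
```

A real connector would propagate the refusal upstream as back-pressure (by not requesting more items) rather than returning false to the caller; the boolean is used here only to keep the sketch short.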
Sending Cloud Events
The Kafka connector supports Cloud Events. The connector sends the outbound record as Cloud Events if:
- the message metadata contains an io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata instance,
- the channel configuration defines the cloud-events-type and cloud-events-source attributes.
You can create io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata instances using:
package outbound;
import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import javax.enterprise.context.ApplicationScoped;
import java.net.URI;
@ApplicationScoped
public class KafkaCloudEventProcessor {
@Outgoing("cloud-events")
public Message<String> toCloudEvents(Message<String> in) {
return in.addMetadata(OutgoingCloudEventMetadata.builder()
.withId("id-" + in.getPayload())
.withType("greetings")
.withSource(URI.create("http://example.com"))
.withSubject("greeting-message")
.build());
}
}
If the metadata does not contain an id, the connector generates one (random UUID).
The type and source can be configured per message or at the channel level using the cloud-events-type and cloud-events-source attributes.
Other attributes are also configurable.
The metadata can be contributed by multiple methods. However, you must always retrieve the existing metadata first to avoid overriding its values:
package outbound;

import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import javax.enterprise.context.ApplicationScoped;
import java.net.URI;

@ApplicationScoped
public class KafkaCloudEventMultipleProcessors {

    @Incoming("source")
    @Outgoing("processed")
    public Message<String> process(Message<String> in) {
        return in.addMetadata(OutgoingCloudEventMetadata.builder()
                .withId("id-" + in.getPayload())
                .withType("greeting")
                .build());
    }

    @SuppressWarnings("unchecked")
    @Incoming("processed")
    @Outgoing("cloud-events")
    public Message<String> process2(Message<String> in) {
        OutgoingCloudEventMetadata<String> metadata = in
                .getMetadata(OutgoingCloudEventMetadata.class)
                .orElseGet(() -> OutgoingCloudEventMetadata.builder().build());

        return in.addMetadata(OutgoingCloudEventMetadata.from(metadata)
                .withSource(URI.create("source://me"))
                .withSubject("test")
                .build());
    }
}
By default, the connector sends the Cloud Events using the binary format.
You can write structured Cloud Events by setting the cloud-events-mode to structured.
Only JSON is supported, so the created records have their content-type header set to application/cloudevents+json; charset=UTF-8.
When using the structured mode, the value serializer must be set to org.apache.kafka.common.serialization.StringSerializer, otherwise the connector reports an error.
In addition, in structured mode, the connector maps the message’s payload to JSON, except for String payloads, which are passed as-is.
The record’s key can be set in the channel configuration (key attribute), in the OutgoingKafkaRecordMetadata or using the partitionkey Cloud Event attribute.
You can disable the Cloud Event support by setting the cloud-events attribute to false.
Using ProducerRecord
The Kafka built-in type ProducerRecord<K,V> can also be used to produce messages:
@Outgoing("out")
public ProducerRecord<String, String> generate() {
    return new ProducerRecord<>("my-topic", "key", "value");
}
This is an advanced feature.
The ProducerRecord is sent to Kafka as is.
Any metadata attached through Message<ProducerRecord<K, V>> is ignored and lost.
Configuration Reference
| Attribute (alias) | Description | Mandatory | Default |
|---|---|---|---|
| acks | The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. Accepted values are: 0, 1, all. Type: string | false | |
| bootstrap.servers (kafka.bootstrap.servers) | A comma-separated list of host:port to use for establishing the initial connection to the Kafka cluster. Type: string | false | |
| buffer.memory | The total bytes of memory the producer can use to buffer records waiting to be sent to the server. Type: long | false | |
| close-timeout | The number of milliseconds to wait for a graceful shutdown of the Kafka producer. Type: int | false | |
| cloud-events | Enables (default) or disables the Cloud Event support. If enabled on an incoming channel, the connector analyzes the incoming records and tries to create Cloud Event metadata. If enabled on an outgoing channel, the connector sends the outgoing messages as Cloud Events if the message includes Cloud Event metadata. Type: boolean | false | |
| cloud-events-data-content-type (cloud-events-default-data-content-type) | Configure the default datacontenttype attribute of the outgoing Cloud Event. Type: string | false | |
| cloud-events-data-schema (cloud-events-default-data-schema) | Configure the default dataschema attribute of the outgoing Cloud Event. Type: string | false | |
| cloud-events-insert-timestamp (cloud-events-default-timestamp) | Whether or not the connector should automatically insert the time attribute into the outgoing Cloud Event. Type: boolean | false | |
| cloud-events-mode | The Cloud Event mode (structured or binary (default)). Type: string | false | |
| cloud-events-source (cloud-events-default-source) | Configure the default source attribute of the outgoing Cloud Event. Type: string | false | |
| cloud-events-subject (cloud-events-default-subject) | Configure the default subject attribute of the outgoing Cloud Event. Type: string | false | |
| cloud-events-type (cloud-events-default-type) | Configure the default type attribute of the outgoing Cloud Event. Type: string | false | |
| health-enabled | Whether health reporting is enabled (default) or disabled. Type: boolean | false | |
| health-readiness-enabled | Whether readiness health reporting is enabled (default) or disabled. Type: boolean | false | |
| health-readiness-timeout | During the readiness health check, the connector connects to the broker and retrieves the list of topics. This attribute specifies the maximum duration (in ms) for the retrieval. If exceeded, the channel is considered not-ready. Type: long | false | |
| health-readiness-topic-verification | Whether the readiness check should verify that topics exist on the broker. Defaults to false. Enabling it requires an admin connection. Type: boolean | false | |
| key | A key to use when writing the record. Type: string | false | |
| key.serializer | The serializer classname used to serialize the record’s key. Type: string | false | |
| max-inflight-messages | The maximum number of messages to be written to Kafka concurrently. It limits the number of messages waiting to be written and acknowledged by the broker. You can set this attribute to 0 to remove the limit. Type: long | false | |
| merge | Whether the connector should allow multiple upstreams. Type: boolean | false | |
| partition | The target partition id. -1 to let the client determine the partition. Type: int | false | |
| retries | If set to a positive number, the connector will try to resend any record that was not delivered successfully (with a potentially transient error) until the number of retries is reached. If set to 0, retries are disabled. If not set, the connector tries to resend any record that failed to be delivered (because of a potentially transient error) during an amount of time configured by delivery.timeout.ms. Type: long | false | |
| topic | The consumed / populated Kafka topic. If neither this property nor the topics property is set, the channel name is used. Type: string | false | |
| tracing-enabled | Whether tracing is enabled (default) or disabled. Type: boolean | false | |
| value.serializer | The serializer classname used to serialize the payload. Type: string | true | |
| waitForWriteCompletion | Whether the client waits for Kafka to acknowledge the written record before acknowledging the message. Type: boolean | false | |
You can also pass any property supported by the Vert.x Kafka client.
Retrieving Kafka default configuration
If your application/runtime exposes a CDI bean of type Map<String, Object> with the identifier default-kafka-broker, this configuration is used to establish the connection with the Kafka broker.
For example, you can imagine exposing this map as follows:
@Produces
@ApplicationScoped
@Identifier("default-kafka-broker")
public Map<String, Object> createKafkaRuntimeConfig() {
    Map<String, Object> properties = new HashMap<>();

    StreamSupport
            .stream(config.getPropertyNames().spliterator(), false)
            .map(String::toLowerCase)
            .filter(name -> name.startsWith("kafka"))
            .distinct()
            .sorted()
            .forEach(name -> {
                final String key = name.substring("kafka".length() + 1).toLowerCase().replaceAll("[^a-z0-9.]", ".");
                final String value = config.getOptionalValue(name, String.class).orElse("");
                properties.put(key, value);
            });

    return properties;
}
The previous example extracts all the configuration keys from MicroProfile Config starting with kafka.
Quarkus
Starting with Quarkus 1.5, a map corresponding to the previous example is automatically provided.
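To see what keys such a producer method ends up with, the key transformation can be tried in isolation. The sketch below applies the same lower-casing, prefix stripping and character replacement to a plain map instead of MicroProfile Config. The KafkaConfigKeys class and its methods are hypothetical helpers written for this illustration, and the extra length check is an assumption added to avoid empty keys.

```java
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative helper mirroring the key transformation of the producer method. */
final class KafkaConfigKeys {

    /** Strips the "kafka" prefix and its separator, then normalizes separators to dots. */
    static String toKafkaKey(String propertyName) {
        String name = propertyName.toLowerCase(Locale.ROOT);
        return name.substring("kafka".length() + 1).replaceAll("[^a-z0-9.]", ".");
    }

    /** Keeps only the properties starting with "kafka" and rewrites their keys. */
    static Map<String, Object> extract(Map<String, String> allProperties) {
        Map<String, Object> result = new TreeMap<>();
        for (Map.Entry<String, String> entry : allProperties.entrySet()) {
            String name = entry.getKey().toLowerCase(Locale.ROOT);
            // Skip names too short to carry a key after the prefix and separator
            if (name.startsWith("kafka") && name.length() > "kafka".length() + 1) {
                result.put(toKafkaKey(entry.getKey()), entry.getValue());
            }
        }
        return result;
    }
}
```

Under these assumptions, both kafka.bootstrap.servers and an environment-style name such as KAFKA_BOOTSTRAP_SERVERS map to the bootstrap.servers key, while properties that do not start with kafka are left out.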
Using Apache Avro serializer/deserializer
If you are using Apache Avro serializer/deserializer, please note the following configuration properties.
For Confluent Schema Registry
Confluent Avro library is io.confluent:kafka-avro-serializer.
Note that this library is not available in Maven Central; you need to use the Confluent Maven repository.
Consumer
| Property | Recommended value |
|---|---|
| value.deserializer | io.confluent.kafka.serializers.KafkaAvroDeserializer |
| schema.registry.url | http://<your_host>:<your_port>/ |
| specific.avro.reader | true |
Example:
mp.messaging.incoming.[channel].value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
mp.messaging.incoming.[channel].schema.registry.url=http://<your_host>:<your_port>/
mp.messaging.incoming.[channel].specific.avro.reader=true
Producer
| Property | Recommended value |
|---|---|
| value.serializer | io.confluent.kafka.serializers.KafkaAvroSerializer |
| schema.registry.url | http://<your_host>:<your_port>/ |
Example:
mp.messaging.outgoing.[channel].value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
mp.messaging.outgoing.[channel].schema.registry.url=http://<your_host>:<your_port>/
For Apicurio Registry 1.x
Apicurio Registry 1.x Avro library is io.apicurio:apicurio-registry-utils-serde.
The configuration properties listed here are meant to be used with the Apicurio Registry 1.x client library and Apicurio Registry 1.x server.
Consumer
| Property | Recommended value |
|---|---|
| value.deserializer | io.apicurio.registry.utils.serde.AvroKafkaDeserializer |
| apicurio.registry.url | http://<your_host>:<your_port>/api |
| apicurio.registry.avro-datum-provider | io.apicurio.registry.utils.serde.avro.DefaultAvroDatumProvider |
| apicurio.registry.use-specific-avro-reader | true |
Example:
mp.messaging.incoming.[channel].value.deserializer=io.apicurio.registry.utils.serde.AvroKafkaDeserializer
mp.messaging.incoming.[channel].apicurio.registry.url=http://<your_host>:<your_port>/api
mp.messaging.incoming.[channel].apicurio.registry.avro-datum-provider=io.apicurio.registry.utils.serde.avro.DefaultAvroDatumProvider
mp.messaging.incoming.[channel].apicurio.registry.use-specific-avro-reader=true
Producer
| Property | Recommended value |
|---|---|
| value.serializer | io.apicurio.registry.utils.serde.AvroKafkaSerializer |
| apicurio.registry.url | http://<your_host>:<your_port>/api |
To automatically register schemas with the registry, add:
| Property | Value |
|---|---|
| apicurio.registry.global-id | io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy |
Example:
mp.messaging.outgoing.[channel].value.serializer=io.apicurio.registry.utils.serde.AvroKafkaSerializer
mp.messaging.outgoing.[channel].apicurio.registry.url=http://<your_host>:<your_port>/api
mp.messaging.outgoing.[channel].apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy
For Apicurio Registry 2.x
Apicurio Registry 2.x Avro library is io.apicurio:apicurio-registry-serdes-avro-serde.
The configuration properties listed here are meant to be used with the Apicurio Registry 2.x client library and Apicurio Registry 2.x server.
Consumer
| Property | Recommended value |
|---|---|
| value.deserializer | io.apicurio.registry.serde.avro.AvroKafkaDeserializer |
| apicurio.registry.url | http://<your_host>:<your_port>/apis/registry/v2 |
| apicurio.registry.use-specific-avro-reader | true |
Example:
mp.messaging.incoming.[channel].value.deserializer=io.apicurio.registry.serde.avro.AvroKafkaDeserializer
mp.messaging.incoming.[channel].apicurio.registry.url=http://<your_host>:<your_port>/apis/registry/v2
mp.messaging.incoming.[channel].apicurio.registry.use-specific-avro-reader=true
Producer
| Property | Recommended value |
|---|---|
| value.serializer | io.apicurio.registry.serde.avro.AvroKafkaSerializer |
| apicurio.registry.url | http://<your_host>:<your_port>/apis/registry/v2 |
To automatically register schemas with the registry, add:
| Property | Value |
|---|---|
| apicurio.registry.auto-register | true |
Example:
mp.messaging.outgoing.[channel].value.serializer=io.apicurio.registry.serde.avro.AvroKafkaSerializer
mp.messaging.outgoing.[channel].apicurio.registry.url=http://<your_host>:<your_port>/apis/registry/v2
mp.messaging.outgoing.[channel].apicurio.registry.auto-register=true
Health reporting
The Kafka connector reports the readiness and liveness of each channel managed by the connector.
To disable health reporting, set the health-enabled attribute for the channel to false.
Readiness
On the inbound side, two strategies are available to check the readiness of the application. The default strategy verifies that we have at least one active connection with the broker. This strategy is lightweight.
You can also enable another strategy by setting the health-readiness-topic-verification attribute to true.
In this case, the check verifies that:
-
the broker is available
-
the Kafka topic is created (available in the broker).
-
no failures have been caught
With this second strategy, if you consume multiple topics using the topics attribute, the readiness check verifies that all the consumed topics are available.
If you use a pattern (using the pattern attribute), the readiness check verifies that at least one existing topic matches the pattern.
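The two topic rules above (all listed topics must exist, or at least one existing topic must match the pattern) can be sketched as plain set operations. This sketch assumes the set of existing topic names has already been retrieved from the broker, and it assumes full-string pattern matching. The TopicReadiness class is a hypothetical helper and does not model broker availability or caught failures.

```java
import java.util.Collection;
import java.util.Set;
import java.util.regex.Pattern;

/** Illustrative topic checks; not the connector's health check code. */
final class TopicReadiness {

    /** "topics" attribute: every consumed topic must exist on the broker. */
    static boolean allTopicsExist(Set<String> existingTopics, Collection<String> consumedTopics) {
        return existingTopics.containsAll(consumedTopics);
    }

    /** "pattern" attribute: at least one existing topic must match the pattern. */
    static boolean anyTopicMatches(Set<String> existingTopics, Pattern pattern) {
        for (String topic : existingTopics) {
            if (pattern.matcher(topic).matches()) {
                return true;
            }
        }
        return false;
    }
}
```

For example, with the topics prices, orders-eu and orders-us on the broker, a channel consuming prices and orders-eu passes the first check, and the pattern orders-.* passes the second one.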
On the outbound side (writing records to Kafka), two strategies are also offered. The default strategy just verifies that the producer has at least one active connection with the broker.
You can also enable another strategy by setting the health-readiness-topic-verification attribute to true.
In this case, the check verifies that:
-
the broker is available
-
the Kafka topic is created (available in the broker).
With this second strategy, the readiness check uses a Kafka Admin Client to retrieve the existing topics.
Retrieving the topics can be a lengthy operation.
You can configure a timeout using the health-readiness-timeout attribute.
The default timeout is set to 2 seconds.
Also, you can disable the readiness checks altogether by setting health-readiness-enabled to false.
Liveness
On the inbound side (receiving records from Kafka), the liveness check verifies that:
-
no failures have been caught
-
the client is connected to the broker
On the outbound side (writing records to Kafka), the liveness check verifies that:
-
no failures have been caught
Note that a message processing failure nacks the message, which is then handled by the failure-strategy.
It is the responsibility of the failure-strategy to report the failure and influence the outcome of the liveness checks.
The fail failure strategy reports the failure, so the liveness check reports it as well.
Consumer Rebalance Listener
To handle offset commit and assigned partitions yourself, you can provide a consumer rebalance listener.
To achieve this, implement the io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener interface, make the implementing class a bean, and add the @Identifier qualifier.
A typical use case is storing offsets in a separate data store to implement exactly-once semantics, or starting the processing at a specific offset.
The listener is invoked every time the consumer topic/partition assignment changes.
For example, when the application starts, it invokes the onPartitionsAssigned callback with the initial set of topics/partitions associated with the consumer.
If, later, this set changes, it calls the onPartitionsRevoked and onPartitionsAssigned callbacks again, so you can implement custom logic.
Note that the rebalance listener methods are called from the Kafka polling thread and must block the caller thread until completion. That’s because the rebalance protocol has synchronization barriers, and asynchronous code in a rebalance listener may be executed after the synchronization barrier.
When topics/partitions are assigned to or revoked from a consumer, it pauses the message delivery and resumes once the rebalance completes.
If the rebalance listener handles offset commit on behalf of the user (using the ignore commit strategy), the rebalance listener must commit the offset synchronously in the onPartitionsRevoked callback.
We also recommend applying the same logic when the application stops.
Unlike the ConsumerRebalanceListener from Apache Kafka, the io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener methods pass the Kafka Consumer and the set of topics/partitions.
Example
In this example, we set up a consumer that always starts on messages from at most 10 minutes ago (or offset 0). First, we need to provide a bean that implements the io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener interface and is annotated with @Identifier. We must then configure our inbound connector to use this named bean.
package inbound;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

import javax.enterprise.context.ApplicationScoped;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

@ApplicationScoped
@Identifier("rebalanced-example.rebalancer")
public class KafkaRebalancedConsumerRebalanceListener implements KafkaConsumerRebalanceListener {

    private static final Logger LOGGER = Logger.getLogger(KafkaRebalancedConsumerRebalanceListener.class.getName());

    /**
     * When receiving a list of partitions, searches for the earliest offset within 10 minutes
     * and seeks the consumer to it.
     *
     * @param consumer underlying consumer
     * @param partitions set of assigned topic partitions
     */
    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        long now = System.currentTimeMillis();
        long shouldStartAt = now - 600_000L; // 10 minutes ago

        Map<TopicPartition, Long> request = new HashMap<>();
        for (TopicPartition partition : partitions) {
            LOGGER.info("Assigned " + partition);
            request.put(partition, shouldStartAt);
        }
        Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(request);
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> position : offsets.entrySet()) {
            // No record at or after the timestamp: fall back to offset 0
            long target = position.getValue() == null ? 0L : position.getValue().offset();
            LOGGER.info("Seeking position " + target + " for " + position.getKey());
            consumer.seek(position.getKey(), target);
        }
    }
}
package inbound;

import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import javax.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

@ApplicationScoped
public class KafkaRebalancedConsumer {

    @Incoming("rebalanced-example")
    @Acknowledgment(Acknowledgment.Strategy.NONE)
    public CompletionStage<Void> consume(IncomingKafkaRecord<Integer, String> message) {
        // We don't need to ACK messages because in this example we set offset during consumer re-balance
        return CompletableFuture.completedFuture(null);
    }
}
To configure the inbound connector to use the provided listener we either set the consumer rebalance listener’s name:
-
mp.messaging.incoming.rebalanced-example.consumer-rebalance-listener.name=rebalanced-example.rebalancer
Or have the listener’s name be the same as the group id:
-
mp.messaging.incoming.rebalanced-example.group.id=rebalanced-example.rebalancer
Setting the consumer rebalance listener’s name takes precedence over using the group id.
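The lookup order described above can be summarized as a small function over the channel configuration. This is only a sketch of the documented precedence, written against a plain map. The RebalanceListenerNames class is a hypothetical helper and not part of the connector API.

```java
import java.util.Map;

/** Illustrative resolution of the rebalance listener name for a channel. */
final class RebalanceListenerNames {

    /**
     * Returns the explicitly configured listener name if present,
     * otherwise the group id, otherwise null.
     */
    static String resolve(Map<String, String> channelConfig) {
        String explicit = channelConfig.get("consumer-rebalance-listener.name");
        if (explicit != null && !explicit.isEmpty()) {
            return explicit;
        }
        return channelConfig.get("group.id");
    }
}
```

The resolved name is the one that has to match the value of the @Identifier qualifier on the listener bean.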
KafkaClientService
For advanced use cases, SmallRye Reactive Messaging provides a bean of type KafkaClientService that you can inject:
@Inject
KafkaClientService kafka;
From there, you can obtain an io.smallrye.reactive.messaging.kafka.KafkaProducer and an io.smallrye.reactive.messaging.kafka.KafkaConsumer.
KafkaProducer and KafkaConsumer expose a non-blocking API on top of the Kafka client API.
They also mediate access to the threads that SmallRye Reactive Messaging uses to run all Kafka operations: the polling thread, used for consuming records from Kafka topics, and the sending thread, used for producing records to Kafka topics.
(Just to be clear: each channel has its own polling thread and sending thread.)
The reason why SmallRye Reactive Messaging uses a special thread to run the poll loop should be obvious: the Consumer API is blocking.
The Producer API, on the other hand, is documented to be non-blocking.
However, in present versions, Kafka doesn’t guarantee that in all cases; see KAFKA-3539 for more details.
That is why SmallRye Reactive Messaging uses a dedicated thread to run the send operations as well.
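The general pattern of confining blocking calls to one dedicated thread while exposing a non-blocking result can be sketched with the JDK alone. The DedicatedThreadRunner class below is a hypothetical illustration of that pattern. It is not the KafkaProducer or KafkaConsumer implementation, and its names are assumptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

/** Runs potentially blocking actions on a single named thread. */
final class DedicatedThreadRunner implements AutoCloseable {

    private final ExecutorService executor;

    DedicatedThreadRunner(String threadName) {
        // A single-thread executor guarantees all actions run on the same thread, in order
        this.executor = Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, threadName);
            thread.setDaemon(true);
            return thread;
        });
    }

    /** Submits the action and returns immediately; the stage completes with its result. */
    <T> CompletionStage<T> run(Supplier<T> blockingAction) {
        return CompletableFuture.supplyAsync(blockingAction, executor);
    }

    @Override
    public void close() {
        executor.shutdown();
    }
}
```

The caller never blocks: it gets a CompletionStage back and the blocking work happens on the dedicated thread, which also serializes access to a client that is not safe to use from several threads at once.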
Sometimes, SmallRye Reactive Messaging provides direct access to the Kafka Producer or Consumer.
For example, the methods of a KafkaConsumerRebalanceListener are always invoked on the polling thread, so they give you direct access to the Consumer.
In such cases, you should use the Producer/Consumer API directly, instead of the KafkaProducer/KafkaConsumer API.
Kerberos authentication
When using Kerberos authentication, you need to configure the connector with:
-
the security protocol set to SASL_PLAINTEXT
-
the SASL mechanism set to GSSAPI
-
the Jaas config configured with Krb5LoginModule
-
the Kerberos service name
The following snippet provides an example:
kafka.bootstrap.servers=ip-192-168-0-207.us-east-2.compute.internal:9094
kafka.sasl.mechanism=GSSAPI
kafka.security.protocol=SASL_PLAINTEXT
kafka.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required doNotPrompt=true refreshKrb5Config=true useKeyTab=true storeKey=true keyTab="file:/opt/kafka/krb5/kafka-producer.keytab" principal="kafka-producer/ip-192-168-0-207.us-east-2.compute.internal@INTERNAL";
kafka.sasl.kerberos.service.name=kafka