Apache Kafka

The Kafka connector adds support for Kafka to Reactive Messaging. With it, you can receive Kafka Records as well as write messages into Kafka.

The Kafka Connector is based on the Vert.x Kafka Client.

Introduction

Apache Kafka is a popular distributed streaming platform. It lets you:

  • Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.

  • Store streams of records in a fault-tolerant durable way.

  • Process streams of records as they occur.

The Kafka cluster stores streams of records in categories called topics. Each record consists of a key, a value, and a timestamp.

For more details about Kafka, check the documentation.

Using the Kafka Connector

To use the Kafka Connector, add the following dependency to your project:

<dependency>
  <groupId>io.smallrye.reactive</groupId>
  <artifactId>smallrye-reactive-messaging-kafka</artifactId>
  <version>2.0.3</version>
</dependency>

The connector name is: smallrye-kafka.

So, to indicate that a channel is managed by this connector, you need:

# Inbound
mp.messaging.incoming.[channel-name].connector=smallrye-kafka

# Outbound
mp.messaging.outgoing.[channel-name].connector=smallrye-kafka

Receiving Kafka Records

The Kafka Connector retrieves Kafka Records from Kafka Brokers and maps each of them to a Reactive Messaging Message.

Example

Let’s imagine you have a Kafka broker running and accessible at kafka:9092 (by default, the connector uses localhost:9092). Configure your application to receive Kafka records from a Kafka topic on the prices channel as follows:

kafka.bootstrap.servers=kafka:9092      (1)

mp.messaging.incoming.prices.connector=smallrye-kafka       (2)
mp.messaging.incoming.prices.value.deserializer=org.apache.kafka.common.serialization.DoubleDeserializer    (3)
  1. Configure the broker location. You can configure it globally or per channel

  2. Configure the connector to manage the prices channel

  3. Set the (Kafka) deserializer to read the record’s value

You don’t need to set the Kafka topic. By default, it uses the channel name (prices). You can configure the topic attribute to override it.
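
For instance, to read from a hypothetical prices-topic topic instead of the channel name:

mp.messaging.incoming.prices.topic=prices-topic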

Then, your application receives Message<Double>. You can consume the payload directly:

package inbound;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class KafkaPriceConsumer {

    @Incoming("prices")
    public void consume(double price) {
        // process your price.
    }

}

Or, you can retrieve the Message<Double>:

package inbound;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import javax.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletionStage;

@ApplicationScoped
public class KafkaPriceMessageConsumer {

    @Incoming("prices")
    public CompletionStage<Void> consume(Message<Double> price) {
        // process your price.

        // Acknowledge the incoming message (commit the offset)
        return price.ack();
    }

}

Deserialization

The deserialization is handled by the underlying Kafka Client. You need to configure:

  • mp.messaging.incoming.[channel-name].value.deserializer to configure the value deserializer (mandatory)

  • mp.messaging.incoming.[channel-name].key.deserializer to configure the key deserializer (optional, defaults to String)

If you want to use a custom deserializer, add it to your CLASSPATH and configure the associated attribute.
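
As a sketch, a custom deserializer implements Kafka’s org.apache.kafka.common.serialization.Deserializer interface. The class below, a hypothetical UpperCaseDeserializer, decodes the record bytes as UTF-8 and upper-cases the result:

package inbound;

import org.apache.kafka.common.serialization.Deserializer;

import java.nio.charset.StandardCharsets;

public class UpperCaseDeserializer implements Deserializer<String> {

    @Override
    public String deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        // Hypothetical transformation: decode as UTF-8, then upper-case
        return new String(data, StandardCharsets.UTF_8).toUpperCase();
    }
}

You would then reference it in the channel configuration:

mp.messaging.incoming.prices.value.deserializer=inbound.UpperCaseDeserializer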

Inbound Metadata

Messages coming from Kafka contain an instance of IncomingKafkaRecordMetadata<K, T> in the metadata. K is the type of the record’s key. T is the type of the record’s value. It provides access to the key, topic, partition, headers and so on:

package inbound;

import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecordMetadata;
import io.vertx.mutiny.kafka.client.consumer.KafkaConsumerRecord;
import org.eclipse.microprofile.reactive.messaging.Message;

import java.time.Instant;
import java.util.Optional;

public class KafkaMetadataExample {


    public void metadata() {
        Message<Double> incoming = Message.of(12.0);
        // tag::code[]
        Optional<IncomingKafkaRecordMetadata<String, Double>> metadata = incoming.getMetadata(IncomingKafkaRecordMetadata.class);
        metadata.ifPresent(meta -> {
            // The topic
            String topic = meta.getTopic();

            // The key
            String key = meta.getKey();

            // The timestamp
            Instant timestamp = meta.getTimestamp();

            // The underlying record
            KafkaConsumerRecord<String, Double> record = meta.getRecord();

            // ...
        });
        // end::code[]
    }

}

Acknowledgement

Acknowledging a message coming from Kafka commits the offset. Refer to the Kafka documentation for details.
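
If your method consumes the payload directly (as in the first example above), acknowledgement is handled for you. You can make the strategy explicit with the MicroProfile @Acknowledgment annotation; a minimal sketch, assuming a post-processing strategy suits your use case:

package inbound;

import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class KafkaAckExample {

    @Incoming("prices")
    @Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING)
    public void consume(double price) {
        // The offset is committed once this method returns successfully
    }
}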

Configuration Reference

Table 1. Incoming Attributes of the 'smallrye-kafka' connector

bootstrap.servers (alias: kafka.bootstrap.servers)
  A comma-separated list of host:port pairs to use for establishing the initial connection to the Kafka cluster.
  Type: string | Mandatory: false | Default: localhost:9092

topic
  The consumed / populated Kafka topic. If not set, the channel name is used.
  Type: string | Mandatory: false

key.deserializer
  The deserializer classname used to deserialize the record’s key.
  Type: string | Mandatory: false | Default: org.apache.kafka.common.serialization.StringDeserializer

value.deserializer
  The deserializer classname used to deserialize the record’s value.
  Type: string | Mandatory: true

fetch.min.bytes
  The minimum amount of data the server should return for a fetch request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive.
  Type: int | Mandatory: false | Default: 1

group.id
  A unique string that identifies the consumer group the application belongs to. If not set, a unique, generated id is used.
  Type: string | Mandatory: false

retry
  Whether the connection to the broker is re-attempted in case of failure.
  Type: boolean | Mandatory: false | Default: true

retry-attempts
  The maximum number of reconnection attempts before failing. -1 means infinite retry.
  Type: int | Mandatory: false | Default: -1

retry-max-wait
  The maximum delay (in seconds) between two reconnection attempts.
  Type: int | Mandatory: false | Default: 30

broadcast
  Whether the Kafka records should be dispatched to multiple consumers.
  Type: boolean | Mandatory: false | Default: false

auto.offset.reset
  What to do when there is no initial offset in Kafka. Accepted values are earliest, latest and none.
  Type: string | Mandatory: false | Default: latest

You can also pass any property supported by the Vert.x Kafka client as an attribute.
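
For example, standard Kafka consumer properties can be set as channel attributes (illustrative values):

mp.messaging.incoming.prices.enable.auto.commit=false
mp.messaging.incoming.prices.session.timeout.ms=10000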

Writing Kafka Records

The Kafka Connector can write Reactive Messaging Messages as Kafka Records.

Example

Let’s imagine you have a Kafka broker running and accessible at kafka:9092 (by default, the connector uses localhost:9092). Configure your application to write the messages from the prices channel into a Kafka topic as follows:

kafka.bootstrap.servers=kafka:9092      (1)

mp.messaging.outgoing.prices.connector=smallrye-kafka   (2)
mp.messaging.outgoing.prices.value.serializer=org.apache.kafka.common.serialization.DoubleSerializer  (3)
  1. Configure the broker location. You can configure it globally or per channel

  2. Configure the connector to manage the prices channel

  3. Set the (Kafka) serializer to encode the message payload into the record’s value

You don’t need to set the Kafka topic. By default, it uses the channel name (prices). You can configure the topic attribute to override it.
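
For instance, to write to a hypothetical prices-topic topic instead of the channel name:

mp.messaging.outgoing.prices.topic=prices-topic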

Then, your application must send Message<Double> to the prices channel. It can use double payloads as in the following snippet:

package outbound;

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import javax.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;

@ApplicationScoped
public class KafkaPriceProducer {

    private Random random = new Random();

    @Outgoing("prices")
    public Multi<Double> generate() {
        // Build an infinite stream of random prices
        // It emits a price every second
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> random.nextDouble());
    }

}

Or, you can send Message<Double>:

package outbound;

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import javax.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;

@ApplicationScoped
public class KafkaPriceMessageProducer {

    private Random random = new Random();

    @Outgoing("prices")
    public Multi<Message<Double>> generate() {
        // Build an infinite stream of random prices
        // It emits a price every second
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> Message.of(random.nextDouble()));
    }

}

Serialization

The serialization is handled by the underlying Kafka Client. You need to configure:

  • mp.messaging.outgoing.[channel-name].value.serializer to configure the value serializer (mandatory)

  • mp.messaging.outgoing.[channel-name].key.serializer to configure the key serializer (optional, defaults to String)

If you want to use a custom serializer, add it to your CLASSPATH and configure the associated attribute.
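
Symmetrically, a custom serializer implements Kafka’s org.apache.kafka.common.serialization.Serializer interface. A minimal sketch, using a hypothetical DoubleAsTextSerializer that writes the payload as its UTF-8 textual form:

package outbound;

import org.apache.kafka.common.serialization.Serializer;

import java.nio.charset.StandardCharsets;

public class DoubleAsTextSerializer implements Serializer<Double> {

    @Override
    public byte[] serialize(String topic, Double data) {
        if (data == null) {
            return null;
        }
        // Hypothetical encoding: the textual representation of the double, as UTF-8 bytes
        return data.toString().getBytes(StandardCharsets.UTF_8);
    }
}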

By default, the written record contains:

  • the Message payload as value

  • no key by default; otherwise the key configured using the key attribute, or the key passed in the metadata attached to the Message

  • the timestamp computed from the system clock (now) or the timestamp passed in the metadata attached to the Message

Outbound Metadata

When sending Messages, you can add an instance of OutgoingKafkaRecordMetadata to influence how the message is going to be written to Kafka. For example, you can configure the topic directly in the message, add Kafka headers, configure the record key, and so on:

package outbound;

import io.smallrye.reactive.messaging.kafka.OutgoingKafkaRecordMetadata;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.eclipse.microprofile.reactive.messaging.Message;

public class KafkaOutboundMetadataExample {

    public Message<Double> metadata(Message<Double> incoming) {
        // Creates an OutgoingKafkaRecordMetadata
        // The type parameter is the type of the record's key
        OutgoingKafkaRecordMetadata<String> metadata = OutgoingKafkaRecordMetadata.<String>builder()
            .withKey("my-key")
            .withTopic("my-custom-topic")
            .withHeaders(new RecordHeaders().add("my-header", "value".getBytes()))
            .build();

        // Create a new message from the `incoming` message
        // Add `metadata` to the metadata from the `incoming` message.
        return incoming.addMetadata(metadata);
    }
}

Acknowledgement

Kafka acknowledgement can take time depending on the configuration. Also, the connector stores in memory the records that cannot yet be written.

By default, the connector waits for Kafka to acknowledge the record before acknowledging the received Message and continuing the processing. You can disable this behavior by setting the waitForWriteCompletion attribute to false.

Note that the acks attribute has a significant impact on the record acknowledgement.
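
For example, to require acknowledgement from all in-sync replicas while not waiting for the write completion before acknowledging the message (illustrative values):

mp.messaging.outgoing.prices.acks=all
mp.messaging.outgoing.prices.waitForWriteCompletion=false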

Configuration Reference

Table 2. Outgoing Attributes of the 'smallrye-kafka' connector

acks
  The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of the records that are sent. Accepted values are: 0, 1, all.
  Type: string | Mandatory: false | Default: 1

bootstrap.servers (alias: kafka.bootstrap.servers)
  A comma-separated list of host:port pairs to use for establishing the initial connection to the Kafka cluster.
  Type: string | Mandatory: false | Default: localhost:9092

buffer.memory
  The total bytes of memory the producer can use to buffer records waiting to be sent to the server.
  Type: long | Mandatory: false | Default: 33554432

key
  The key to use when writing the record.
  Type: string | Mandatory: false

key.serializer
  The serializer classname used to serialize the record’s key.
  Type: string | Mandatory: false | Default: org.apache.kafka.common.serialization.StringSerializer

max-inflight-messages
  The maximum number of messages to be written to Kafka concurrently. The default value is the value of the max.in.flight.requests.per.connection Kafka property: the maximum number of unacknowledged requests the client sends before blocking. Note that if this setting is greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries.
  Type: int | Mandatory: false | Default: 5

partition
  The target partition id. -1 lets the client determine the partition.
  Type: int | Mandatory: false | Default: -1

retries
  Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error.
  Type: long | Mandatory: false | Default: 2147483647

topic
  The consumed / populated Kafka topic. If not set, the channel name is used.
  Type: string | Mandatory: false

value.serializer
  The serializer classname used to serialize the payload.
  Type: string | Mandatory: true

waitForWriteCompletion
  Whether the client waits for Kafka to acknowledge the written record before acknowledging the message.
  Type: boolean | Mandatory: false | Default: true

You can also pass any property supported by the Vert.x Kafka client.
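
For example, standard Kafka producer properties can be set as channel attributes (illustrative values):

mp.messaging.outgoing.prices.compression.type=gzip
mp.messaging.outgoing.prices.linger.ms=100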

Retrieving Kafka default configuration

If your application/runtime exposes a Map<String, Object> named default-kafka-broker as a CDI bean, this configuration is used to establish the connection with the Kafka broker.

For example, you can imagine exposing this map as follows:

import org.eclipse.microprofile.config.Config;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
import javax.inject.Inject;
import javax.inject.Named;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.StreamSupport;

// Illustrative enclosing bean; it injects the MicroProfile Config to read the properties
@ApplicationScoped
public class KafkaConfigProvider {

    @Inject
    Config config;

    @Produces
    @ApplicationScoped
    @Named("default-kafka-broker")
    public Map<String, Object> createKafkaRuntimeConfig() {
        Map<String, Object> properties = new HashMap<>();

        // Collect every configuration entry whose name starts with "kafka"
        StreamSupport
            .stream(config.getPropertyNames().spliterator(), false)
            .map(String::toLowerCase)
            .filter(name -> name.startsWith("kafka"))
            .distinct()
            .sorted()
            .forEach(name -> {
                // Strip the `kafka.` prefix and normalize the key to the Kafka property format
                final String key = name.substring("kafka".length() + 1).toLowerCase().replaceAll("[^a-z0-9.]", ".");
                final String value = config.getOptionalValue(name, String.class).orElse("");
                properties.put(key, value);
            });

        return properties;
    }
}

The previous example extracts all the configuration keys starting with kafka from MicroProfile Config.
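
For instance, with the following MicroProfile Config entries (hypothetical values):

kafka.bootstrap.servers=kafka:9092
kafka.retries=3

the produced map contains bootstrap.servers=kafka:9092 and retries=3.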

Quarkus
Starting with Quarkus 1.5, this map is automatically exposed.