Skip to content

Writing Kafka Records

The Kafka Connector can write Reactive Messaging Messages as Kafka Records.

Example

Let’s imagine you have a Kafka broker running, and accessible using the kafka:9092 address (by default it would use localhost:9092). Configure your application to write the messages from the prices channel into a Kafka topic as follows:

1
2
3
4
5
kafka.bootstrap.servers=kafka:9092 # <1>

mp.messaging.outgoing.prices-out.connector=smallrye-kafka # <2>
mp.messaging.outgoing.prices-out.value.serializer=org.apache.kafka.common.serialization.DoubleSerializer # <3>
mp.messaging.outgoing.prices-out.topic=prices # <4>
1. Configure the broker location. You can configure it globally or per channel 2. Configure the connector to manage the prices channel 3. Sets the (Kafka) serializer to encode the message payload into the record’s value 4. Make sure the topic name is prices (and not the default prices-out)

Then, your application must send Message<Double> to the prices channel. It can use double payloads as in the following snippet:

package kafka.outbound;

import java.time.Duration;
import java.util.Random;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.mutiny.Multi;

@ApplicationScoped
public class KafkaPriceProducer {

    private final Random random = new Random();

    @Outgoing("prices-out")
    public Multi<Double> generate() {
        // Build an infinite stream of random prices
        // It emits a price every second
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .map(x -> random.nextDouble());
    }

}

Or, you can send Message<Double>:

package kafka.outbound;

import java.time.Duration;
import java.util.Random;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.mutiny.Multi;

@ApplicationScoped
public class KafkaPriceMessageProducer {

    private final Random random = new Random();

    @Outgoing("prices-out")
    public Multi<Message<Double>> generate() {
        // Build an infinite stream of random prices
        // It emits a price every second
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .map(x -> Message.of(random.nextDouble()));
    }

}

Serialization

The serialization is handled by the underlying Kafka Client. You need to configure the:

  • mp.messaging.outgoing.[channel-name].value.serializer to configure the value serializer (mandatory)

  • mp.messaging.outgoing.[channel-name].key.serializer to configure the key serializer (optional, default to String)

If you want to use a custom serializer, add it to your CLASSPATH and configure the associate attribute.

By default, the written record contains:

  • the Message payload as value

  • no key, or the key configured using the key attribute or the key passed in the metadata attached to the Message

  • the timestamp computed for the system clock (now) or the timestamp passed in the metadata attached to the Message

Sending key/value pairs

In the Kafka world, it’s often necessary to send records, i.e. a key/value pair. The connector provides the io.smallrye.reactive.messaging.kafka.Record class that you can use to send a pair:

1
2
3
4
5
@Incoming("in")
@Outgoing("out")
public Record<String, String> process(String in) {
    return Record.of("my-key", in);
}

When the connector receives a message with a Record payload, it extracts the key and value from it. The configured serializers for the key and the value must be compatible with the record’s key and value. Note that the key and the value can be null. It is also possible to create a record with a null key AND a null value.

If you need more control on the written records, use OutgoingKafkaRecordMetadata.

Outbound Metadata

When sending Messages, you can add an instance of OutgoingKafkaRecordMetadata to influence how the message is going to be written to Kafka. For example, you can add Kafka headers, configure the record key…

// Creates an OutgoingKafkaRecordMetadata
// The type parameter is the type of the record's key
OutgoingKafkaRecordMetadata<String> metadata = OutgoingKafkaRecordMetadata.<String> builder()
        .withKey("my-key")
        .withHeaders(new RecordHeaders().add("my-header", "value".getBytes()))
        .build();

// Create a new message from the `incoming` message
// Add `metadata` to the metadata from the `incoming` message.
return incoming.addMetadata(metadata);

Propagating Record Key

When processing messages, you can propagate incoming record key to the outgoing record.

Consider the following example method, which consumes messages from the channel in, transforms the payload, and writes the result to the channel out.

1
2
3
4
5
@Incoming("in")
@Outgoing("out")
public double process(int in) {
    return in * 0.88;
}

Enabled with mp.messaging.outgoing.[channel-name].propagate-record-key=true configuration, record key propagation produces the outgoing record with the same key as the incoming record.

If the outgoing record already contains a key, it won’t be overridden by the incoming record key. If the incoming record does have a null key, the mp.messaging.outgoing.[channel-name].key property is used.

Propagating Record headers

You can also propagate incoming record headers to the outgoing record, by specifying the list of headers to be considered.

mp.messaging.outgoing.[channel-name].propagate-headers=Authorization,Proxy-Authorization

If the ougoing record already defines a header with the same key, it won't be overriden by the incoming header.

Dynamic topic names

Sometimes it is desirable to select the destination of a message dynamically. In this case, you should not configure the topic inside your application configuration file, but instead, use the outbound metadata to set the name of the topic.

For example, you can route to a dynamic topic based on the incoming message:

1
2
3
4
5
6
7
8
String topicName = selectTopicFromIncommingMessage(incoming);
OutgoingKafkaRecordMetadata<String> metadata = OutgoingKafkaRecordMetadata.<String> builder()
        .withTopic(topicName)
        .build();

// Create a new message from the `incoming` message
// Add `metadata` to the metadata from the `incoming` message.
return incoming.addMetadata(metadata);

Acknowledgement

Kafka acknowledgement can take times depending on the configuration. Also, it stores in-memory the records that cannot be written.

By default, the connector does wait for Kafka to acknowledge the record to continue the processing (acknowledging the received Message). You can disable this by setting the waitForWriteCompletion attribute to false.

Note that the acks attribute has a huge impact on the record acknowledgement.

If a record cannot be written, the message is nacked.

Back-pressure and inflight records

The Kafka outbound connector handles back-pressure monitoring the number of in-flight messages waiting to be written to the Kafka broker. The number of in-flight messages is configured using the max-inflight-messages attribute and defaults to 1024.

The connector only sends that amount of messages concurrently. No other messages will be sent until at least one in-flight message gets acknowledged by the broker. Then, the connector writes a new message to Kafka when one of the broker’s in-flight messages get acknowledged. Be sure to configure Kafka’s batch.size and linger.ms accordingly.

You can also remove the limit of inflight messages by setting max-inflight-messages to 0. However, note that the Kafka Producer may block if the number of requests reaches max.in.flight.requests.per.connection.

Handling serialization failures

For Kafka producer client serialization failures are not recoverable, thus the message dispatch is not retried. However, using schema registries, serialization may intermittently fail to contact the registry. In these cases you may need to apply a failure strategy for the serializer. To achieve this, create a CDI bean implementing the SerializationFailureHandler interface:

@ApplicationScoped
@Identifier("failure-fallback") // Set the name of the failure handler
public class MySerializationFailureHandler
    implements SerializationFailureHandler<JsonObject> { // Specify the expected type

    @Override
    public byte[] decorateSerialization(Uni<byte[]> serialization, String topic,
                boolean isKey, String serializer,
                Object data, Headers headers) {
        return serialization
                    .onFailure().retry().atMost(3)
                    .await().atMost(Duration.ofMillis(200));
    }
}

The bean must be exposed with the @Identifier qualifier specifying the name of the bean. Then, in the connector configuration, specify the following attribute:

  • mp.messaging.incoming.$channel.key-serialization-failure-handler: name of the bean handling serialization failures happening for the record’s key

  • mp.messaging.incoming.$channel.value-serialization-failure-handler: name of the bean handling serialization failures happening for the record’s value,

The handler is called with the serialization action as a Uni, the record’s topic, a boolean indicating whether the failure happened on a key, the class name of the deserializer that throws the exception, the corrupted data, the exception, and the records headers. Failure strategies like retry, providing a fallback value or applying timeout can be implemented. Note that the method must await on the result and return the serialized byte array. Alternatively, the handler can implement decorateSerialization method which can return a fallback value.

Sending Cloud Events

The Kafka connector supports Cloud Events. The connector sends the outbound record as Cloud Events if:

  • the message metadata contains an io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata instance,

  • the channel configuration defines the cloud-events-type and cloud-events-source attributes.

You can create io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata instances using:

package kafka.outbound;

import java.net.URI;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata;

@ApplicationScoped
public class KafkaCloudEventProcessor {

    @Outgoing("cloud-events")
    public Message<String> toCloudEvents(Message<String> in) {
        return in.addMetadata(OutgoingCloudEventMetadata.builder()
                .withId("id-" + in.getPayload())
                .withType("greetings")
                .withSource(URI.create("http://example.com"))
                .withSubject("greeting-message")
                .build());
    }

}

If the metadata does not contain an id, the connector generates one (random UUID). The type and source can be configured per message or at the channel level using the cloud-events-type and cloud-events-source attributes. Other attributes are also configurable.

The metadata can be contributed by multiple methods, however, you must always retrieve the already existing metadata to avoid overriding the values:

package kafka.outbound;

import java.net.URI;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata;

@ApplicationScoped
public class KafkaCloudEventMultipleProcessors {

    @Incoming("source")
    @Outgoing("processed")
    public Message<String> process(Message<String> in) {
        return in.addMetadata(OutgoingCloudEventMetadata.builder()
                .withId("id-" + in.getPayload())
                .withType("greeting")
                .build());
    }

    @SuppressWarnings("unchecked")
    @Incoming("processed")
    @Outgoing("cloud-events")
    public Message<String> process2(Message<String> in) {
        OutgoingCloudEventMetadata<String> metadata = in
                .getMetadata(OutgoingCloudEventMetadata.class)
                .orElseGet(() -> OutgoingCloudEventMetadata.builder().build());

        return in.addMetadata(OutgoingCloudEventMetadata.from(metadata)
                .withSource(URI.create("source://me"))
                .withSubject("test")
                .build());
    }

}

By default, the connector sends the Cloud Events using the binary format. You can write structured Cloud Events by setting the cloud-events-mode to structured. Only JSON is supported, so the created records had its content-type header set to application/cloudevents+json; charset=UTF-8 When using the structured mode, the value serializer must be set to org.apache.kafka.common.serialization.StringSerializer, otherwise the connector reports the error. In addition, in structured, the connector maps the message’s payload to JSON, except for String passed directly.

The record’s key can be set in the channel configuration (key attribute), in the OutgoingKafkaRecordMetadata or using the partitionkey Cloud Event attribute.

Note

you can disable the Cloud Event support by setting the cloud-events attribute to false

Using ProducerRecord

Kafka built-in type ProducerRecord\ can also be used for producing messages:

1
2
3
4
@Outgoing("out")
public ProducerRecord<String, String> generate() {
    return new ProducerRecord<>("my-topic", "key", "value");
}

Warning

This is an advanced feature. The ProducerRecord is sent to Kafka as is. Any possible metadata attached through Message<ProducerRecord<K, V>> are ignored and lost.

Configuration Reference

Attribute (alias) Description Type Mandatory Default
acks The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. Accepted values are: 0, 1, all string false 1
bootstrap.servers (kafka.bootstrap.servers) A comma-separated list of host:port to use for establishing the initial connection to the Kafka cluster. string false localhost:9092
buffer.memory The total bytes of memory the producer can use to buffer records waiting to be sent to the server. long false 33554432
close-timeout The amount of milliseconds waiting for a graceful shutdown of the Kafka producer int false 10000
cloud-events Enables (default) or disables the Cloud Event support. If enabled on an incoming channel, the connector analyzes the incoming records and try to create Cloud Event metadata. If enabled on an outgoing, the connector sends the outgoing messages as Cloud Event if the message includes Cloud Event Metadata. boolean false true
cloud-events-data-content-type (cloud-events-default-data-content-type) Configure the default datacontenttype attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the datacontenttype attribute itself string false
cloud-events-data-schema (cloud-events-default-data-schema) Configure the default dataschema attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the dataschema attribute itself string false
cloud-events-insert-timestamp (cloud-events-default-timestamp) Whether or not the connector should insert automatically the time attribute into the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the time attribute itself boolean false true
cloud-events-mode The Cloud Event mode (structured or binary (default)). Indicates how are written the cloud events in the outgoing record string false binary
cloud-events-source (cloud-events-default-source) Configure the default source attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the source attribute itself string false
cloud-events-subject (cloud-events-default-subject) Configure the default subject attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the subject attribute itself string false
cloud-events-type (cloud-events-default-type) Configure the default type attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the type attribute itself string false
health-enabled Whether health reporting is enabled (default) or disabled boolean false true
health-readiness-enabled Whether readiness health reporting is enabled (default) or disabled boolean false true
health-readiness-timeout deprecated - During the readiness health check, the connector connects to the broker and retrieves the list of topics. This attribute specifies the maximum duration (in ms) for the retrieval. If exceeded, the channel is considered not-ready. Deprecated: Use 'health-topic-verification-timeout' instead. long false
health-readiness-topic-verification deprecated - Whether the readiness check should verify that topics exist on the broker. Default to false. Enabling it requires an admin connection. Deprecated: Use 'health-topic-verification-enabled' instead. boolean false
health-topic-verification-enabled Whether the startup and readiness check should verify that topics exist on the broker. Default to false. Enabling it requires an admin client connection. boolean false false
health-topic-verification-timeout During the startup and readiness health check, the connector connects to the broker and retrieves the list of topics. This attribute specifies the maximum duration (in ms) for the retrieval. If exceeded, the channel is considered not-ready. long false 2000
kafka-configuration Identifier of a CDI bean that provides the default Kafka consumer/producer configuration for this channel. The channel configuration can still override any attribute. The bean must have a type of Map and must use the @io.smallrye.common.annotation.Identifier qualifier to set the identifier. string false
key A key to used when writing the record string false
key-serialization-failure-handler The name set in @Identifier of a bean that implements io.smallrye.reactive.messaging.kafka.SerializationFailureHandler. If set, serialization failure happening when serializing keys are delegated to this handler which may provide a fallback value. string false
key.serializer The serializer classname used to serialize the record's key string false org.apache.kafka.common.serialization.StringSerializer
max-inflight-messages The maximum number of messages to be written to Kafka concurrently. It limits the number of messages waiting to be written and acknowledged by the broker. You can set this attribute to 0 remove the limit long false 1024
merge Whether the connector should allow multiple upstreams boolean false false
partition The target partition id. -1 to let the client determine the partition int false -1
propagate-headers A comma-separating list of incoming record headers to be propagated to the outgoing record string false ``
propagate-record-key Propagate incoming record key to the outgoing record boolean false false
retries If set to a positive number, the connector will try to resend any record that was not delivered successfully (with a potentially transient error) until the number of retries is reached. If set to 0, retries are disabled. If not set, the connector tries to resend any record that failed to be delivered (because of a potentially transient error) during an amount of time configured by delivery.timeout.ms. long false 2147483647
topic The consumed / populated Kafka topic. If neither this property nor the topics properties are set, the channel name is used string false
tracing-enabled Whether tracing is enabled (default) or disabled boolean false true
value-serialization-failure-handler The name set in @Identifier of a bean that implements io.smallrye.reactive.messaging.kafka.SerializationFailureHandler. If set, serialization failure happening when serializing values are delegated to this handler which may provide a fallback value. string false
value.serializer The serializer classname used to serialize the payload string true
waitForWriteCompletion Whether the client waits for Kafka to acknowledge the written record before acknowledging the message boolean false true

You can also pass any property supported by the underlying Kafka producer.

For example, to configure the batch.size property, use:

mp.messaging.outgoing.[channel].batch.size=32768

Some producer client properties are configured to sensible default values:

If not set, reconnect.backoff.max.ms is set to 10000 to avoid high load on disconnection.

If not set, key.serializer is set to org.apache.kafka.common.serialization.StringSerializer.

If not set, producer client.id is generated as kafka-producer-[channel].