Kafka Companion

Experimental

Kafka Companion is experimental and APIs are subject to change in the future.

The Kafka Companion is a separate Java library that helps test Kafka applications. It is not intended to mock Kafka; on the contrary, it connects to a real Kafka broker and provides high-level features on top of it.

It is not limited to testing SmallRye Reactive Messaging; it aims to improve the testability of any application using Kafka. Some of its features:

  • Running an in-container Kafka broker for tests via strimzi-test-container.
  • Running the Kafka broker behind a Toxiproxy to simulate network issues.
  • Running an embedded Kafka KRaft broker for tests.
  • Base classes for bootstrapping tests.
  • Companion classes for easily creating tasks that produce and consume Kafka records.
  • Writing assertions on produce and consume tasks, the state of consumers, topics, offsets, etc.

Getting started writing tests

The easiest way to start writing Kafka tests is to extend KafkaCompanionTestBase. It starts a single-node Kafka broker for the test suite using Testcontainers and creates a KafkaCompanion instance connected to that broker:

public class KafkaWithBaseTest extends KafkaCompanionTestBase {

    @Test
    public void testWithBase() {
        // companion is created by the base class

        // produce 100 records to messages topic
        ProducerTask producerTask = companion.produceIntegers()
                .usingGenerator(i -> new ProducerRecord<>("messages", i), 100);
        long msgCount = producerTask.awaitCompletion().count();
        Assertions.assertEquals(100, msgCount);

        // consume 100 records from messages topic
        ConsumerTask<String, Integer> consumerTask = companion.consumeIntegers()
                .fromTopics("messages", 100);
        ConsumerRecord<String, Integer> lastRecord = consumerTask.awaitCompletion().getLastRecord();
        Assertions.assertEquals(99, lastRecord.value());
    }
}

KafkaCompanion can be used on its own to connect to a broker:

// Create companion with bootstrap servers and API timeout (default is 10 seconds)
KafkaCompanion companion = new KafkaCompanion("localhost:9092", Duration.ofSeconds(5));

// Create producer and start producer task
ProducerBuilder<String, Integer> producer = companion.produceIntegers()
        .withClientId("my-producer")
        .withProp("max.block.ms", "5000");
producer.usingGenerator(i -> new ProducerRecord<>("topic", i), 100);

// Create consumer and start consumer task
ConsumerBuilder<String, Integer> consumer = companion.consumeIntegers()
        .withGroupId("my-group")
        .withCommitAsyncWhen(record -> true);
ConsumerTask<String, Integer> records = consumer.fromTopics("topic", Duration.ofSeconds(10));
// Await completion and assert consumed record count
Assertions.assertEquals(100, records.awaitCompletion().count());

There are a couple of things to note about how Kafka Companion handles producers, consumers, and tasks:

ProducerBuilder and ConsumerBuilder are lazy descriptions of the configuration with which to create a producer or a consumer.

ProducerTask and ConsumerTask run asynchronously on dedicated threads and are started as soon as they are created. A task terminates when it is explicitly stopped, when its predefined duration has elapsed or its number of records has been produced/consumed, or when it encounters an error. Another thread can await the processing of records, or simply the termination of the task. At any given time, the records produced or consumed so far are accessible through the task.

The actual creation of the producer or the consumer happens when a task is started. When the task terminates, the producer or consumer is closed automatically, as the sketch after the list below illustrates.

Walking through the example above:

  1. We described a producer with a client id my-producer and max.block.ms of 5 seconds.
  2. Then we created a task to produce 100 records using the generator function, without waiting for its completion.
  3. We then described a consumer with group id my-group which commits offsets asynchronously after every received record.
  4. Finally, we created a task to consume records for 10 seconds and await its completion.
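
A consumer task created without a record count or duration keeps running until it is stopped explicitly. The following is a minimal sketch of that lifecycle (the topic name demo is hypothetical; awaitRecords and stop are the awaiting and stopping operations described above):

// No count or duration given: the task runs until explicitly stopped.
// The underlying Kafka consumer is created when the task starts.
ConsumerTask<String, Integer> task = companion.consumeIntegers()
        .fromTopics("demo");

// Block the calling thread until at least 10 records have been consumed
task.awaitRecords(10);

// Stop the task; the underlying consumer is closed automatically
task.stop();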

Producing records

Produce from records

Produce given records:

companion.produce(byte[].class).fromRecords(
        new ProducerRecord<>("topic1", "k1", "1".getBytes()),
        new ProducerRecord<>("topic1", "k2", "2".getBytes()),
        new ProducerRecord<>("topic1", "k3", "3".getBytes()));

Produce from generator function

Produce 10 records using the generator function:

companion.produceIntegers().usingGenerator(i -> new ProducerRecord<>("topic", i), 10);
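
The generator function receives the record index, so keys and values can be derived from it. For example, to produce keyed records (the topic name and key scheme here are hypothetical):

// Derive a key from the record index, cycling through 10 keys
companion.produceIntegers()
        .usingGenerator(i -> new ProducerRecord<>("topic", "key-" + (i % 10), i), 10);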

Produce from CSV file

Given a comma-separated file records.csv with the following content:

messages,0,a,asdf
messages,1,b,asdf
messages,3,c,asdf

Produce records from the file:

companion.produceStrings().fromCsv("records.csv");

Consuming records

Consume from topics

companion.consumeIntegers().fromTopics("topic1", "topic2");
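
Without a record count or duration, the task keeps consuming until it is stopped. To bound the consumption, use the count or duration overloads shown earlier:

// Stop after 100 records, or after 10 seconds
ConsumerTask<String, Integer> byCount = companion.consumeIntegers()
        .fromTopics("topic1", 100);
ConsumerTask<String, Integer> byDuration = companion.consumeIntegers()
        .fromTopics("topic2", Duration.ofSeconds(10));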

Consume from offsets

Map<TopicPartition, Long> offsets = new HashMap<>();
offsets.put(new TopicPartition("topic1", 0), 100L);
offsets.put(new TopicPartition("topic2", 0), 100L);
companion.consumeIntegers().fromOffsets(offsets, Duration.ofSeconds(10));

Consumer assignment and offsets

During the execution of a consumer task, the underlying consumer's topic partition assignment, position, and committed offsets can be accessed.

ConsumerBuilder<String, Integer> consumer = companion.consumeIntegers()
        .withAutoCommit();
consumer.fromTopics("topic");
// ...
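// Wait for the partition assignment, then query the committed offset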
await().untilAsserted(consumer::waitForAssignment);
consumer.committed(new TopicPartition("topic", 1));

Registering Custom Serdes

KafkaCompanion provides serializers and deserializers for default types such as primitives, String, ByteBuffer, and UUID.

Serdes for custom types can be registered with the companion object and will be used in producer and consumer tasks:

KafkaCompanion companion = new KafkaCompanion("localhost:9092");

// Register serde to the companion
companion.registerSerde(Orders.class, new OrdersSerializer(), new OrdersDeserializer());

// Companion will configure consumer accordingly
ConsumerTask<Integer, Orders> orders = companion.consume(Integer.class, Orders.class)
        .fromTopics("orders", 1000).awaitCompletion();

for (ConsumerRecord<Integer, Orders> order : orders) {
    // ... check consumed records
}
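
The Orders type and its serde are not provided by the library. A minimal sketch of what they could look like, implementing Kafka's org.apache.kafka.common.serialization.Serializer and Deserializer interfaces (the Orders type and its comma-separated UTF-8 wire format are hypothetical):

// Hypothetical serde for an Orders type holding a list of order ids
public class OrdersSerializer implements Serializer<Orders> {
    @Override
    public byte[] serialize(String topic, Orders orders) {
        // Encode the order ids as a comma-separated UTF-8 string
        return String.join(",", orders.getOrderIds()).getBytes(StandardCharsets.UTF_8);
    }
}

public class OrdersDeserializer implements Deserializer<Orders> {
    @Override
    public Orders deserialize(String topic, byte[] data) {
        // Decode the comma-separated string back into an Orders object
        return new Orders(Arrays.asList(new String(data, StandardCharsets.UTF_8).split(",")));
    }
}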

Topics

Create, list, describe and delete topics:

companion.topics().create("topic1", 1);
companion.topics().createAndWait("topic2", 3);
Assertions.assertEquals(2, companion.topics().list().size());

companion.topics().delete("topic1");
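
describe can be used to inspect a topic. A sketch, assuming it returns the Kafka admin client's TopicDescription objects keyed by topic name:

// Check that topic2 was created with 3 partitions
TopicDescription topic2 = companion.topics().describe("topic2").get("topic2");
Assertions.assertEquals(3, topic2.partitions().size());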

Consumer Groups and Offsets

List topic partition offsets:

TopicPartition topicPartition = KafkaCompanion.tp("topic", 1);
long latestOffset = companion.offsets().get(topicPartition, OffsetSpec.latest()).offset();
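
For instance, the number of records in a partition can be estimated by comparing its earliest and latest offsets:

// Estimate the record count from the offset range of the partition
long earliestOffset = companion.offsets().get(topicPartition, OffsetSpec.earliest()).offset();
long recordCount = latestOffset - earliestOffset;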

List, describe, and alter consumer groups and their offsets:

Collection<ConsumerGroupListing> consumerGroups = companion.consumerGroups().list();
for (ConsumerGroupListing consumerGroup : consumerGroups) {
    // check consumer groups
}

TopicPartition topicPartition = KafkaCompanion.tp("topic", 1);
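// Reset the committed offsets of the consumer group for the given partition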
companion.consumerGroups().resetOffsets("consumer-group", topicPartition);