Kafka Companion is experimental and APIs are subject to change in the future.
The Kafka Companion is a separate Java library for testing Kafka applications.
It is not intended to mock Kafka; on the contrary, it connects to a Kafka broker and provides high-level features.
It is not limited to testing SmallRye Reactive Messaging applications, but aims to improve the testability of any application using Kafka. Some of its features:
Running the Kafka broker behind a Toxiproxy for simulating network issues.
Running an embedded Kafka KRaft broker for tests.
Base classes for bootstrapping tests.
Companion classes for easily creating tasks to produce and consume Kafka records.
Writing assertions on produce and consume tasks, consumer state, topics, offsets, etc.
Getting started writing tests
The easiest way to start writing Kafka tests is to extend KafkaCompanionTestBase.
It starts a single-node Kafka broker for the test suite using Testcontainers and creates a KafkaCompanion instance connected to that broker:
public class KafkaWithBaseTest extends KafkaCompanionTestBase {

    @Test
    public void testWithBase() {
        // companion is created by the base class

        // produce 100 records to messages topic
        ProducerTask producerTask = companion.produceIntegers()
                .usingGenerator(i -> new ProducerRecord<>("messages", i), 100);
        long msgCount = producerTask.awaitCompletion().count();
        Assertions.assertEquals(msgCount, 100);

        // consume 100 records from messages topic
        ConsumerTask<String, Integer> consumerTask = companion.consumeIntegers()
                .fromTopics("messages", 100);
        ConsumerRecord<String, Integer> lastRecord = consumerTask.awaitCompletion().getLastRecord();
        Assertions.assertEquals(lastRecord.value(), 99);
    }
}
KafkaCompanion can be used on its own to connect to a broker:
// Create companion with bootstrap servers and API timeout (default is 10 seconds)
KafkaCompanion companion = new KafkaCompanion("localhost:9092", Duration.ofSeconds(5));

// Create producer and start producer task
ProducerBuilder<String, Integer> producer = companion.produceIntegers()
        .withClientId("my-producer")
        .withProp("max.block.ms", "5000");
producer.usingGenerator(i -> new ProducerRecord<>("topic", i), 100);

// Create consumer and start consumer task
ConsumerBuilder<String, Integer> consumer = companion.consumeIntegers()
        .withGroupId("my-group")
        .withCommitAsyncWhen(record -> true);
ConsumerTask<String, Integer> records = consumer.fromTopics("topic", Duration.ofSeconds(10));

// Await completion and assert consumed record count
Assertions.assertEquals(records.awaitCompletion().count(), 100);
There are a couple of things to note on how Kafka Companion handles producers, consumers, and tasks:
ProducerBuilder and ConsumerBuilder are lazy descriptions of the configuration with which to create a producer or a consumer.
ProducerTask and ConsumerTask run asynchronously on dedicated threads and are started as soon as they are created.
A task terminates when it is explicitly stopped, when its predefined duration elapses or its requested number of records has been produced/consumed, or when it encounters an error.
Another thread can await a given number of processed records, or simply the termination of the task.
At any given time, the records produced or consumed so far are accessible through the task (a sketch of this follows the walkthrough below).
The actual creation of the producer or consumer happens when a task is started. When the task terminates, the producer or the consumer is automatically closed.
In the previous example:
We described a producer with a client id my-producer and max.block.ms of 5 seconds.
Then we created a task to produce 100 records using the generator function, without waiting for its completion.
We then described a consumer with group id my-group which commits offsets asynchronously after every received record.
Finally, we created a task to consume records for 10 seconds and await its completion.
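To make the task lifecycle concrete, here is a minimal sketch; it assumes a fromTopics overload without a record limit (so the task runs until stopped) and a stop() method on the task, so adjust the names if the actual API differs:

// Producing: the task starts as soon as it is created
ProducerTask producerTask = companion.produceIntegers()
        .usingGenerator(i -> new ProducerRecord<>("topic", i), 50);

// Consuming without a predefined limit: the task keeps running until stopped
// (the no-limit fromTopics overload is an assumption here)
ConsumerTask<String, Integer> consumerTask = companion.consumeIntegers()
        .withGroupId("my-group")
        .fromTopics("topic");

// Wait for the producer side to finish
producerTask.awaitCompletion();

// Records consumed so far are accessible while the task is still running
long consumedSoFar = consumerTask.count();

// Stop the consumer task explicitly; the underlying consumer is closed automatically
consumerTask.stop();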
During execution of the consumer task, information about the underlying consumer's topic partition assignment, position or committed offsets can be accessed.
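As a minimal sketch of inspecting that state, assuming the ConsumerBuilder exposes currentAssignment(), position() and committed() accessors (these names and return types are assumptions and may differ in the actual API):

// Describe a consumer and start a task consuming for 10 seconds
ConsumerBuilder<String, Integer> consumer = companion.consumeIntegers()
        .withGroupId("my-group");
ConsumerTask<String, Integer> task = consumer.fromTopics("topic", Duration.ofSeconds(10));

// Assumed accessors: inspect assignment and offsets while the task is running
TopicPartition partition = new TopicPartition("topic", 0);
Set<TopicPartition> assignment = consumer.currentAssignment(); // assumed accessor
long position = consumer.position(partition);                  // assumed accessor
var committed = consumer.committed(partition);                 // assumed accessor

task.awaitCompletion();

Custom serdes can also be registered on the companion; producers and consumers it creates afterwards are configured with them automatically: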
KafkaCompanion companion = new KafkaCompanion("localhost:9092");

// Register serde to the companion
companion.registerSerde(Orders.class, new OrdersSerializer(), new OrdersDeserializer());

// Companion will configure consumer accordingly
ConsumerTask<Integer, Orders> orders = companion.consume(Integer.class, Orders.class)
        .fromTopics("orders", 1000)
        .awaitCompletion();

for (ConsumerRecord<Integer, Orders> order : orders) {
    // ... check consumed records
}