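You can test event consumers without a pre-existing Kafka cluster by running a disposable broker in a Docker container with Testcontainers.
The following JUnit 5 test spins up such a broker inside your test environment and connects a consumer to it. Testcontainers also takes care of the ZooKeeper instance this broker image needs, so you don’t manage one yourself.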
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import static org.junit.jupiter.api.Assertions.assertNotNull;

@Testcontainers
public class KafkaConsumerTest {

    // A static @Container field is shared by all tests in the class: the
    // Testcontainers extension starts it once before the first test and
    // stops it after the last one, so no manual start()/stop() is needed.
    @Container
    private static final KafkaContainer kafka =
            new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.3.0"));

    private KafkaConsumer<String, String> consumer;

    @BeforeEach
    void setUp() {
        // Point a fresh consumer at the container's dynamically mapped port.
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
    }

    @AfterEach
    void tearDown() {
        // Close only the per-test consumer; the shared container keeps running.
        if (consumer != null) {
            consumer.close();
        }
    }

    @Test
    void testConsumeMessage() {
        // In a real test, you would produce a message to Kafka here
        // and then assert that the consumer receives it.
        // For demonstration, we'll just check if the consumer can be created.
        assertNotNull(consumer, "Kafka consumer should be initialized");

        // Example of subscribing and polling (would require a producer to send data)
        // consumer.subscribe(Collections.singletonList("test-topic"));
        // var records = consumer.poll(Duration.ofSeconds(1));
        // assertFalse(records.isEmpty(), "Should receive records");
    }
}
This setup uses Testcontainers to spin up a Kafka broker in a Docker container for your tests. The @Testcontainers extension manages the lifecycle of every field annotated with @Container, so the KafkaContainer is started before your tests run and stopped afterward. Your consumer is then configured to connect to this ephemeral broker using kafka.getBootstrapServers().
The core problem this solves is the dependency on an external, already running Kafka cluster when integration-testing Kafka consumers. Because every test run gets its own throwaway broker, tests are isolated from shared state, which makes them more reliable and repeatable. You can control the broker’s configuration, simulate specific scenarios (such as network partitions or broker failures, though this example is simpler), and check that your consumer logic is sound without the flakiness of a shared environment. The trade-off is container startup time, which is usually a few seconds per test class.
The KafkaContainer itself is built on top of Docker, so you need Docker installed and running on your machine for this to work. DockerImageName.parse("confluentinc/cp-kafka:7.3.0") specifies the exact Kafka image and version to pull from Docker Hub. The kafka.getBootstrapServers() method returns the connection string (e.g., PLAINTEXT://localhost:56789), which points at the host port that Docker mapped to the containerized broker. That port is assigned dynamically, so always read it from the container rather than hard-coding it.
When you configure your KafkaConsumer, you point it to kafka.getBootstrapServers() through the BOOTSTRAP_SERVERS_CONFIG property. The consumer is talking to a real Kafka broker, not an in-memory fake; the only difference from a production cluster is that this broker runs in a container managed by Testcontainers and disappears when the tests finish.
The GROUP_ID_CONFIG is standard Kafka consumer configuration. Setting AUTO_OFFSET_RESET_CONFIG to earliest means that when the consumer group has no committed offset for a partition, the consumer starts reading from the beginning of that partition; if the group has already committed offsets, it resumes from those instead. The deserializers (KEY_DESERIALIZER_CLASS_CONFIG and VALUE_DESERIALIZER_CLASS_CONFIG) are set to StringDeserializer because we’re assuming string keys and values, but you’d adjust these based on your actual data format.
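One way to keep the offset-reset behavior predictable is to give each test its own consumer group, since a brand-new group has no committed offsets. The sketch below is a hypothetical helper (TestConsumerProps and forBroker are names invented for illustration, not part of Kafka or Testcontainers). It uses the literal property keys that the ConsumerConfig constants stand for, so it has no dependencies beyond the JDK.

```java
import java.util.Properties;
import java.util.UUID;

/** Hypothetical helper for building per-test consumer settings. */
final class TestConsumerProps {

    private TestConsumerProps() {
    }

    /** Builds consumer properties for the given bootstrap servers string. */
    static Properties forBroker(String bootstrapServers) {
        Properties props = new Properties();
        // Same keys as ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, GROUP_ID_CONFIG, etc.
        props.put("bootstrap.servers", bootstrapServers);
        // A random suffix yields a group with no committed offsets,
        // so "earliest" applies on every call.
        props.put("group.id", "test-group-" + UUID.randomUUID());
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```

In a test you would pass the result straight to the consumer, for example new KafkaConsumer<>(TestConsumerProps.forBroker(kafka.getBootstrapServers())).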
To actually test message consumption, you need a producer that sends messages to the containerized broker before or during your test. The simplest option is a KafkaProducer configured with the same kafka.getBootstrapServers() value; a second KafkaContainer would not help, because a container is a broker, not a client. Your consumer then subscribes to the topic and polls for messages. The commented-out lines in testConsumeMessage show the consuming half: subscribe to a topic, poll for records, and assert that you received what you expected.
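A full round trip could look like the sketch below. It assumes the same image as the example above, a hypothetical topic name (test-topic), that the broker auto-creates topics on first use (the default for this image as far as I know), and an arbitrary 10-second deadline for the poll loop. Treat it as a starting point rather than a definitive test.

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import static org.junit.jupiter.api.Assertions.assertEquals;

@Testcontainers
class KafkaRoundTripTest {

    @Container
    private static final KafkaContainer kafka =
            new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.3.0"));

    @Test
    void consumerReceivesProducedMessage() throws Exception {
        String topic = "test-topic"; // hypothetical topic name

        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            // get() blocks until the broker has acknowledged the write.
            producer.send(new ProducerRecord<>(topic, "key-1", "hello")).get();
        }

        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "round-trip-group");
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        List<String> values = new ArrayList<>();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.subscribe(Collections.singletonList(topic));
            // The first polls may return nothing while the group rebalances,
            // so keep polling until a record arrives or the deadline passes.
            long deadline = System.currentTimeMillis() + 10_000;
            while (values.isEmpty() && System.currentTimeMillis() < deadline) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    values.add(record.value());
                }
            }
        }

        assertEquals(List.of("hello"), values);
    }
}
```

Polling in a loop with a deadline, rather than asserting on a single poll, avoids a common source of flaky tests: a single short poll can legitimately come back empty while the consumer is still joining its group.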
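A common pitfall is mismanaging the container lifecycle. With @Testcontainers and a static @Container field, the extension starts the container once before the first test and stops it after the last one, so explicit kafka.start() and kafka.stop() calls are unnecessary. Calling kafka.stop() in @AfterEach is actively harmful for a shared container, because it forces a full broker restart before the next test. You only need to manage the container by hand if you drop the @Container annotation. The consumer is different: it is created per test, so close it with consumer.close() in @AfterEach to prevent resource leaks.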
The real power comes when you combine this with a simple producer inside the test to feed data to the broker. You can then assert that your consumer’s processing logic correctly handles incoming messages, transforms them, or updates some state. This allows for robust integration testing of your consumer application without any external Kafka infrastructure.
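Asserting on processing logic is easier when that logic lives in its own class, separate from the poll loop. The sketch below shows one hypothetical shape for this (WordCountHandler and its methods are invented for illustration): a handler that receives each record value and updates in-memory state the test can inspect.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

/** Hypothetical consumer logic: counts the words seen in message values. */
final class WordCountHandler {

    private final Map<String, Integer> counts = new HashMap<>();

    /** Intended to be called once per consumed record value. */
    void handle(String value) {
        // Tombstones (null values) and blank messages carry no words.
        if (value == null || value.isBlank()) {
            return;
        }
        for (String word : value.trim().toLowerCase(Locale.ROOT).split("\\s+")) {
            counts.merge(word, 1, Integer::sum);
        }
    }

    /** Returns how often the word has been seen so far, ignoring case. */
    int countOf(String word) {
        return counts.getOrDefault(word.toLowerCase(Locale.ROOT), 0);
    }
}
```

Inside the poll loop, each record would be passed along with handler.handle(record.value()), and the test would then assert on handler.countOf(...) once the expected messages have been consumed.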
If your consumer relies on external services or databases, you’ll likely want to mock those as well to keep your tests focused solely on the Kafka interaction and the consumer’s core logic.
The next challenge you’ll face is testing how your consumer behaves under less friendly conditions, such as duplicate deliveries or messages arriving in an order your logic does not expect.