Event-driven systems can gracefully handle situations where producers generate events faster than consumers can process them, preventing data loss by employing backpressure mechanisms.
Imagine a busy restaurant kitchen. The order tickets (events) are coming in faster than the chefs (consumers) can cook them. If the kitchen has no way to signal this overload, orders will pile up or get lost, or the chefs will get overwhelmed and start making mistakes. Backpressure is how the chefs tell the waitstaff (producers) to slow down.
Let’s see this in action with a simple Kafka producer and consumer.
Producer Side (Simulated):
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class BackpressureProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Crucially, bound the buffer. This is where records wait *before* going to Kafka brokers.
        props.put("buffer.memory", 33554432); // 32 MB
        // The batch size caps how many bytes are grouped per partition before sending.
        props.put("batch.size", 16384); // 16 KB

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 1000000; i++) {
            try {
                // send() is asynchronous. It returns a Future<RecordMetadata> and runs the callback later.
                producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), "message-" + i), (metadata, exception) -> {
                    if (exception == null) {
                        // Successfully acknowledged by the Kafka broker
                    } else {
                        // Handle send error
                    }
                });
                // If buffer.memory is full, send() blocks until space is available, for up to
                // max.block.ms, after which the send fails with a TimeoutException.
                // This blocking is a form of backpressure.
            } catch (Exception e) {
                // Handle exceptions thrown synchronously by send()
            }
        }
        // close() waits for buffered records to be sent before returning.
        producer.close();
    }
}
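The blocking behaviour described in the producer comments can be illustrated without a broker. The sketch below is a toy model, not Kafka's implementation: BoundedSendBuffer, its capacity measured in records, and the maxBlockMs parameter are hypothetical names that stand in for buffer.memory and a send timeout.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Toy model of a producer-side buffer. Hypothetical; not Kafka's implementation.
class BoundedSendBuffer {
    private final BlockingQueue<String> buffer;
    private final long maxBlockMs;

    BoundedSendBuffer(int capacityRecords, long maxBlockMs) {
        this.buffer = new ArrayBlockingQueue<>(capacityRecords);
        this.maxBlockMs = maxBlockMs;
    }

    // Waits up to maxBlockMs for free space. Returns false if the buffer
    // stayed full (or the thread was interrupted), so the caller must slow down.
    boolean send(String record) {
        try {
            return buffer.offer(record, maxBlockMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Stands in for the background sender handing one record to the broker.
    String drainOne() {
        return buffer.poll();
    }

    int buffered() {
        return buffer.size();
    }

    public static void main(String[] args) {
        BoundedSendBuffer buf = new BoundedSendBuffer(2, 50);
        System.out.println(buf.send("message-0")); // accepted
        System.out.println(buf.send("message-1")); // accepted, buffer is now full
        System.out.println(buf.send("message-2")); // waits 50 ms, then is rejected
        buf.drainOne();                            // the "broker" catches up by one record
        System.out.println(buf.send("message-2")); // accepted again
    }
}
```

The point of the model is that the caller of send() is the one that pays for a full buffer: it either waits or is told no, which is exactly what slows the producer loop down.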
Consumer Side (Simulated):
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class BackpressureConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Disable auto commit so offsets are committed only after records have been processed.
        props.put("enable.auto.commit", "false");
        // These two settings control how long the broker may hold a fetch while data accumulates.
        props.put("fetch.min.bytes", 1); // Respond as soon as at least 1 byte is available
        props.put("fetch.max.wait.ms", 500); // Otherwise respond after at most 500 ms

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("my-topic"));
        while (true) {
            // Poll for records. This is where the consumer *asks* for data.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            if (!records.isEmpty()) {
                for (ConsumerRecord<String, String> record : records) {
                    // Process the record
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                    // Simulate slow processing
                    try {
                        Thread.sleep(50); // Simulate work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
                // Manually commit offsets *after* processing.
                // If the consumer crashes before this line, these messages will be re-delivered.
                consumer.commitSync();
            } else {
                // No records arrived within the 100 ms poll timeout.
                // Either the consumer has caught up or the topic has no new data.
            }
        }
        // consumer.close(); // In a real app, this would be in a shutdown hook
    }
}
The core problem backpressure addresses is the mismatch between the rate at which data is produced and the rate at which it can be consumed. If producers are too fast, queues overflow, and data is dropped or becomes stale.
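To make the mismatch concrete, here is a small back-of-the-envelope sketch. The class and method names are hypothetical and the rates are illustrative: 20 records per second corresponds to the 50 ms per-record sleep in the consumer example, while 100 records per second is an assumed producer rate.

```java
// Back-of-the-envelope backlog arithmetic; names and rates are illustrative.
class BacklogEstimate {
    // Records left unprocessed after `seconds` at the given rates (never negative).
    static double backlogAfter(double producedPerSec, double consumedPerSec, double seconds) {
        return Math.max(0.0, (producedPerSec - consumedPerSec) * seconds);
    }

    // Seconds needed to work through an existing backlog once producers stop.
    static double secondsToDrain(long backlogRecords, double consumedPerSec) {
        return backlogRecords / consumedPerSec;
    }

    public static void main(String[] args) {
        double consumedPerSec = 1000.0 / 50; // 50 ms per record is 20 records per second
        // Backlog after one minute with an assumed 100 records/s producer.
        System.out.println(backlogAfter(100.0, consumedPerSec, 60));
        // Time for one consumer to drain the 1,000,000 records from the producer loop.
        System.out.println(secondsToDrain(1_000_000, consumedPerSec));
    }
}
```

With these numbers the backlog grows by 80 records every second, and a single consumer working sequentially would need 50,000 seconds, roughly 14 hours, to drain the 1,000,000 records the producer loop sends.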
In an event-driven system, there are several points where this mismatch can occur, and thus several places to implement backpressure:
- Producer Buffering: The KafkaProducer has buffer.memory. If this buffer fills up because Kafka brokers are slow to acknowledge messages or the network is congested, producer.send() calls will block for up to max.block.ms. This is the first line of defense, preventing the producer from overwhelming its own outgoing buffers or the brokers. The batch.size setting also plays a role by influencing how efficiently data is sent to brokers.
- Broker Ingestion: Kafka brokers themselves have limits. They can only ingest data as fast as their disks can write and their network can handle. If a topic has many partitions and many producers, brokers can become a bottleneck. Kafka signals this back to the producer through acknowledgments: depending on the producer’s acks setting, a send completes only once the partition leader (acks=1) or all in-sync replicas (acks=all) have appended the record to their logs. Slow acknowledgments keep batches in the producer’s buffer longer, which is how broker pressure reaches send().
- Consumer Fetching: The KafkaConsumer’s poll() method is the primary way it requests data. The fetch.min.bytes and fetch.max.wait.ms settings influence how aggressively the consumer pulls data. If fetch.min.bytes is high and fetch.max.wait.ms is long, the broker holds each fetch until more data has accumulated, which means fewer, larger fetches at the cost of higher latency. If the consumer is consistently slow, its lag on its assigned partitions grows.
- Consumer Processing & Committing: This is where the most critical backpressure happens. If a consumer processes messages slowly (e.g., due to external API calls, complex calculations, or database writes), it can fall behind. The enable.auto.commit setting is crucial here. When it is false (manual commits), the consumer acknowledges processing only by calling commitSync() or commitAsync(). If the consumer crashes before committing, the uncommitted messages are re-delivered. This redelivery gives at-least-once processing, but it can exacerbate backpressure if the consumer remains slow. Committing only after a batch has been processed records that the entire batch was handled.
- Downstream Systems: If the Kafka consumer is itself a producer to another system (e.g., a database, another Kafka topic, an API), that downstream system can become the bottleneck. The consumer’s processing logic must account for the throughput limits of these downstream dependencies.
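For the last item in the list, one common way to respect a downstream limit is to cap the number of calls in flight. The sketch below assumes a hypothetical InFlightLimiter built on java.util.concurrent.Semaphore; the class, its method names, and the limit of 2 are illustrative and are not part of any Kafka API.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Caps concurrent calls to a downstream dependency. Hypothetical helper.
class InFlightLimiter {
    private final Semaphore permits;

    InFlightLimiter(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    // Waits up to waitMs for a free slot; false means the downstream is saturated.
    boolean tryStart(long waitMs) {
        try {
            return permits.tryAcquire(waitMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Must be called once for every successful tryStart, when that call completes.
    void finish() {
        permits.release();
    }

    int available() {
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        InFlightLimiter limiter = new InFlightLimiter(2);
        System.out.println(limiter.tryStart(10)); // first slot taken
        System.out.println(limiter.tryStart(10)); // second slot taken
        System.out.println(limiter.tryStart(10)); // no slot free: the caller should hold off
        limiter.finish();                         // one downstream call completed
        System.out.println(limiter.tryStart(10)); // a slot is free again
    }
}
```

A consumer loop could call tryStart before handing each record to the downstream client and back off while it returns false, so the downstream system's capacity, rather than the topic's arrival rate, sets the pace.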
The most counterintuitive aspect of backpressure in systems like Kafka is that blocking is often a feature, not a bug. When a producer’s send() call blocks because buffer.memory is full, that is not necessarily a failure. The producer is being told, "Hold on, I can’t accept more right now." On the consumer side the signal is implicit. Kafka consumers pull data, so a slow consumer simply calls poll() less often, and the records it has not yet fetched wait in the broker’s log instead of piling up in the consumer’s memory. In effect the consumer is telling the broker, "I’m still working on the last batch." An empty or small poll() result means the opposite: little data arrived before the wait expired, so the consumer is keeping up. Both mechanisms let the system throttle itself instead of dropping data, provided the consumer catches up before the topic’s retention limit removes the unread records.
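The consumer half of this behaviour can be sketched in a few lines. The model below is a simplification under stated assumptions: PullLogModel is a hypothetical in-memory stand-in for a single partition, with no networking, consumer groups, or retention. It shows that unread records simply wait in the log, and that records polled but not committed are handed out again after a restart.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory stand-in for one partition and one consumer. Hypothetical model.
class PullLogModel {
    private final List<String> log = new ArrayList<>();
    private int position = 0;  // index of the next record to hand out
    private int committed = 0; // position saved by the last commit

    void append(String record) {
        log.add(record);
    }

    // Returns at most maxRecords; the log never pushes data to the consumer.
    List<String> poll(int maxRecords) {
        int end = Math.min(log.size(), position + maxRecords);
        List<String> batch = new ArrayList<>(log.subList(position, end));
        position = end;
        return batch;
    }

    // Marks everything handed out so far as processed.
    void commit() {
        committed = position;
    }

    // Simulates a crash and restart: consumption resumes from the last commit.
    void restart() {
        position = committed;
    }

    // Records appended but not yet committed as processed.
    int lag() {
        return log.size() - committed;
    }

    public static void main(String[] args) {
        PullLogModel partition = new PullLogModel();
        for (int i = 0; i < 5; i++) {
            partition.append("message-" + i);
        }
        System.out.println(partition.poll(2)); // first two records
        partition.commit();
        System.out.println(partition.poll(2)); // next two records, not committed
        partition.restart();                   // crash before commit
        System.out.println(partition.poll(2)); // the same two records again
        System.out.println(partition.lag());   // records still awaiting a commit
    }
}
```

Nothing in the model ever forces records onto the consumer; the backlog shows up only as a growing lag() value, which is the quantity worth monitoring in a real deployment.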
The next challenge is managing consumer restarts and ensuring exactly-once or at-least-once processing semantics when consumers do fail and restart.