Flink’s Kafka consumer can lose track of its place in a topic because the consumer group offsets aren’t being managed correctly, leading to duplicate messages or missed data.

Cause 1: Checkpointing Disabled or Misconfigured

Diagnosis: Check your Flink job configuration. Checkpointing is off by default and is switched on by setting execution.checkpointing.interval. If no interval is set, or if execution.checkpointing.timeout is so short that checkpoints are aborted before they complete, this is the most likely culprit.

Fix: Enable checkpointing with a reasonable interval and timeout.

execution.checkpointing.interval: 60000 # Checkpoint every 60 seconds (milliseconds); setting this enables checkpointing
execution.checkpointing.timeout: 600000 # Allow 10 minutes for checkpoints to complete
execution.checkpointing.unaligned: true # Optional: helps checkpoints complete under backpressure

With checkpointing enabled, Flink periodically saves its exact position in each Kafka partition to durable checkpoint storage (typically a distributed file system), so it can resume precisely where it left off after a failure.
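To make the recovery behavior concrete, here is a toy model in plain Java. It is not Flink code: the class CheckpointReplayDemo and its method are hypothetical, and a "checkpoint" is simply a remembered offset taken every N records. It shows that after a crash the job resumes from the last snapshot, so only the records read since that snapshot are replayed.

```java
/** Toy model of checkpointed offset recovery; illustrative only, not Flink code. */
final class CheckpointReplayDemo {

    /**
     * Processes records starting at offset 0, snapshots the "next offset to read"
     * every checkpointEvery records, and crashes after crashAfter records.
     * Returns the offset the job resumes from once the last snapshot is restored.
     */
    static long resumeOffset(long checkpointEvery, long crashAfter) {
        long lastCheckpointedOffset = 0; // no snapshot yet: a restart begins at offset 0
        for (long offset = 0; offset < crashAfter; offset++) {
            long nextOffset = offset + 1; // the record at `offset` has been processed
            if (nextOffset % checkpointEvery == 0) {
                lastCheckpointedOffset = nextOffset; // snapshot taken at this point
            }
        }
        return lastCheckpointedOffset;
    }

    public static void main(String[] args) {
        long crashAfter = 250;
        long resume = resumeOffset(100, crashAfter);
        System.out.println("Resume from offset " + resume
                + ", replaying " + (crashAfter - resume) + " records");
    }
}
```

A shorter checkpoint interval shrinks the replay window after a failure, at the cost of more frequent snapshot work.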

Cause 2: Kafka Broker Offset Committing Mismatch

Diagnosis: Inspect your Flink Kafka consumer configuration. If properties.enable.auto.commit is set to true (the default for plain Kafka clients, but not ideal for Flink), or if you commit offsets manually without coordinating with checkpointing, offsets can be committed to Kafka for records that no completed checkpoint covers.

Fix: Disable Kafka’s auto-commit and rely solely on Flink’s checkpointing for offset management.

properties.enable.auto.commit: false
properties.auto.offset.reset: earliest # Or latest, depending on desired restart behavior

Flink snapshots the source offsets as part of each checkpoint, so the offsets and the operator state they correspond to are saved together. When a checkpoint completes, the connector can also commit those offsets back to Kafka (whether it does depends on the connector version and configuration); on recovery, Flink resumes from the offsets stored in the checkpoint. Disabling Kafka’s auto-commit prevents the Kafka client from committing offsets before a checkpoint is safely stored, which avoids the scenario where the job crashes after an offset was committed but before the matching checkpoint completed.
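When the consumer is configured from code rather than from a properties file, the same settings can be collected in a java.util.Properties object. The sketch below uses only the standard library; the class name CheckpointManagedConsumerProps is hypothetical, and how the resulting properties reach the connector (for example through the source builder) depends on your connector version.

```java
import java.util.Properties;

/** Hypothetical helper: Kafka client properties that leave offset handling to Flink checkpoints. */
final class CheckpointManagedConsumerProps {

    static Properties build(String groupId) {
        Properties props = new Properties();
        props.setProperty("group.id", groupId);
        // Stop the Kafka client from committing offsets on its own timer.
        props.setProperty("enable.auto.commit", "false");
        // Only consulted when no committed offset exists for the group.
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        build("my-unique-flink-kafka-consumer-group-v1")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```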

Cause 3: Incorrect auto.offset.reset Configuration

Diagnosis: Examine the properties.auto.offset.reset setting in your Flink Kafka consumer. It only takes effect when the job starts without a usable committed offset for the consumer group. If it is set to latest in that situation, the job starts from the newest messages and skips everything produced while it was down. If it is set to earliest, a restart after a long downtime may reprocess a large backlog of old data.

Fix: Set properties.auto.offset.reset to earliest or latest based on your recovery strategy, but understand its implications. For most fault-tolerant streaming jobs, earliest is preferred for ensuring no data is missed on restart, relying on checkpointing to track progress.

properties.auto.offset.reset: earliest

This tells the Kafka consumer to start reading from the beginning of the partition if no committed offset is found for the consumer group, ensuring that all available data is processed upon a fresh startup.
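The decision described above can be sketched as a small function. This is a simplified model, not the Kafka client's implementation: the names OffsetResetDemo, ResetPolicy, and startingOffset are hypothetical, and the case of a committed offset that has fallen out of the retained range is left out.

```java
import java.util.OptionalLong;

/** Simplified model of how a starting offset is chosen; illustrative only. */
final class OffsetResetDemo {

    enum ResetPolicy { EARLIEST, LATEST }

    /**
     * A committed offset always wins. The reset policy is a fallback that is
     * used only when the group has no committed offset for the partition.
     */
    static long startingOffset(OptionalLong committed, ResetPolicy policy,
                               long beginningOffset, long endOffset) {
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        return policy == ResetPolicy.EARLIEST ? beginningOffset : endOffset;
    }

    public static void main(String[] args) {
        // Partition currently retains offsets [10, 100); the group has never committed.
        System.out.println(startingOffset(OptionalLong.empty(), ResetPolicy.EARLIEST, 10, 100));
        System.out.println(startingOffset(OptionalLong.empty(), ResetPolicy.LATEST, 10, 100));
    }
}
```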

Cause 4: Inconsistent Consumer Group IDs

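Diagnosis: Verify the group.id property used by your Flink Kafka consumer. If multiple Flink jobs, or several deployments of the same job (e.g., due to improper deployment or scaling), use the same group.id on the same Kafka topics, they commit offsets under the same group and overwrite each other’s progress. Flink’s Kafka connector assigns partitions to its own subtasks instead of relying on Kafka’s group rebalancing, so the jobs do not split the partitions between them; the damage shows up as committed offsets and lag metrics that jump around, and as unpredictable start positions whenever a job starts from committed offsets.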

Fix: Ensure each distinct Flink job or logical consumer instance has a unique group.id.

group.id: my-unique-flink-kafka-consumer-group-v1

A unique group.id isolates the offset tracking for that specific consumer group, preventing interference and ensuring that each consumer instance correctly tracks its own progress within Kafka.
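The isolation a unique group.id provides can be illustrated with a toy offset store. This is a sketch, not Kafka's implementation: the class SharedGroupDemo is hypothetical and simply keys committed offsets by group, topic, and partition, which is enough to show two jobs overwriting each other when they share a group.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy committed-offset store keyed by (group.id, topic, partition); illustrative only. */
final class SharedGroupDemo {

    private final Map<String, Long> committed = new HashMap<>();

    private static String key(String groupId, String topic, int partition) {
        return groupId + "/" + topic + "-" + partition;
    }

    /** Records a commit; a later commit under the same key replaces the earlier one. */
    void commit(String groupId, String topic, int partition, long offset) {
        committed.put(key(groupId, topic, partition), offset);
    }

    /** Returns the committed offset, or -1 if the group has none for this partition. */
    long committedOffset(String groupId, String topic, int partition) {
        return committed.getOrDefault(key(groupId, topic, partition), -1L);
    }

    public static void main(String[] args) {
        SharedGroupDemo store = new SharedGroupDemo();
        store.commit("shared-group", "orders", 0, 500); // job A is far ahead
        store.commit("shared-group", "orders", 0, 120); // job B overwrites job A's progress
        System.out.println("shared group sees offset " + store.committedOffset("shared-group", "orders", 0));
    }
}
```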

Cause 5: Flink Parallelism and Kafka Partition Count Mismatch

Diagnosis: Compare the number of partitions for your Kafka topics with the parallelism of your Flink Kafka consumer operator. If the Flink parallelism is higher than the number of Kafka partitions, the extra source subtasks receive no partitions and sit idle. If the parallelism is lower, each subtask reads several partitions, which affects processing throughput rather than offset tracking itself. The critical issue for offset tracking is when Flink’s internal task assignment to partitions becomes unstable.

Fix: Align Flink consumer parallelism with Kafka partition count. Ideally, Flink parallelism should be less than or equal to the number of Kafka partitions. If you need higher throughput, consider increasing Kafka partitions (which requires careful planning) or using Flink’s rebalance or shuffle strategies after the Kafka consumer source, not within it.

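// In the Flink job graph, set parallelism on the Kafka source operator.
// Assumes env (StreamExecutionEnvironment) and kafkaPartitions (int, the topic's partition count) are defined elsewhere.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;

KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
    .setBootstrapServers("...")
    .setTopics("...")
    .setGroupId("...")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

// setParallelism must be called on the operator returned by fromSource, not on a plain DataStream
DataStream<String> stream = env
    .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
    .setParallelism(Math.min(kafkaPartitions, 4)); // Example: set parallelism to 4 or number of partitions, whichever is smaller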

This ensures a stable and predictable assignment of Kafka partitions to Flink consumer tasks. When a Flink task is assigned a partition, it knows which offsets to track for that specific partition. If this assignment fluctuates wildly due to parallelism mismatches, offset tracking becomes unreliable.
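The effect of parallelism on partition assignment can be sketched with a plain round-robin model. This is an assumption for illustration, not the connector's exact algorithm (the real assignment may, for instance, also depend on the topic name); the class PartitionAssignmentDemo and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Round-robin model of partition-to-subtask assignment; illustrative only. */
final class PartitionAssignmentDemo {

    /** Returns, for each subtask index, the list of partitions assigned to it. */
    static List<List<Integer>> assign(int numPartitions, int parallelism) {
        List<List<Integer>> bySubtask = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            bySubtask.add(new ArrayList<>());
        }
        for (int partition = 0; partition < numPartitions; partition++) {
            bySubtask.get(partition % parallelism).add(partition);
        }
        return bySubtask;
    }

    /** Counts subtasks that received no partition at all. */
    static int idleSubtasks(int numPartitions, int parallelism) {
        int idle = 0;
        for (List<Integer> partitions : assign(numPartitions, parallelism)) {
            if (partitions.isEmpty()) {
                idle++;
            }
        }
        return idle;
    }

    public static void main(String[] args) {
        System.out.println("3 partitions, parallelism 5: " + assign(3, 5));
        System.out.println("6 partitions, parallelism 4: " + assign(6, 4));
    }
}
```

In this model, any parallelism above the partition count only adds idle subtasks, while a parallelism at or below it keeps every subtask busy with a fixed set of partitions.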

Cause 6: Kafka Broker/ZooKeeper Issues (Less Common for Offset Loss but Affects Availability)

Diagnosis: Check Kafka broker and ZooKeeper logs for errors related to connection loss, leader election, or partition reassignments. Monitor Kafka’s internal metrics for broker health.

Fix: Address the underlying Kafka infrastructure problems. This might involve restarting brokers, restoring the ZooKeeper quorum, or increasing broker resources. The fix belongs to the Kafka cluster itself: Flink can only connect and request offsets if the cluster reliably serves metadata and partition leadership information.

The next error you’ll likely encounter is a java.lang.RuntimeException: Kafka commit failed if offsets are still not being committed correctly, or an org.apache.flink.runtime.checkpoint.CheckpointException if checkpointing itself begins to fail.
