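Your Flink job appears stuck because watermarks are not advancing, which freezes event-time processing: event-time windows never fire and event-time timers never trigger. Flink's event-time clock moves only when watermarks move, and an operator's watermark is the minimum of the watermarks arriving on all of its inputs. If even one upstream source, partition, or parallel subtask stops delivering data (and therefore watermarks), that minimum stops moving and everything downstream of it stalls.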

Here are the common reasons and how to fix them:

Cause 1: Truly Idle Source

The source is actually idle, meaning no data has been produced for a significant period.

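Diagnosis: Start in the Flink web UI: the Watermarks tab of the source operator shows the current watermark per subtask (also available as the currentOutputWatermark metric). A subtask whose watermark never changes marks the input that is holding everything back. Then check Kafka directly with kafka-consumer-groups.sh --describe --bootstrap-server <brokers> --group <group_id>. A partition whose LOG-END-OFFSET does not change between two runs is receiving no new data, which is what "idle" means here. Do not confuse this with lag: lag metrics such as records-lag-max (which Flink's Kafka source forwards from the Kafka consumer) that stay consistently high mean Flink is falling behind (see Cause 2), whereas an idle partition shows near-zero lag and a log-end offset that never moves.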
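Because idle partitions and lagging partitions are easy to mix up, here is a minimal Java sketch of the classification. It assumes you have recorded LOG-END-OFFSET per partition twice (a few minutes apart) and CURRENT-OFFSET once from kafka-consumer-groups.sh output; the class name, the threshold, and the three categories are illustrative and not part of any Kafka or Flink tool. When checkpointing is enabled, Flink commits offsets back to Kafka only when a checkpoint completes, so choose a threshold larger than the number of records you expect within one checkpoint interval.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: classifies partitions from two consumer-group snapshots.
final class PartitionActivity {
    enum Status { IDLE, LAGGING, OK }

    static Map<Integer, Status> classify(Map<Integer, Long> logEndBefore,
                                         Map<Integer, Long> logEndAfter,
                                         Map<Integer, Long> currentOffset,
                                         long lagThreshold) {
        Map<Integer, Status> result = new TreeMap<>();
        for (Map.Entry<Integer, Long> e : logEndAfter.entrySet()) {
            int partition = e.getKey();
            long logEnd = e.getValue();
            // Lag = records written to the partition but not yet committed by the group.
            long lag = logEnd - currentOffset.getOrDefault(partition, 0L);
            // LOG-END-OFFSET unchanged between snapshots means nothing new was produced.
            Long before = logEndBefore.get(partition);
            boolean noNewData = before != null && before == logEnd;
            if (lag > lagThreshold) {
                result.put(partition, Status.LAGGING); // Flink is behind (Cause 2)
            } else if (noNewData) {
                result.put(partition, Status.IDLE);    // nothing to read (Cause 1)
            } else {
                result.put(partition, Status.OK);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, Long> before = Map.of(0, 100L, 1, 500L, 2, 50L);
        Map<Integer, Long> after = Map.of(0, 100L, 1, 800L, 2, 5050L);
        Map<Integer, Long> current = Map.of(0, 100L, 1, 790L, 2, 50L);
        System.out.println(classify(before, after, current, 1000)); // prints {0=IDLE, 1=OK, 2=LAGGING}
    }
}
```

Checking lag first is a deliberate choice: a partition that is both quiet and far behind is a consumption problem, not an idleness problem.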

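Fix: If the source is supposed to be idle, tell Flink so through the watermark strategy. WatermarkStrategy.withIdleness(Duration) marks an input as idle once it has produced no records for the given duration, for example WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withIdleness(Duration.ofMinutes(1)), where MyEvent stands for your record type. Choose a timeout that fits your data production frequency. The partition discovery interval (partition.discovery.interval.ms for KafkaSource, flink.partition-discovery.interval-millis for the legacy FlinkKafkaConsumer) does not help here: it only controls how often Flink checks for newly added partitions. If data should be arriving, investigate why the producer is silent or why Flink can't consume it (see Cause 2).

Why it works: A downstream operator's watermark is the minimum over its inputs, so a quiet partition that keeps reporting its last, old watermark pins that minimum in place. An input marked idle is left out of the minimum until it produces data again, which lets the other inputs move event time forward. Records the formerly idle input delivers afterwards may then be late. If data should be arriving, this fix only hides the problem, and you need to address the producer or consumer configuration.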
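To make the effect of an idleness timeout concrete, here is a toy model in plain Java. It is not Flink's implementation: it assumes each input reports a watermark together with a wall-clock time, combines inputs by taking the minimum, and skips any input that has been silent for at least the timeout, which is roughly what WatermarkStrategy.withIdleness(Duration) achieves for quiet inputs. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.OptionalLong;

// Toy model (not Flink code): combined watermark = minimum over inputs that are not idle.
final class IdlenessModel {
    private final long[] watermarks;   // latest watermark per input
    private final long[] lastActiveMs; // wall-clock time of each input's last activity
    private final long idleTimeoutMs;

    IdlenessModel(int inputs, long idleTimeoutMs, long nowMs) {
        this.watermarks = new long[inputs];
        Arrays.fill(watermarks, Long.MIN_VALUE); // no watermark seen yet
        this.lastActiveMs = new long[inputs];
        Arrays.fill(lastActiveMs, nowMs);
        this.idleTimeoutMs = idleTimeoutMs;
    }

    // An input produced data and a new watermark at wall-clock time nowMs.
    void onWatermark(int input, long watermark, long nowMs) {
        watermarks[input] = Math.max(watermarks[input], watermark); // never move backwards
        lastActiveMs[input] = nowMs;
    }

    // Minimum over active inputs; empty when every input is idle.
    OptionalLong combined(long nowMs) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            boolean idle = nowMs - lastActiveMs[i] >= idleTimeoutMs;
            if (idle) {
                continue; // idle inputs no longer hold the clock back
            }
            anyActive = true;
            min = Math.min(min, watermarks[i]);
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        IdlenessModel m = new IdlenessModel(3, 60_000, 0);
        m.onWatermark(0, 70_000, 65_000);
        m.onWatermark(1, 80_000, 65_000);
        // Input 2 has been silent for 65 s, longer than the 60 s timeout, so it is skipped.
        System.out.println(m.combined(65_000)); // prints OptionalLong[70000]
    }
}
```

With the timeout effectively disabled (idleTimeoutMs = Long.MAX_VALUE), input 2 would contribute Long.MIN_VALUE forever and the combined watermark would never move, which is exactly the frozen pipeline described above.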

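Cause 2: Kafka Consumer Lag

Flink's Kafka consumer is not keeping up with the Kafka brokers. Watermarks can only advance as fast as the slowest partition is consumed, so when one partition falls far behind, event time advances very slowly or appears frozen even though the topic is full of data.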

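Diagnosis: Use kafka-consumer-groups.sh --describe --bootstrap-server <brokers> --group <group_id>. If CURRENT-OFFSET is significantly behind LOG-END-OFFSET for any partition, and the gap keeps growing, you have lag. When checkpointing is enabled, Flink's Kafka source commits offsets to Kafka only when a checkpoint completes, so CURRENT-OFFSET trails Flink's actual read position by up to one checkpoint interval. Also check the source operator's numRecordsOutPerSecond metric and the TaskManager logs for deserialization exceptions; frequent deserialization failures indicate Flink is struggling to process messages.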

Fix:

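  1. Increase Kafka Source Parallelism: Each Kafka partition is read by exactly one source subtask, so the useful source parallelism is at most the number of partitions in the topic. For example, if your Kafka topic has 12 partitions, set the parallelism on the source stream: env.fromSource(kafkaSource, watermarkStrategy, "Kafka Source").setParallelism(12). Subtasks beyond the partition count receive no partitions and sit idle.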
  2. Tune Kafka Consumer Settings: Adjust fetch.min.bytes (e.g., 1048576) and fetch.max.wait.ms (e.g., 500) in your Kafka consumer configuration.
  3. Increase TaskManager Resources: If deserialization or processing is the bottleneck, give the TaskManagers more CPU or memory. The JobManager does not process records, so adding resources there will not help.

Why it works: By matching parallelism to partitions, you allow Flink to consume each partition concurrently. Tuning fetch settings can improve throughput by allowing the consumer to fetch larger batches of data, reducing the overhead of many small requests. More resources mean Flink can deserialize and process messages faster.
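The arithmetic behind matching parallelism to partitions can be checked with a small Java sketch. It assumes a simplified round-robin assignment in which partition p goes to subtask p % parallelism; Flink's actual Kafka enumerator may rotate the starting subtask per topic, but that does not change how many subtasks end up with no partition. The class name is hypothetical.

```java
import java.util.Arrays;

// Hypothetical helper: simplified round-robin assignment of partitions to source subtasks.
final class PartitionAssignment {
    static int[] partitionsPerSubtask(int partitions, int parallelism) {
        int[] counts = new int[parallelism];
        for (int p = 0; p < partitions; p++) {
            counts[p % parallelism]++; // partition p is owned by subtask p % parallelism
        }
        return counts;
    }

    // Subtasks that receive no partition and therefore read nothing.
    static long emptySubtasks(int partitions, int parallelism) {
        return Arrays.stream(partitionsPerSubtask(partitions, parallelism))
                .filter(c -> c == 0)
                .count();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(partitionsPerSubtask(12, 8))); // prints [2, 2, 2, 2, 1, 1, 1, 1]
        System.out.println(emptySubtasks(12, 16)); // prints 4
    }
}
```

With 12 partitions, parallelism 8 leaves four subtasks doing twice the work of the others, parallelism 12 gives every subtask one partition, and parallelism 16 leaves four subtasks with nothing to read.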

Cause 3: Source Operator Stuck in a Long-Running Operation

A specific source operator instance is blocked on a network call, a database query, or some other blocking operation, preventing it from fetching new data.

Diagnosis: Take a thread dump of the TaskManager running the source operator (in the Flink UI: Task Managers -> select the TaskManager -> Thread Dump, or run jstack <pid> on the host). Analyze the stack traces for source threads stuck in I/O operations or waiting on locks. A thread that is RUNNABLE in the same stack frame across several dumps taken a few seconds apart is also suspect, because it is busy without making progress.
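If you reproduce the problem in a local run of the job, the JDK's ThreadMXBean gives you the same information as a thread dump from inside the JVM. The sketch below lists threads whose name contains a filter string and that are BLOCKED, WAITING, or TIMED_WAITING, together with their top stack frame; the class name is hypothetical, and the "Source:" filter assumes source task threads are named after the operator, which is typical but worth confirming in your own dump. A single snapshot cannot reveal a RUNNABLE thread that is stuck; for that, compare several.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: scan the current JVM for threads that are waiting rather than working.
final class StuckThreadScan {
    private static final Set<Thread.State> SUSPECT =
            EnumSet.of(Thread.State.BLOCKED, Thread.State.WAITING, Thread.State.TIMED_WAITING);

    static List<String> suspects(String nameFilter) {
        List<String> hits = new ArrayList<>();
        // dumpAllThreads(lockedMonitors, lockedSynchronizers) returns one ThreadInfo per live thread.
        for (ThreadInfo info : ManagementFactory.getThreadMXBean().dumpAllThreads(false, false)) {
            if (info.getThreadName().contains(nameFilter) && SUSPECT.contains(info.getThreadState())) {
                StackTraceElement[] stack = info.getStackTrace();
                String top = stack.length > 0 ? stack[0].toString() : "<no stack>";
                hits.add(info.getThreadName() + " " + info.getThreadState() + " at " + top);
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        suspects("Source:").forEach(System.out::println);
    }
}
```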

Fix: This is highly dependent on the source.

  • External Service: If the source is calling an external service that’s slow, implement timeouts and retries, or investigate the performance of the external service.
  • Database: If querying a database, optimize the query, ensure proper indexing, or increase database resources.
  • Custom Logic: Refactor the blocking code to be non-blocking or asynchronous.
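For the external-service case, the usual pattern is to run the call on a separate executor and bound how long the calling thread waits for it. The Java sketch below does that with a plain ExecutorService and a fixed number of attempts; the class name, the attempt count, and returning Optional.empty() on failure are illustrative choices, not a Flink API. For lookups done in operators after the source, Flink's Async I/O operator (AsyncDataStream) is the built-in way to avoid blocking.

```java
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: bounded wait plus retries around a possibly slow call.
final class TimeoutCall {
    static <T> Optional<T> callWithRetries(Callable<T> call, int attempts, long timeoutMs,
                                           ExecutorService pool) {
        for (int attempt = 1; attempt <= attempts; attempt++) {
            Future<T> future = pool.submit(call);
            try {
                // Never wait longer than timeoutMs for a single attempt.
                return Optional.ofNullable(future.get(timeoutMs, TimeUnit.MILLISECONDS));
            } catch (TimeoutException | ExecutionException e) {
                future.cancel(true); // interrupt a hung or failed attempt, then retry
            } catch (InterruptedException e) {
                future.cancel(true);
                Thread.currentThread().interrupt(); // preserve the interrupt and stop retrying
                return Optional.empty();
            }
        }
        return Optional.empty(); // every attempt failed or timed out
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Optional<String> slow = callWithRetries(() -> {
            Thread.sleep(5_000); // stands in for a hung network call
            return "late";
        }, 2, 100, pool);
        System.out.println(slow.isPresent()); // prints false
        pool.shutdownNow();
    }
}
```

Note that Optional.ofNullable treats a null result the same as a failure; if null is a meaningful answer from your service, return a richer result type instead.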

Why it works: Identifying and resolving the specific blocking operation allows the source operator thread to resume fetching data, unblocking the watermark progression.

Cause 4: Insufficient Network Buffers

The TaskManager running the source operator doesn't have enough network buffers to send data downstream, or the downstream operator doesn't have enough to receive it. The source then blocks while waiting for a free buffer and effectively pauses.

Diagnosis: Monitor the task-level network buffer metrics outPoolUsage (on the source) and inPoolUsage (on its downstream consumer), together with the backpressure view in the Flink UI. Usage that stays close to 1.0 (100%), especially on the TaskManager running your source operator and its downstream consumer, means the buffer pools are exhausted.

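Fix: Increase the network memory available to your TaskManagers in flink-conf.yaml. taskmanager.memory.network.fraction sets the share of total Flink memory reserved for network buffers (the default is 0.1, i.e. 10%, so raise it, e.g. to 0.2), and taskmanager.memory.network.min / taskmanager.memory.network.max bound the result (e.g., 67108864 / 1073741824 bytes, i.e. 64mb / 1gb). Older Flink releases used the keys taskmanager.network.memory.min/max instead.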
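A worked example helps when picking these values. The Java sketch below is a simplified model, not Flink's own code: it takes the configured fraction of total Flink memory, clamps it to the min/max bounds, and divides by the memory segment size (taskmanager.memory.segment-size, 32 KiB by default) to estimate the number of buffers. It assumes you already know the TaskManager's total Flink memory; when you configure total process memory instead, Flink derives that value first, and the real calculation handles more cases than this.

```java
// Simplified model of how TaskManager network memory is sized (not Flink's implementation).
final class NetworkMemory {
    static final long MIB = 1024L * 1024L;
    static final long SEGMENT_SIZE = 32 * 1024L; // default taskmanager.memory.segment-size

    // fraction * totalFlinkMemory, clamped to [minBytes, maxBytes].
    static long networkBytes(long totalFlinkMemoryBytes, double fraction, long minBytes, long maxBytes) {
        long fromFraction = (long) (totalFlinkMemoryBytes * fraction);
        return Math.min(maxBytes, Math.max(minBytes, fromFraction));
    }

    // Each network buffer is one memory segment.
    static long buffers(long networkBytes) {
        return networkBytes / SEGMENT_SIZE;
    }

    public static void main(String[] args) {
        long net = networkBytes(4096 * MIB, 0.1, 64 * MIB, 1024 * MIB);
        System.out.println(net + " bytes, about " + buffers(net) + " buffers"); // prints 429496729 bytes, about 13107 buffers
    }
}
```

On the same 4 GiB of total Flink memory, raising the fraction to 0.2 yields 858993459 bytes, about 26214 buffers. On a 512 MiB TaskManager, a fraction of 0.1 gives only 51.2 MiB, so the 64 MiB minimum applies instead.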

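Why it works: More network buffers let data flow more smoothly between operators and absorb short bursts, so transient backpressure from the downstream consumer does not completely halt the upstream source. If a downstream operator is persistently slower than the source, backpressure will return regardless of buffer count; in that case the slow operator is the real bottleneck.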

Cause 5: Source Connector Bug or Misconfiguration

The specific connector you are using has a bug, or its configuration is fundamentally incorrect for your environment.

Diagnosis: Check the Flink Jira, mailing lists, and Stack Overflow for known issues with your specific connector version and Flink version. Try simplifying your source configuration to the bare minimum required parameters.

Fix:

  • Upgrade/Downgrade Connector: If a bug is identified, try upgrading to a newer version of Flink and its connectors, or downgrading if the issue was introduced in a recent release.
  • Correct Configuration: Double-check all connection parameters, authentication details, and topic/queue names. For Kafka, ensure group.id, bootstrap.servers, and auto.offset.reset are correctly set.
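A quick sanity check of the basic Kafka settings can catch typos before you start hunting for connector bugs. The Java sketch below inspects a java.util.Properties object of the kind you would hand to the Kafka client; the class name and the checks (required keys, the classic auto.offset.reset values earliest/latest/none, host:port entries) are illustrative and deliberately incomplete. Newer Kafka versions may accept additional auto.offset.reset values, so treat a warning there as a prompt to check rather than a verdict.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Set;

// Hypothetical helper: basic sanity checks on Kafka consumer properties.
final class KafkaConfigCheck {
    private static final Set<String> CLASSIC_RESET_VALUES = Set.of("earliest", "latest", "none");

    static List<String> problems(Properties props) {
        List<String> problems = new ArrayList<>();
        for (String key : List.of("bootstrap.servers", "group.id")) {
            String value = props.getProperty(key);
            if (value == null || value.isBlank()) {
                problems.add("missing " + key);
            }
        }
        String reset = props.getProperty("auto.offset.reset");
        if (reset != null && !CLASSIC_RESET_VALUES.contains(reset)) {
            problems.add("unexpected auto.offset.reset=" + reset);
        }
        // Each bootstrap server entry should look like host:port with a numeric port.
        String servers = props.getProperty("bootstrap.servers", "");
        if (!servers.isBlank()) {
            for (String entry : servers.split(",")) {
                String hostPort = entry.trim();
                int colon = hostPort.lastIndexOf(':');
                if (colon <= 0 || !hostPort.substring(colon + 1).matches("\\d+")) {
                    problems.add("bad bootstrap server entry: " + hostPort);
                }
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1");
        props.setProperty("auto.offset.reset", "earlist"); // typo on purpose
        problems(props).forEach(System.out::println);
    }
}
```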

Why it works: A correct or bug-free connector ensures that Flink can reliably establish a connection and fetch data without encountering unexpected errors or states.

Cause 6: Watermark Strategy Issue

Your custom WatermarkStrategy is not correctly generating or emitting watermarks, especially when dealing with late events or idle periods.

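Diagnosis: Temporarily switch to a simpler, built-in watermark strategy like WatermarkStrategy.forMonotonousTimestamps() (if your timestamps are indeed monotonic). If watermarks start advancing, your custom strategy is the culprit. Debug it by adding logging to the onEvent and onPeriodicEmit methods of its WatermarkGenerator. Also confirm that periodic emission is turned on: onPeriodicEmit is called every pipeline.auto-watermark-interval (200 ms by default), and setting that interval to 0 disables periodic watermarks entirely.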

Fix: Make sure the strategy's parameters match your data. With forBoundedOutOfOrderness(Duration.ofMinutes(5)), the watermark trails the largest timestamp seen so far by five minutes, so a bound that is too small turns more events into late data and a bound that is too large delays results. If your source can be idle for long periods, add .withIdleness(...) to the strategy, and pass the strategy to env.fromSource(...) rather than applying assignTimestampsAndWatermarks later, so that watermarks are generated per Kafka partition. In rare cases, a custom WatermarkGenerator supplied through WatermarkStrategy.forGenerator, one that advances the watermark based on processing time after a timeout with no events, may be necessary.
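The processing-time fallback idea can be sketched without Flink dependencies. The Java class below mirrors the onEvent/onPeriodicEmit split of Flink's WatermarkGenerator interface but returns the watermark instead of calling WatermarkOutput, so it is a model of the logic rather than a drop-in implementation. It assumes event timestamps are epoch milliseconds comparable to the wall clock. The bounded-out-of-orderness formula (largest timestamp minus the bound minus 1 ms) matches what forBoundedOutOfOrderness emits; the idle fallback is a hypothetical policy you would need to tune for your data.

```java
// Model of a custom watermark generator (not the Flink WatermarkGenerator interface itself).
final class IdleAwareWatermarks {
    private final long maxOutOfOrdernessMs;
    private final long idleTimeoutMs;
    private long maxTimestamp = Long.MIN_VALUE;       // largest event timestamp seen
    private long lastEventWallClock = Long.MIN_VALUE; // wall-clock time of the last event
    private long lastEmitted = Long.MIN_VALUE;        // watermarks must never move backwards

    IdleAwareWatermarks(long maxOutOfOrdernessMs, long idleTimeoutMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.idleTimeoutMs = idleTimeoutMs;
    }

    // Counterpart of onEvent: remember the largest timestamp and when an event last arrived.
    void onEvent(long eventTimestamp, long wallClockNow) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        lastEventWallClock = wallClockNow;
    }

    // Counterpart of onPeriodicEmit: the watermark to emit at this tick.
    long onPeriodicEmit(long wallClockNow) {
        if (maxTimestamp == Long.MIN_VALUE) {
            return lastEmitted; // no events yet, nothing to base a watermark on
        }
        long candidate = maxTimestamp - maxOutOfOrdernessMs - 1;
        if (wallClockNow - lastEventWallClock >= idleTimeoutMs) {
            // Idle fallback (assumption): let event time follow the wall clock.
            candidate = Math.max(candidate, wallClockNow - maxOutOfOrdernessMs - 1);
        }
        lastEmitted = Math.max(lastEmitted, candidate);
        return lastEmitted;
    }

    public static void main(String[] args) {
        IdleAwareWatermarks gen = new IdleAwareWatermarks(5_000, 60_000);
        gen.onEvent(100_000, 100_000);
        System.out.println(gen.onPeriodicEmit(100_200)); // prints 94999
        System.out.println(gen.onPeriodicEmit(200_000)); // prints 194999 (idle fallback applied)
    }
}
```

The trade-off shows up if an event with timestamp 150000 arrives after the fallback has fired: the watermark stays at 194999, so that event already counts as late.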

Why it works: A correctly implemented watermark strategy keeps Flink's event-time clock moving as event time progresses, tolerates the expected amount of out-of-order data, and, when idleness is handled, stops quiet inputs from holding the clock back during periods of inactivity.

After addressing these, you’ll likely encounter issues with late data being dropped if your watermark strategy isn’t robust enough.
