The Flink JobManager is not completing checkpoints because the TaskManagers' acknowledgments arrive after the checkpoint timeout, so the JobManager discards those checkpoints as expired.
A common culprit is a temporary network partition or congestion between the TaskManagers and the JobManager. The TaskManagers may still be able to write their state to the configured state backend (such as S3 or HDFS), but the checkpoint acknowledgment to the JobManager is delayed.
Diagnosis:
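Check the TaskManager logs for messages like `Checkpoint [X] duration exceeded checkpoint timeout of [Y]ms`. Also, look for `Checkpoint [X] failed: Checkpoint is too late` in the JobManager logs. The exact wording differs between Flink versions; recent JobManagers typically log this case as `Checkpoint [X] of job [ID] expired before completing.`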
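The log checks above can be automated with a small scanner. This is a minimal sketch: the regexes mirror the messages quoted in the diagnosis, plus the `expired before completing` wording, and they are assumptions to adjust to whatever your Flink version actually logs. `scan_checkpoint_failures` is a hypothetical helper, not part of Flink.

```python
import re
from collections import Counter

# Assumed message patterns; exact wording differs across Flink versions.
PATTERNS = {
    "timeout_exceeded": re.compile(
        r"Checkpoint \[?(\d+)\]? duration exceeded checkpoint timeout"),
    "too_late": re.compile(
        r"Checkpoint \[?(\d+)\]? failed: Checkpoint is too late"),
    "expired": re.compile(
        r"Checkpoint \[?(\d+)\]? of job \S+ expired before completing"),
}


def scan_checkpoint_failures(lines):
    """Count, per category, how often each checkpoint ID appears in the log lines."""
    hits = {name: Counter() for name in PATTERNS}
    for line in lines:
        for name, pattern in PATTERNS.items():
            match = pattern.search(line)
            if match:
                hits[name][int(match.group(1))] += 1
    return hits
```

Pass it an open log file, e.g. `with open("jobmanager.log") as f: print(scan_checkpoint_failures(f))` (the file name is hypothetical). Checkpoint IDs that recur across categories are good candidates for correlating with network or GC events at the same timestamps.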
Cause 1: Network Latency/Packet Loss
High latency or packet loss on the network segment connecting TaskManagers to the JobManager can delay checkpoint completion messages.
- Diagnosis: Run `ping` and `traceroute` from a TaskManager to the JobManager's IP address. Monitor network traffic on the JobManager and TaskManager interfaces for unusually high retransmission rates or latency.
- Fix: If network issues are found, work with your network team to resolve them. In the short term, you can increase `execution.checkpointing.timeout`, for example from the default `10m` (10 minutes) to `15m` with `execution.checkpointing.timeout: 15m`. This gives TaskManagers more time to report completion, masking temporary network blips.
- Why it works: This widens the window during which the JobManager waits for a checkpoint completion signal before declaring the checkpoint expired.
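Where ICMP is blocked or `ping` is unavailable inside containers, timing TCP handshakes to the JobManager's RPC port gives a comparable latency signal. The sketch below assumes the default `jobmanager.rpc.port` of 6123 and a hypothetical host name; it measures connection setup only, not the latency of Flink's RPC messages themselves.

```python
import socket
import statistics
import time


def tcp_connect_latencies(host, port, attempts=5, timeout=2.0):
    """Time TCP handshakes to host:port in milliseconds; failures are recorded as None."""
    results = []
    for _ in range(attempts):
        start = time.perf_counter()
        try:
            with socket.create_connection((host, port), timeout=timeout):
                results.append((time.perf_counter() - start) * 1000.0)
        except OSError:  # refused, unreachable, or timed out
            results.append(None)
    return results


def summarize(latencies_ms):
    """Reduce raw samples to counts plus median and worst-case latency."""
    ok = [x for x in latencies_ms if x is not None]
    return {
        "attempts": len(latencies_ms),
        "failures": len(latencies_ms) - len(ok),
        "median_ms": statistics.median(ok) if ok else None,
        "max_ms": max(ok) if ok else None,
    }
```

For example, `summarize(tcp_connect_latencies("jobmanager.example.internal", 6123, attempts=20))` run from a TaskManager host. A nonzero `failures` count, or a `max_ms` far above `median_ms`, points toward packet loss or congestion rather than a uniformly slow link.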
Cause 2: Under-provisioned JobManager CPU/Memory
If the JobManager is overloaded with other work (e.g., managing many subtasks, frequent rescaling, or other job coordination), it might be too slow to process incoming checkpoint completion notifications.
- Diagnosis: Monitor the JobManager's CPU and memory usage. Utilization that stays above 80-90% is a strong indicator. Look for long garbage collection pauses in the JobManager logs.
- Fix: Give the JobManager more memory and/or more CPU. For example, if running Flink on Kubernetes, increase `resources.requests.cpu` and `resources.limits.cpu` for the JobManager deployment. On Flink 1.11 and later, JobManager memory is sized with the `jobmanager.memory.*` options in `flink-conf.yaml`, for example `jobmanager.memory.process.size: 8192m` for the total process memory. The older `jobmanager.heap.size` option is deprecated in favor of these, and the `FLINK_JM_HEAP` environment variable (e.g., `export FLINK_JM_HEAP=8192` for an 8 GB heap) comes from older standalone setups, so check whether your version still honors it.
- Why it works: A less-loaded JobManager can process incoming RPC calls (like checkpoint acknowledgments) faster, reducing the chance of timeouts.
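To watch JobManager load without logging into the host, you can poll the REST endpoint `/jobmanager/metrics?get=Status.JVM.CPU.Load,Status.JVM.Memory.Heap.Used,Status.JVM.Memory.Heap.Max`, which returns a JSON list of `{"id": ..., "value": ...}` objects. The sketch below evaluates one such response. The metric names, and the assumption that CPU load is reported as a 0-1 fraction, should be checked against the metrics documentation for your version; the 80% thresholds are illustrative. Since the diagnosis calls for usage that stays high, apply it to repeated samples rather than acting on a single one.

```python
import json

CPU_METRIC = "Status.JVM.CPU.Load"
HEAP_USED = "Status.JVM.Memory.Heap.Used"
HEAP_MAX = "Status.JVM.Memory.Heap.Max"


def assess_jobmanager_load(metrics_json, cpu_threshold=0.8, heap_threshold=0.8):
    """Return human-readable warnings for one /jobmanager/metrics response."""
    # Values may arrive as strings; float() accepts both strings and numbers.
    metrics = {m["id"]: float(m["value"]) for m in json.loads(metrics_json)}
    warnings = []
    cpu = metrics.get(CPU_METRIC)
    if cpu is not None and cpu >= cpu_threshold:
        warnings.append(f"JVM CPU load {cpu:.0%} >= {cpu_threshold:.0%}")
    used, max_heap = metrics.get(HEAP_USED), metrics.get(HEAP_MAX)
    if used is not None and max_heap:
        ratio = used / max_heap
        if ratio >= heap_threshold:
            warnings.append(f"heap usage {ratio:.0%} >= {heap_threshold:.0%}")
    return warnings
```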
Cause 3: Long-running savepoint.cleanup.async operations
If asynchronous savepoint cleanup is enabled and the underlying storage (like S3) is slow to respond, it can block checkpointing threads or consume JobManager resources.
- Diagnosis: Check whether `execution.savepoint.cleanup-in-background` is set to `true`. Examine Flink logs for messages showing savepoint cleanup taking an unusually long time.
- Fix: Temporarily disable asynchronous savepoint cleanup by setting `execution.savepoint.cleanup-in-background: false`, or increase `execution.checkpointing.timeout` significantly to allow these operations to complete. Cleanup-related options differ between releases, so confirm this key in the configuration reference for your Flink version before relying on it.
- Why it works: Disabling this feature prevents potential blocking or resource contention caused by slow external storage during savepoint operations, which can indirectly affect checkpointing.
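Checking the setting can be scripted with a minimal parser for the legacy flat `flink-conf.yaml` format (one `key: value` per line). This is a sketch under that assumption: it does not handle nested YAML, which the newer `config.yaml` in recent releases allows, and it reports an unset key as `None` rather than guessing a default, because `execution.savepoint.cleanup-in-background` itself should be verified against your version's configuration reference.

```python
CLEANUP_KEY = "execution.savepoint.cleanup-in-background"


def parse_flat_flink_conf(text):
    """Parse flat `key: value` lines; skips blank lines and full-line or trailing # comments."""
    conf = {}
    for raw in text.splitlines():
        line = raw.split("#", 1)[0].strip()
        if not line or ":" not in line:
            continue
        # Split on the first colon only, so values like s3://... stay intact.
        key, value = line.split(":", 1)
        conf[key.strip()] = value.strip()
    return conf


def cleanup_in_background(conf):
    """True/False if the key is set, None if it is absent."""
    value = conf.get(CLEANUP_KEY)
    if value is None:
        return None
    return value.lower() == "true"
```

Usage: `cleanup_in_background(parse_flat_flink_conf(open("conf/flink-conf.yaml").read()))`; the path is an assumption about your installation layout.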
Cause 4: State Backend Bottlenecks (e.g., slow S3/HDFS writes)
Even when TaskManagers can start writing to the state backend, very slow write performance delays the completion of the write, which is a prerequisite for acknowledging the checkpoint. This delay can push the acknowledgment past the JobManager's timeout.
- Diagnosis: Monitor the performance of your state backend (e.g., S3 PUT latency, HDFS block write speeds). Look for unusually long durations in TaskManager logs related to state backend operations during checkpointing.
- Fix: Optimize your state backend configuration or underlying infrastructure. For S3, keep the bucket in a region close to the cluster and check for S3 throttling. For HDFS, investigate NameNode and DataNode performance. In Flink, you can sometimes shorten uploads by increasing the number of parallel transfer threads if your state backend supports it (for RocksDB, `state.backend.rocksdb.checkpoint.transfer.thread.num`). You can also adjust `execution.checkpointing.max-concurrent-checkpoints`, for example `execution.checkpointing.max-concurrent-checkpoints: 10`.
- Why it works: More transfer threads let a single checkpoint upload its files in parallel, so it finishes sooner. Allowing more concurrent checkpoints does not make any individual checkpoint faster; it lets new checkpoints start while a slow one is still in flight, so one slow write stalls overall checkpointing progress less. Each in-flight checkpoint adds load on the storage, so raise this value cautiously.
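A back-of-envelope estimate shows why parallel uploads matter when writes are slow. The function below assumes upload throughput scales linearly with the number of transfer threads, which real S3 or HDFS deployments only do until the network or storage saturates; the `headroom` factor and the throughput figures are hypothetical choices, not Flink settings.

```python
MIB = 1024 * 1024


def estimate_upload_seconds(bytes_to_upload, per_thread_mib_s, threads):
    """Idealized upload time, assuming throughput scales linearly with threads."""
    if per_thread_mib_s <= 0 or threads < 1:
        raise ValueError("throughput must be positive and threads >= 1")
    return bytes_to_upload / (per_thread_mib_s * MIB * threads)


def fits_timeout(bytes_to_upload, per_thread_mib_s, threads, timeout_s, headroom=0.5):
    """True if the estimate uses at most `headroom` of the checkpoint timeout."""
    estimate = estimate_upload_seconds(bytes_to_upload, per_thread_mib_s, threads)
    return estimate <= timeout_s * headroom
```

For a hypothetical 60 GiB of state to upload at 20 MiB/s per thread, one thread needs 3072 s, four threads 768 s, eight 384 s, and sixteen 192 s. Only the last fits within half of the default 10-minute (600 s) timeout, and only if the storage can actually sustain that aggregate rate.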
Cause 5: Large State Size and High Checkpoint Throughput
If your job has very large state and checkpoints frequently, the sheer volume of data being written and the number of completion notifications can overwhelm the network or the JobManager, especially during peak load.
- Diagnosis: Compare the reported checkpoint state size with the configured `execution.checkpointing.interval` and `execution.checkpointing.timeout`. If the state size is measured in GB and the interval in seconds, the job is under high pressure.
- Fix: Increase the checkpointing interval to reduce the frequency of large checkpoints, for instance from `1m` to `5m` with `execution.checkpointing.interval: 5m`. Alternatively, enable incremental checkpoints if they are not already active and your state backend supports them (RocksDB does, via `state.backend.incremental: true`); this can significantly reduce the amount of data written by subsequent checkpoints.
- Why it works: A longer interval means fewer checkpoints need to be processed and acknowledged within any given time window, reducing the load on the JobManager and the network.
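The interval trade-off is simple arithmetic: checkpoints per hour equal 3600 divided by the interval in seconds, and data written per hour is that count times the bytes each checkpoint writes. The sketch uses a simplified parser for a subset of Flink's duration syntax (bare numbers are treated as milliseconds), and the sizes in the example are hypothetical.

```python
import re

# Subset of Flink duration units, in seconds ("m" means minutes, as in "5m").
_UNITS = {"ms": 0.001, "s": 1, "min": 60, "m": 60, "h": 3600, "d": 86400}


def parse_duration_s(text):
    """Parse strings such as '1m', '5 min', '30s' or '600000' (ms) into seconds."""
    match = re.fullmatch(r"\s*(\d+)\s*(ms|s|min|m|h|d)?\s*", text)
    if not match:
        raise ValueError(f"unsupported duration: {text!r}")
    value, unit = match.groups()
    return int(value) * _UNITS[unit or "ms"]


def checkpoint_write_volume_per_hour(checkpoint_bytes, interval):
    """Return (checkpoints per hour, bytes written per hour) for a given interval."""
    seconds = parse_duration_s(interval)
    if seconds <= 0:
        raise ValueError("interval must be positive")
    per_hour = 3600 / seconds
    return per_hour, per_hour * checkpoint_bytes
```

With a hypothetical 20 GiB full checkpoint, `1m` means 60 checkpoints and about 1200 GiB written per hour, while `5m` means 12 checkpoints and about 240 GiB. If incremental checkpoints cut each write to, say, 2 GiB of changed state, the 5-minute figure drops to about 24 GiB per hour.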
Cause 6: Flink Version Bugs
Older Flink versions might have known issues with checkpointing coordination or RPC handling that have been fixed in later releases.
- Diagnosis: Check Flink release notes for known issues related to checkpointing timeouts or JobManager/TaskManager communication in your specific Flink version.
- Fix: Upgrade Flink to a more recent stable version. Ensure your JobManager and TaskManagers are running the exact same Flink version.
- Why it works: Newer versions often contain performance improvements and bug fixes that resolve underlying issues in the checkpointing mechanism.
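A small consistency check helps enforce the same-version rule. It takes version strings you have collected yourself (for example, from the version each process reports in its startup log) and an optional minimum version you picked from the release notes; both inputs are assumptions, and the function is illustrative, not a Flink tool.

```python
def parse_version(version):
    """'1.17.2' -> (1, 17, 2); drops suffixes like '-SNAPSHOT' and pads to three parts."""
    core = version.split("-", 1)[0]
    parts = [int(part) for part in core.split(".")]
    return tuple(parts + [0] * (3 - len(parts)))


def check_versions(jobmanager_version, taskmanager_versions, minimum=None):
    """Return a list of problems: TaskManager mismatches and/or a too-old JobManager."""
    problems = []
    mismatched = sorted({v for v in taskmanager_versions if v != jobmanager_version})
    if mismatched:
        problems.append(
            f"TaskManager versions {mismatched} differ from JobManager {jobmanager_version}")
    if minimum is not None and parse_version(jobmanager_version) < parse_version(minimum):
        problems.append(f"JobManager {jobmanager_version} is older than {minimum}")
    return problems
```

An empty list means every process reports the JobManager's version and, if a minimum was given, that version is at least as new as it.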
Once these issues are resolved, the next problems you are likely to encounter are late or out-of-order events, if your event-time processing is sensitive to delays, or failures in background operations such as state cleanup that are still struggling under load.