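Your Flink job’s checkpoints are failing because TaskManagers do not acknowledge them to the JobManager before the configured checkpoint timeout elapses (Flink then reports the checkpoint as expired), or because a task explicitly declines the checkpoint. This is a critical failure. Without completed checkpoints, Flink has no recent consistent recovery point, so a restart has to reprocess everything since the last successful checkpoint. Transactional sinks also cannot commit their output. Depending on execution.checkpointing.tolerable-failed-checkpoints, repeated failures can cause the job to fail over. If a source’s retention expires before a checkpoint succeeds, the unprocessed data can be lost.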

Common Causes and Fixes

1. Network Latency or Instability Between TaskManagers and JobManager: This is a frequent culprit. TaskManagers send heartbeats and checkpoint acknowledgements to the JobManager over RPC. If these messages are delayed or dropped by the network, the JobManager can time out waiting for them.

  • Diagnosis: Use ping and traceroute from a TaskManager to the JobManager’s IP/hostname. Look for high latency (consistently over 10ms) or packet loss. Monitor network traffic on both the TaskManager and JobManager nodes.
  • Fix:
    • Increase heartbeat and RPC timeouts: The taskmanager.network.netty.* options govern data exchange between TaskManagers. They do not cover the messages TaskManagers send to the JobManager. For transient network hiccups on the TaskManager-to-JobManager path, raise heartbeat.timeout (default 50000 ms) and akka.ask.timeout (pekko.ask.timeout in Flink 1.18 and later) in your flink-conf.yaml:
      heartbeat.timeout: 60000
      akka.ask.timeout: 60 s
      
      This stops a temporarily degraded network from getting TaskManagers declared lost, or RPC calls failed, before their messages get through.
    • Optimize Network Configuration: If using cloud VMs, ensure they are in the same availability zone/region. If on-prem, check switch configurations and physical cabling.
  • Why it works: Longer heartbeat and RPC timeouts give delayed messages more time to arrive before the JobManager treats them as failures. The checkpoint acknowledgements themselves still have to arrive within execution.checkpointing.timeout (see point 3).
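
ping relies on ICMP, which some networks block. As an alternative, you can time TCP connections to the JobManager’s RPC port directly. The following is a minimal sketch using only the Python standard library. probe_tcp_latency and summarize are hypothetical helper names, and the 10 ms threshold mirrors the rule of thumb above. A TCP handshake time only approximates the round trip of Flink’s RPC messages.

```python
import socket
import statistics
import time


def probe_tcp_latency(host, port, samples=10, timeout_s=2.0):
    """Time TCP connections to host:port.

    Returns (latencies_ms, failures): one latency per successful connect,
    and the number of attempts that failed or timed out.
    """
    latencies_ms = []
    failures = 0
    for _ in range(samples):
        start = time.perf_counter()
        try:
            # create_connection resolves the host (name lookup time is included)
            # and completes the TCP handshake.
            with socket.create_connection((host, port), timeout=timeout_s):
                pass
        except OSError:
            # Refused, unreachable, unresolvable, or timed out: count as lost.
            failures += 1
            continue
        latencies_ms.append((time.perf_counter() - start) * 1000.0)
    return latencies_ms, failures


def summarize(latencies_ms, failures, high_ms=10.0):
    """Median latency, share of failed probes, and whether the median exceeds high_ms."""
    total = len(latencies_ms) + failures
    median_ms = statistics.median(latencies_ms) if latencies_ms else None
    return {
        "median_ms": median_ms,
        "loss_ratio": failures / total if total else 0.0,
        "high_latency": median_ms is not None and median_ms > high_ms,
    }
```

For example, probe_tcp_latency("jobmanager-host", 6123) probes Flink’s default jobmanager.rpc.port. "jobmanager-host" is a placeholder, so substitute your own host and port.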

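2. Overloaded TaskManagers: If a TaskManager is maxed out on CPU, memory, or I/O, it might not have the resources to process checkpoint barriers or send its acknowledgement in time. Barriers travel through the data stream along with the records, so a busy or backpressured task also delays the barrier for every task downstream of it.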

  • Diagnosis: Monitor TaskManager resource utilization using tools like top, htop, or cloud provider monitoring dashboards. Look for sustained high CPU usage (above 80%), low available memory, or high disk I/O. In Flink’s UI, check TaskManager metrics such as Status.JVM.CPU.Load and Status.JVM.Memory.Heap.Used, the per-task numBytesInPerSecond and numBytesOutPerSecond, and the BackPressure tab of the job graph.
  • Fix:
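    • Increase Parallelism, Not Just Slots: Submit the job with higher parallelism so the work is split across more subtasks:
      ./bin/flink run -p 8 my_job.jar
      
      The cluster needs enough free slots for that parallelism. You can raise taskmanager.numberOfTaskSlots in flink-conf.yaml, but extra slots add no CPU or memory. Only do this if each TaskManager has spare cores and memory to back them:
      # In flink-conf.yaml
      taskmanager.numberOfTaskSlots: 4 # Example: increased from 2
      
      Otherwise, adding slots packs more subtasks into the same overloaded process and makes the problem worse.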
    • Scale Out TaskManager Instances: If your existing TaskManagers have no spare capacity, add more TaskManager instances to your Flink cluster (or give each one more CPU and memory) before raising parallelism.
    • Optimize Your Flink Job: Profile your Flink job to identify bottlenecks in your operators (e.g., inefficient serialization, heavy computations, slow UDFs).
  • Why it works: Distributing the load ensures each TaskManager has sufficient resources to perform its duties, including timely checkpoint reporting.
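
To check CPU load across all TaskManagers at once, you can read the same metrics through Flink’s REST API. The following is a minimal sketch. It assumes the /taskmanagers and /taskmanagers/<id>/metrics?get=... endpoints, and that Status.JVM.CPU.Load is reported as a fraction between 0 and 1. The function names are hypothetical.

```python
import json
import urllib.request

CPU_LOAD = "Status.JVM.CPU.Load"


def parse_metric_values(payload):
    """Convert a metrics response like [{"id": ..., "value": "0.42"}] into {id: float}."""
    return {item["id"]: float(item["value"]) for item in json.loads(payload)}


def overloaded_taskmanagers(cpu_loads, threshold=0.8):
    """Return the ids whose CPU load is above threshold (0.8 corresponds to 80%)."""
    return sorted(tm_id for tm_id, load in cpu_loads.items() if load > threshold)


def fetch_cpu_loads(rest_url, timeout_s=10):
    """Read Status.JVM.CPU.Load for every registered TaskManager via the REST API."""
    with urllib.request.urlopen(f"{rest_url}/taskmanagers", timeout=timeout_s) as resp:
        taskmanagers = json.load(resp)["taskmanagers"]
    loads = {}
    for tm in taskmanagers:
        url = f"{rest_url}/taskmanagers/{tm['id']}/metrics?get={CPU_LOAD}"
        with urllib.request.urlopen(url, timeout=timeout_s) as resp:
            values = parse_metric_values(resp.read())
        if CPU_LOAD in values:  # the metric can be missing, e.g. right after startup
            loads[tm["id"]] = values[CPU_LOAD]
    return loads
```

fetch_cpu_loads("http://jobmanager-host:8081") assumes the REST endpoint is on its default port 8081. Pass the result to overloaded_taskmanagers to list the candidates.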

3. Insufficient execution.checkpointing.timeout: This is the global timeout for the entire checkpointing process, from when the JobManager requests a checkpoint to when all TaskManagers must report completion. If individual TaskManagers are slow but not necessarily failing, this global timeout might be hit first.

  • Diagnosis: Compare the execution.checkpointing.timeout value with how long checkpoints actually take in Flink’s UI. The Checkpoints tab shows End to End Duration and, per operator, Alignment Duration. If the duration is consistently close to the timeout, the timeout is too short.
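  • Fix: Increase execution.checkpointing.timeout in your flink-conf.yaml, or set it in the job with CheckpointConfig.setCheckpointTimeout. If your checkpoints typically take 30 seconds, set it to 60000 (60 seconds) or higher. Note that the default is 10 minutes, so a timeout this short only applies if it was lowered explicitly. If you are on the default and checkpoints still expire, the other causes in this list are more likely.
    execution.checkpointing.timeout: 60000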
    
  • Why it works: This provides a longer window for the entire checkpointing phase, accommodating slower TaskManagers or network delays without prematurely aborting the checkpoint.
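
You can automate the comparison from the diagnosis step using the checkpoint history that the REST API exposes at /jobs/<job-id>/checkpoints. The following is a minimal sketch. It assumes each history entry carries status and end_to_end_duration fields, with durations in milliseconds. The 80% warning ratio and 2x headroom are illustrative choices, not Flink defaults, and the function names are hypothetical.

```python
import json
import math


def completed_durations_ms(checkpoints_json):
    """End-to-end durations (ms) of completed checkpoints in a /jobs/<id>/checkpoints response."""
    history = json.loads(checkpoints_json).get("history", [])
    return [c["end_to_end_duration"] for c in history if c.get("status") == "COMPLETED"]


def percentile(values, pct):
    """Nearest-rank percentile, pct between 0 and 100."""
    if not values:
        raise ValueError("no checkpoint durations to evaluate")
    ordered = sorted(values)
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]


def check_timeout(durations_ms, timeout_ms, warn_ratio=0.8, headroom=2.0):
    """Return (too_short, suggested_timeout_ms) based on the 95th-percentile duration."""
    p95 = percentile(durations_ms, 95)
    too_short = p95 >= warn_ratio * timeout_ms
    # Never suggest lowering the timeout; otherwise leave headroom above p95.
    suggested = max(timeout_ms, math.ceil(p95 * headroom))
    return too_short, suggested
```

The sketch uses the 95th percentile rather than the mean, because the few slow checkpoints are the ones that expire. Expired checkpoints do not appear as completed. If most checkpoints are failing, the completed sample therefore understates how long a checkpoint really needs.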

4. Checkpoint Storage Issues (S3, HDFS, etc.): TaskManagers need to write their checkpoint data (state snapshots) to a distributed file system or object store. If this storage is slow, unavailable, or experiencing high latency, the asynchronous upload phase of each snapshot stalls and the TaskManagers’ acknowledgements are delayed.

  • Diagnosis: Check the logs of the TaskManagers for errors related to writing to your checkpoint storage. Monitor the performance and health of your S3 bucket, HDFS cluster, or other storage. Use storage-specific tools to test write speeds.
  • Fix:
    • Optimize Storage Configuration: For S3, keep the bucket in the same region as the Flink cluster. S3 Transfer Acceleration only helps when data has to travel a long distance to a bucket in another region. For HDFS, check namenode/datanode health and network connectivity.
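    • Increase state upload threads: With the RocksDB state backend, increase state.backend.rocksdb.checkpoint.transfer.thread.num in flink-conf.yaml. This option sets how many threads each stateful operator uses to transfer state files.
      state.backend.rocksdb.checkpoint.transfer.thread.num: 8 # Example: increased from the default of 4
      
      This allows multiple state files to be uploaded concurrently, at the cost of more concurrent requests against the storage.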
    • Use a Faster Storage Backend: If possible, migrate to a more performant storage solution.
  • Why it works: Faster and more reliable writes to the checkpoint storage reduce the time TaskManagers spend in the "checkpointing" state, allowing them to report completion sooner.
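
If the checkpoint directory is a locally mounted file system (for example an NFS or FUSE mount), you can run a rough write-throughput test from the TaskManager host itself. The following is a minimal Python sketch, and measure_write_throughput is a hypothetical helper. For S3 or HDFS, the storage’s own tooling gives more representative numbers.

```python
import os
import tempfile
import time


def measure_write_throughput(directory, total_mb=64, chunk_kb=1024):
    """Write total_mb of random data to a temp file in directory, fsync it, return MB/s."""
    chunks = (total_mb * 1024) // chunk_kb
    if chunks == 0:
        raise ValueError("total_mb is smaller than one chunk")
    chunk = os.urandom(chunk_kb * 1024)
    fd, path = tempfile.mkstemp(dir=directory, prefix="ckpt-write-test-")
    try:
        start = time.perf_counter()
        with os.fdopen(fd, "wb") as f:
            for _ in range(chunks):
                f.write(chunk)
            f.flush()
            os.fsync(f.fileno())  # wait until the data is handed to the storage
        elapsed = time.perf_counter() - start
    finally:
        os.remove(path)  # always clean up the test file
    written_mb = chunks * chunk_kb / 1024
    return written_mb / elapsed if elapsed > 0 else float("inf")
```

The sketch calls fsync before stopping the clock, so the figure reflects writes reaching the storage rather than only the local page cache.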

5. Large Checkpoint State Size: If your job’s state is enormous, writing and uploading it can take a significant amount of time, potentially exceeding timeouts even with good network and storage.

  • Diagnosis: In Flink’s UI, look at the latest completed checkpoint on the Checkpoints tab, specifically its data size and End to End Duration. If the size grows steadily from checkpoint to checkpoint or is consistently in the GBs or TBs, it is a likely cause.
  • Fix:
    • Optimize State Management: Review your Flink job for opportunities to reduce state size. This might involve:
      • Using keyed state judiciously: keep only the per-key state you need, and call clear() on state once a key no longer needs it.
      • Implementing state TTL (Time-To-Live) using StateTtlConfig.
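      • Choosing a state backend suited to large state, such as RocksDB with incremental checkpoints (state.backend.incremental: true), so that each checkpoint uploads only the files that changed since the previous one. Switching backends is usually a larger architectural decision.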
    • Increase Checkpoint Timeout: As a workaround, increase execution.checkpointing.timeout (see point 3) to accommodate the larger state size.
  • Why it works: While not fixing the underlying state size, increasing the timeout gives the system more time to handle the large data transfers. Optimizing state size directly reduces the burden on the storage and network.
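
To tell whether the state is merely large or still growing, you can fit a trend to the sizes of recent completed checkpoints. The following is a minimal sketch. It assumes the /jobs/<job-id>/checkpoints history entries expose id, status, and a size field. The field name state_size is an assumption, because field names vary between Flink versions. The function names are hypothetical.

```python
import json


def completed_sizes(checkpoints_json, field="state_size"):
    """Sizes of completed checkpoints, oldest first, from a /jobs/<id>/checkpoints response."""
    history = json.loads(checkpoints_json).get("history", [])
    done = [c for c in history if c.get("status") == "COMPLETED"]
    # Sort by checkpoint id so the series runs from oldest to newest.
    return [c[field] for c in sorted(done, key=lambda c: c["id"])]


def size_growth_per_checkpoint(sizes_bytes):
    """Least-squares slope of size against checkpoint index, in bytes per checkpoint."""
    n = len(sizes_bytes)
    if n < 2:
        raise ValueError("need at least two completed checkpoints")
    mean_x = (n - 1) / 2
    mean_y = sum(sizes_bytes) / n
    cov = sum((i - mean_x) * (y - mean_y) for i, y in enumerate(sizes_bytes))
    var = sum((i - mean_x) ** 2 for i in range(n))
    return cov / var
```

A slope that stays clearly positive over many checkpoints suggests that state is accumulating, for example from keys that are never cleared. State TTL is aimed at exactly that case.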

6. JobManager Overload or Unresponsiveness: While less common for declined checkpoints (more common for no checkpoints starting), a severely overloaded JobManager might struggle to process incoming ACKs from TaskManagers in a timely manner, leading to timeouts from its perspective.

  • Diagnosis: Monitor the JobManager’s CPU and memory utilization. Check the JobManager logs and GC logs for signs of memory pressure or long garbage collection pauses, such as OutOfMemoryError entries or frequent full GCs.
  • Fix:
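    • Increase JobManager Memory: Since Flink 1.11, set the JobManager’s memory in flink-conf.yaml rather than in the startup script. For example, use jobmanager.memory.process.size, or jobmanager.memory.heap.size for the JVM heap alone.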
    • Don’t Rely on High Availability for Throughput: Running multiple JobManagers in high-availability mode adds standby instances for recovery. Only the leading JobManager coordinates checkpoints, so HA does not speed up ACK processing.
    • Reduce Checkpoint Frequency: If checkpoints are too frequent for the JobManager to keep up, consider increasing execution.checkpointing.interval.
  • Why it works: Ensuring the JobManager has adequate resources prevents it from becoming a bottleneck in the checkpointing ACK processing pipeline.
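
You can also watch JobManager memory pressure through the REST API. The following is a minimal sketch. It assumes the /jobmanager/metrics endpoint and the Status.JVM.Memory.Heap.Used and Status.JVM.Memory.Heap.Max metrics. GC time metrics (Status.JVM.GarbageCollector.<collector>.Time) are cumulative, so the GC share of a window is computed from two readings taken wall_ms apart. The function names are hypothetical.

```python
import json
import urllib.request

HEAP_USED = "Status.JVM.Memory.Heap.Used"
HEAP_MAX = "Status.JVM.Memory.Heap.Max"


def fetch_jobmanager_metrics(rest_url, names, timeout_s=10):
    """Read the named JobManager metrics via the REST API as {id: float}."""
    url = f"{rest_url}/jobmanager/metrics?get={','.join(names)}"
    with urllib.request.urlopen(url, timeout=timeout_s) as resp:
        return {m["id"]: float(m["value"]) for m in json.load(resp)}


def heap_usage_ratio(metrics):
    """Used heap as a fraction of the maximum heap."""
    return metrics[HEAP_USED] / metrics[HEAP_MAX]


def gc_time_fraction(gc_ms_before, gc_ms_after, wall_ms):
    """Share of a sampling window spent in GC, from two cumulative GC-time readings."""
    if wall_ms <= 0:
        raise ValueError("wall_ms must be positive")
    return (gc_ms_after - gc_ms_before) / wall_ms
```

A heap ratio that stays close to 1.0 while the GC share keeps rising suggests the JobManager heap is too small.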

If network problems persist after these changes, the next error you are likely to see is java.net.SocketTimeoutException: Read timed out. If the JobManager was the bottleneck and was not addressed, you may instead see a java.lang.OutOfMemoryError on the JobManager.
