Flink checkpointing is the mechanism by which Flink captures the state of your application at regular intervals, allowing it to resume from a consistent point after a failure.
Consider a Flink streaming job that reads messages from Kafka and writes to a database. The configuration below sets its checkpointing interval to 30 seconds.
{
"execution.checkpointing.interval": "30s",
"execution.checkpointing.mode": "EXACTLY_ONCE",
"execution.checkpointing.storage": "s3://my-flink-checkpoints/my-job",
"execution.checkpointing.timeout": "10m",
"execution.checkpointing.tolerable-failed-checkpoints": 1
}
When Flink takes a checkpoint, it does not stop the data streams. Each operator snapshots its state while records keep flowing, and the snapshots are written, largely asynchronously, to durable external storage (like S3, HDFS, or GCS). The checkpoint is considered "successful" only when all tasks have acknowledged their part of the snapshot. This ensures that when a job restarts, Flink can restore each operator to the exact state it had at the last successful checkpoint.
The core problem Flink checkpointing solves is keeping application state consistent, with exactly-once semantics, when parts of a distributed system fail. Without it, a job restarted after a crash might lose its accumulated state, reprocess messages, or skip them entirely. Checkpointing provides a consistent point in time to recover to.
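Internally, Flink uses a distributed snapshotting algorithm, a variant of the Chandy-Lamport algorithm, to achieve this. When a checkpoint is triggered, a special "checkpoint barrier" is injected into the data streams at the source operators. The barrier flows through the operators along with the records and never overtakes them, so it separates the records that belong to this checkpoint from those that belong to the next one. An operator with a single input snapshots its state as soon as the barrier arrives and then forwards the barrier downstream. An operator with several inputs must first receive the barrier on every input. In exactly-once mode it holds back records from inputs whose barrier has already arrived until the remaining barriers catch up (barrier alignment), and only then snapshots and forwards. Once every task has snapshotted its state and acknowledged the checkpoint, the checkpoint is complete. This barrier synchronization ensures that the snapshot captures a consistent global state across all operators at a specific point in the streams.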
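To make the barrier mechanics concrete, here is a minimal sketch in plain Python of an operator with two inputs that waits for the barrier on each input before it snapshots. It is a toy model, not Flink code: the SummingOperator class, the BARRIER marker, and the channel numbering are all made up for illustration, and real Flink tasks handle buffering, asynchronous writes, and acknowledgements very differently.

```python
from collections import deque

BARRIER = "BARRIER"  # hypothetical marker standing in for a checkpoint barrier


class SummingOperator:
    """Toy multi-input operator whose state is a running sum."""

    def __init__(self, num_inputs):
        self.num_inputs = num_inputs
        self.total = 0           # the operator state
        self.blocked = set()     # inputs whose barrier has already arrived
        self.buffered = deque()  # records held back while aligning
        self.snapshots = []      # state captured at each checkpoint
        self.forwarded = []      # everything emitted downstream, in order

    def on_element(self, channel, element):
        if element == BARRIER:
            self.blocked.add(channel)
            if len(self.blocked) == self.num_inputs:
                # Barrier seen on every input: snapshot, forward the barrier,
                # then release the records that were held back.
                self.snapshots.append(self.total)
                self.forwarded.append(BARRIER)
                self.blocked.clear()
                while self.buffered:
                    self._process(self.buffered.popleft())
        elif channel in self.blocked:
            # This record arrived after the barrier on its input, so it
            # belongs to the next checkpoint and must not affect the snapshot.
            self.buffered.append(element)
        else:
            self._process(element)

    def _process(self, value):
        self.total += value
        self.forwarded.append(self.total)


op = SummingOperator(num_inputs=2)
events = [(0, 1), (1, 10), (0, BARRIER), (0, 2), (1, 20), (1, BARRIER), (1, 30)]
for channel, element in events:
    op.on_element(channel, element)

print(op.snapshots)  # state captured by the checkpoint
print(op.total)      # state after all records were processed
```

The record 2 arrives on input 0 after that input's barrier, so it is held back and excluded from the snapshot, while 20 arrives on input 1 before its barrier and is included. The snapshot therefore reflects exactly the records that preceded the barrier on each input.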
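The execution.checkpointing.interval defines how often these snapshots are taken. A shorter interval means less work to redo after a failure, because the job rewinds to a more recent point, but it increases the overhead of checkpointing. A longer interval reduces overhead but increases how much input must be reprocessed on recovery. As long as the source can replay (as Kafka can), input is reprocessed rather than lost. The execution.checkpointing.mode can be EXACTLY_ONCE or AT_LEAST_ONCE. EXACTLY_ONCE aligns barriers so that each record affects the restored state exactly once, at the cost of more coordination. End-to-end exactly-once delivery additionally requires idempotent or transactional sinks. AT_LEAST_ONCE skips the alignment and can give lower latency, but after a recovery some records may be reflected in state or output more than once unless the sink deduplicates them.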
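The interval trade-off can be put into rough numbers. The sketch below uses a deliberately simplified model that is an assumption of this example, not something Flink reports: checkpoints are triggered every interval, each takes a fixed duration to complete, and they never overlap. Under that model, a failure just before a checkpoint completes forces the job to rewind to the trigger time of the previous one, so the worst-case replay window is about one interval plus one checkpoint duration. The function names are hypothetical.

```python
def worst_case_replay_seconds(interval_s, checkpoint_duration_s):
    """Upper bound on the input time span to reprocess after a failure,
    under the simplified model described above."""
    return interval_s + checkpoint_duration_s


def checkpointing_share(interval_s, checkpoint_duration_s):
    """Rough fraction of wall-clock time during which a checkpoint is in flight."""
    return checkpoint_duration_s / interval_s


CHECKPOINT_DURATION_S = 6  # assumed value, purely for illustration

for interval_s in (10, 30, 120):
    replay = worst_case_replay_seconds(interval_s, CHECKPOINT_DURATION_S)
    share = checkpointing_share(interval_s, CHECKPOINT_DURATION_S)
    print(f"interval={interval_s}s  replay<={replay}s  in-flight share={share:.0%}")
```

With the assumed 6-second checkpoint, shortening the interval from 30 seconds to 10 seconds cuts the worst-case replay window from 36 to 16 seconds, but a checkpoint is then in flight 60% of the time instead of 20%. Real checkpoint durations vary with state size and backpressure, so treat this only as a way to reason about the direction of the trade-off.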
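The execution.checkpointing.storage setting points to where the checkpoint data is stored. This must be a durable location that Flink can read from upon restart. Note that in stock Flink configuration the checkpoint directory is normally set with state.checkpoints.dir (execution.checkpointing.dir in recent releases), so check which key your Flink version or platform expects. execution.checkpointing.timeout is the maximum time Flink will wait for a checkpoint to complete before considering it failed. If it is too short for your state size and workload, checkpoints that would otherwise have succeeded are aborted. execution.checkpointing.tolerable-failed-checkpoints sets how many consecutive checkpoint failures the job tolerates before it is failed and restarted. With a value of 1, a single failed checkpoint, caused for example by a transient network issue or temporary storage unavailability, does not bring the job down, but two failures in a row do.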
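How the timeout and the tolerable-failure count interact can be sketched with a small simulation. This is a simplified model written for this article, with hypothetical function names: a checkpoint counts as failed when it runs longer than the timeout, a successful checkpoint resets the failure counter, and the job is failed once the number of consecutive failures exceeds the tolerated number. Real Flink distinguishes several failure causes that this sketch ignores.

```python
def checkpoint_outcomes(durations_s, timeout_s):
    """True for each checkpoint that completes within the timeout."""
    return [duration <= timeout_s for duration in durations_s]


def job_survives(outcomes, tolerable_failed_checkpoints):
    """False as soon as consecutive failures exceed the tolerated number."""
    consecutive_failures = 0
    for succeeded in outcomes:
        if succeeded:
            consecutive_failures = 0  # a success resets the counter
        else:
            consecutive_failures += 1
            if consecutive_failures > tolerable_failed_checkpoints:
                return False
    return True


TIMEOUT_S = 600  # 10 minutes, as in the configuration above

# Assumed checkpoint durations in seconds, purely for illustration.
occasional_slow = [40, 700, 45, 650, 50]  # isolated timeouts
sustained_slow = [40, 700, 720, 50]       # two timeouts in a row

print(job_survives(checkpoint_outcomes(occasional_slow, TIMEOUT_S), 1))
print(job_survives(checkpoint_outcomes(sustained_slow, TIMEOUT_S), 1))
```

In the first series every timed-out checkpoint is followed by a successful one, so a tolerance of 1 keeps the job running. In the second series two checkpoints in a row exceed the timeout, which is more than the job tolerates.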
One aspect that is often overlooked is how Flink handles task failures during a checkpoint. If a task fails while a checkpoint is in progress, Flink aborts that checkpoint. This holds even if some operators had already snapshotted their state for it: a checkpoint that has not been acknowledged by every task is never used for recovery, and its partial snapshots are discarded. Flink then restarts the affected tasks from the latest completed checkpoint and rewinds the sources to the positions recorded in it. The key is that only a fully acknowledged checkpoint is guaranteed to describe a consistent point across all operators.
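The recovery rule, restore only from a checkpoint that every task has acknowledged, can be illustrated with a toy coordinator. The ToyCoordinator class, its method names, and the task names below are hypothetical and are not Flink APIs. The sketch only tracks acknowledgements and ignores the snapshot data itself.

```python
class ToyCoordinator:
    """Tracks which checkpoints have been acknowledged by every task."""

    def __init__(self, tasks):
        self.tasks = set(tasks)
        self.pending = {}    # checkpoint id -> tasks that acknowledged so far
        self.completed = []  # ids of fully acknowledged checkpoints, in order

    def trigger(self, checkpoint_id):
        self.pending[checkpoint_id] = set()

    def acknowledge(self, checkpoint_id, task):
        acks = self.pending.get(checkpoint_id)
        if acks is None:
            return  # acknowledgement for an aborted checkpoint is ignored
        acks.add(task)
        if acks == self.tasks:
            # Every task has reported in: the checkpoint is now complete.
            del self.pending[checkpoint_id]
            self.completed.append(checkpoint_id)

    def on_task_failure(self):
        """Abort in-progress checkpoints and pick the restore point."""
        self.pending.clear()
        return self.completed[-1] if self.completed else None


coordinator = ToyCoordinator(tasks=["source", "map", "sink"])

coordinator.trigger(1)
for task in ("source", "map", "sink"):
    coordinator.acknowledge(1, task)

coordinator.trigger(2)
coordinator.acknowledge(2, "source")
coordinator.acknowledge(2, "map")

# The sink fails before acknowledging checkpoint 2.
restore_from = coordinator.on_task_failure()
print(restore_from)
```

Checkpoint 2 already holds snapshots from two of the three tasks, but because the sink never acknowledged it, the job is restored from checkpoint 1.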
The next major configuration concern you’ll face is tuning the checkpointing timeout based on your job’s latency and throughput.