Flink checkpoints are a core mechanism for fault tolerance, but their persistence is often glossed over.

Let’s see what happens when a Flink job with checkpointing enabled actually runs and writes its state.

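Imagine a simple Flink job that reads messages from Kafka, applies a map operation, and writes the results to another Kafka topic. When checkpointing is enabled, Flink periodically takes a consistent snapshot of every operator’s state and writes it to durable storage such as HDFS or S3. It does this without stopping the whole job. Markers called checkpoint barriers flow through the streams alongside the records, and each operator snapshots its state as the barriers pass. The snapshot includes things like the Kafka offsets held by the source operators and any accumulated data in stateful operations such as keyBy().sum().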
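To make "state" concrete, here is a toy, stdlib-only model of that pipeline. It is not Flink’s API, and the class and method names are hypothetical. A source tracks its read offset the way a Kafka consumer does, a keyed running sum stands in for keyBy().sum(), and a snapshot is simply a copy of both taken at the same point in the stream.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (hypothetical names, not Flink's API) of the state a checkpoint captures
// for a "Kafka source -> map -> keyBy().sum()" pipeline.
final class ToyPipeline {
    // Immutable snapshot: the source position plus a copy of the keyed sums.
    static final class Snapshot {
        final long sourceOffset;
        final Map<String, Long> sums;

        Snapshot(long sourceOffset, Map<String, Long> sums) {
            this.sourceOffset = sourceOffset;
            this.sums = sums;
        }
    }

    private long sourceOffset = 0;                           // next record to read, like a Kafka offset
    private final Map<String, Long> sums = new HashMap<>();  // keyed state, like keyBy().sum()

    // Read the record at the current offset ("key:value"), map it, and update the keyed sum.
    void processNext(List<String> log) {
        String[] parts = log.get((int) sourceOffset).split(":");
        long mapped = Long.parseLong(parts[1]) * 10;         // the "map" step: multiply by 10
        sums.merge(parts[0], mapped, Long::sum);
        sourceOffset++;
    }

    // Capture both pieces of state at the same point in the stream.
    Snapshot snapshot() {
        return new Snapshot(sourceOffset, Map.copyOf(sums));
    }
}
```

Because the offset and the sums are captured together, restoring both puts the job back at a consistent point. The sums then reflect exactly the records before the stored offset, and later updates to the live state do not leak into the snapshot.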

Here’s a simplified example of how you might configure checkpointing in your Flink job:

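```java
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MyStreamingJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Configure checkpointing
        env.enableCheckpointing(60000); // Trigger a checkpoint every 60 seconds
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000); // At least 30 s after one checkpoint ends
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60000); // Abort checkpoints after 10 minutes
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // Only one checkpoint at a time
        env.getCheckpointConfig().enableExternalizedCheckpoints(
            ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // Keep checkpoints on cancellation

        // Write checkpoint snapshots to HDFS (FsStateBackend is deprecated since Flink 1.13)
        env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));

        // ... rest of your job definition ...

        env.execute("MyStreamingJob");
    }
}
```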

In this configuration:

  • enableCheckpointing(60000): Flink will attempt to start a checkpoint every 60 seconds.
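  • setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE): Guarantees that, after a failure and recovery, Flink’s state reflects each record exactly once. In this mode, operators with several inputs align the checkpoint barriers before snapshotting. End-to-end exactly-once delivery additionally requires a replayable source (such as Kafka) and a transactional or idempotent sink.
  • setMinPauseBetweenCheckpoints(30000): Ensures that at least 30 seconds pass between the end of one checkpoint and the start of the next, so the job keeps making progress on regular processing even when checkpoints are slow.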
  • setCheckpointTimeout(10 * 60000): If a checkpoint takes longer than 10 minutes, it will be aborted.
  • setMaxConcurrentCheckpoints(1): Flink will wait for the current checkpoint to complete before initiating another.
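  • enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION): If the job is cancelled, the checkpoint data is retained in the configured storage, so you can later restart the job from it. With DELETE_ON_CANCELLATION, it would be removed on cancellation. Newer Flink versions deprecate this method in favor of setExternalizedCheckpointCleanup.
  • setStateBackend(new FsStateBackend(...)): This is the crucial part for this topic. FsStateBackend keeps the working state on the TaskManager’s JVM heap and writes checkpoint snapshots to the given HDFS path. For S3, point the same backend at an s3:// path after installing one of Flink’s S3 filesystem plugins; there is no separate S3StateBackend. Since Flink 1.13, FsStateBackend is deprecated in favor of HashMapStateBackend combined with a checkpoint storage location.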
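The interplay between the interval, the minimum pause, and the timeout is easier to see with numbers. The sketch below is a simplified model, not Flink’s actual scheduling logic. It assumes one checkpoint in flight at a time and measures time from the first checkpoint’s start. Each subsequent checkpoint starts no earlier than one interval after the previous start, and no earlier than the minimum pause after the previous checkpoint ended, whether that checkpoint completed or was aborted at the timeout. The checkpoint durations are hypothetical inputs.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model (an assumption, not Flink's implementation) of when checkpoints
// start, given an interval, a minimum pause, a timeout, and at most one checkpoint in flight.
final class CheckpointSchedule {
    // durationsMs[i] is how long checkpoint i would take if it were never aborted.
    // Returns the start time (ms, relative to the first checkpoint) of each checkpoint.
    static List<Long> startTimes(long intervalMs, long minPauseMs, long timeoutMs, long[] durationsMs) {
        List<Long> starts = new ArrayList<>();
        long start = 0;
        for (long duration : durationsMs) {
            starts.add(start);
            // A checkpoint that exceeds the timeout is aborted at start + timeout.
            long end = start + Math.min(duration, timeoutMs);
            // The next start must respect both the interval and the pause after this one ends.
            start = Math.max(start + intervalMs, end + minPauseMs);
        }
        return starts;
    }
}
```

With a 60 s interval, a 30 s minimum pause, and a 10-minute timeout (the values from the configuration above), checkpoints taking 10 s, 45 s, 700 s, and 5 s would start at 0 s, 60 s, 135 s, and 765 s in this model. The 45-second checkpoint pushes the next start to 30 s after it finishes, and the one that hits the timeout is aborted at 600 s, delaying everything behind it.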

The problem Flink checkpointing solves is state recovery. Without it, if a TaskManager fails, all of its in-memory state is lost, and on restart the job would have to reprocess all data from the beginning to rebuild it. That is often infeasible for large datasets or long-running jobs. With checkpointing, Flink restores every operator’s state from the last completed checkpoint and rewinds the sources to the offsets recorded in that checkpoint. The records that arrived after the checkpoint are replayed, and processing continues from there.
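A toy recovery run shows why restoring state and rewinding the source together gives the right answer. The stdlib-only sketch below uses hypothetical names and is not Flink’s API. It takes a checkpoint partway through a log of "key:value" records, keeps processing, then "crashes" and discards the live state. Finally, it restores the checkpointed sums and replays from the checkpointed offset.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (hypothetical, not Flink's API) of recovering from the last completed
// checkpoint: restore the keyed state and rewind the source to the stored offset.
final class ToyRecovery {
    // Apply records [from, to) of "key:value" entries to the running sums.
    static void apply(List<String> log, int from, int to, Map<String, Long> sums) {
        for (int i = from; i < to; i++) {
            String[] parts = log.get(i).split(":");
            sums.merge(parts[0], Long.parseLong(parts[1]), Long::sum);
        }
    }

    // Take a checkpoint at checkpointOffset, keep going until crashOffset
    // (> checkpointOffset), lose all live state, then recover and finish the log.
    static Map<String, Long> runWithFailure(List<String> log, int checkpointOffset, int crashOffset) {
        Map<String, Long> live = new HashMap<>();
        apply(log, 0, checkpointOffset, live);

        // Completed checkpoint: a copy of the state plus the source offset.
        Map<String, Long> checkpointedSums = new HashMap<>(live);
        int checkpointedOffset = checkpointOffset;

        apply(log, checkpointOffset, crashOffset, live); // progress after the checkpoint...
        live = null;                                     // ...is lost when the TaskManager dies

        // Recovery: restore the state and replay from the checkpointed offset.
        Map<String, Long> restored = new HashMap<>(checkpointedSums);
        apply(log, checkpointedOffset, log.size(), restored);
        return restored;
    }
}
```

Records between the checkpoint and the crash are read twice, yet the restored sums count them only once, because the updates made before the crash were discarded along with the lost state. That is the exactly-once guarantee for Flink’s own state. Output already written to external systems is a separate concern that the sink has to handle.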

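Internally, Flink’s checkpointing does not use a two-phase commit protocol. It uses asynchronous barrier snapshotting, an approach inspired by the Chandy-Lamport algorithm. When a checkpoint is triggered, the checkpoint coordinator in the JobManager tells the source tasks to inject a checkpoint barrier into their output streams, and the barriers then flow downstream with the records. A task that has received the barrier on all of its input channels snapshots its state, forwards the barrier downstream, and acknowledges to the coordinator. In exactly-once mode, while it waits, the task holds back records from channels whose barrier has already arrived. Once every task has acknowledged, the checkpoint is complete, and the coordinator notifies the tasks. Exactly-once sinks, such as Flink’s transactional Kafka sink, use that completion notification as the commit step of their own two-phase commit. This coordinated snapshotting ensures consistency.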
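Barrier alignment at an operator with several inputs is the subtle step. The toy below is stdlib Java with a hypothetical class name, not Flink’s implementation. It models an operator that sums numbers arriving on several channels. Once a barrier arrives on one channel, later records from that channel are held back until the barrier has arrived on every channel. Only then is the state snapshotted, after which the held-back records are processed. The model assumes only one checkpoint is in flight at a time.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Toy model of aligned checkpoint barriers (hypothetical names, not Flink's implementation).
// Assumes at most one checkpoint in flight at a time.
final class AligningSumOperator {
    static final long BARRIER = Long.MIN_VALUE;   // sentinel value marking a barrier

    private final boolean[] barrierSeen;
    private final List<Deque<Long>> heldBack = new ArrayList<>();
    private final List<Long> snapshots = new ArrayList<>();
    private long sum = 0;                          // the operator's state

    AligningSumOperator(int channels) {
        barrierSeen = new boolean[channels];
        for (int i = 0; i < channels; i++) {
            heldBack.add(new ArrayDeque<>());
        }
    }

    // Deliver one element (a record or BARRIER) from the given input channel.
    void onElement(int channel, long element) {
        if (element == BARRIER) {
            if (barrierSeen[channel]) {
                throw new IllegalStateException("second barrier before alignment finished");
            }
            barrierSeen[channel] = true;
            if (allBarriersSeen()) {
                snapshots.add(sum);                 // snapshot; a real task would now forward the barrier
                Arrays.fill(barrierSeen, false);
                for (Deque<Long> q : heldBack) {    // alignment done: process held-back records
                    while (!q.isEmpty()) {
                        sum += q.poll();
                    }
                }
            }
        } else if (barrierSeen[channel]) {
            heldBack.get(channel).add(element);     // arrived after this channel's barrier: hold it
        } else {
            sum += element;                         // arrived before the barrier: process now
        }
    }

    private boolean allBarriersSeen() {
        for (boolean seen : barrierSeen) {
            if (!seen) return false;
        }
        return true;
    }

    long sum() { return sum; }

    List<Long> snapshots() { return snapshots; }
}
```

For example, suppose channel 0 delivers 1, a barrier, then 2, while channel 1 delivers 10 and 20 and then its barrier, with channel 0’s 2 arriving before channel 1’s barrier. The snapshot records 31. The 2 that followed channel 0’s barrier is held back and added only after the snapshot, so the snapshot contains exactly the records that preceded the barrier on each channel.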

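FsStateBackend is not tied to HDFS. It writes its snapshots to any filesystem Flink supports: an hdfs:// path, a local file:// path, or an s3:// path once one of Flink’s S3 filesystem plugins is installed. There is no separate S3StateBackend. For state too large to keep on the heap, the RocksDB state backend keeps working state on local disk and checkpoints to the same kinds of storage. Whichever backend you use, it serializes the operator state (e.g., keyed state, operator-specific data) and writes it as files to the configured durable storage. When a job needs to recover, Flink reads these files from HDFS or S3, deserializes the state, and restores the operators to their condition as of the last completed checkpoint.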
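The serialize, write, read, and deserialize cycle can be sketched with plain Java I/O. This is a toy illustration of the idea under stated assumptions, not Flink’s on-disk format: the class name and the simple "count, then key/value pairs" layout are hypothetical, and a local temporary file stands in for HDFS or S3.

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

// Toy illustration (hypothetical format, not Flink's) of what a state backend does:
// serialize keyed state to a file and read it back on recovery.
final class ToyStateFile {
    static void write(Map<String, Long> state, Path file) throws IOException {
        try (DataOutputStream out = new DataOutputStream(
                new BufferedOutputStream(Files.newOutputStream(file)))) {
            out.writeInt(state.size());                // number of entries
            for (Map.Entry<String, Long> e : state.entrySet()) {
                out.writeUTF(e.getKey());              // key
                out.writeLong(e.getValue());           // value
            }
        }
    }

    static Map<String, Long> read(Path file) throws IOException {
        try (DataInputStream in = new DataInputStream(
                new BufferedInputStream(Files.newInputStream(file)))) {
            int n = in.readInt();
            Map<String, Long> state = new HashMap<>();
            for (int i = 0; i < n; i++) {
                state.put(in.readUTF(), in.readLong());
            }
            return state;
        }
    }
}
```

A real backend also has to organize keyed state so it can be redistributed when the job’s parallelism changes, and it has to version its serializers. This sketch ignores both.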

The one thing most people don’t realize is how much network traffic and disk I/O checkpointing can generate, especially with large state. Flink doesn’t write a single file per checkpoint. Each parallel subtask of each stateful operator writes its own state file(s), and the JobManager adds a metadata file that ties them together. With large state, these files can be large, and with full checkpoints all of them are rewritten at every interval. If checkpoints go to HDFS, this means significant load on the NameNode (many files to track) and on the DataNodes (the bytes themselves). On S3, it means a high volume of PUT requests. This is why choosing an appropriate checkpoint interval and efficient state serializers (e.g., avoiding Flink’s generic Kryo fallback in favor of POJO or Avro serialization) is critical for performance.
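A back-of-the-envelope estimate helps size that load. The sketch below rests on simplifying assumptions: one state file per parallel subtask of each stateful operator, every checkpoint rewriting the full state, and an interval that divides an hour evenly. It ignores small state stored inline in the checkpoint metadata, the many files a RocksDB backend can produce, and incremental checkpoints. The class name is hypothetical, and the inputs you feed it are your own numbers.

```java
// Rough estimate (illustrative assumptions, hypothetical class name) of the write load
// generated by full checkpoints.
final class CheckpointLoadEstimate {
    // Files written per hour: one per parallel subtask of each stateful operator, per checkpoint.
    static long filesPerHour(int statefulOperators, int parallelism, long intervalSeconds) {
        long checkpointsPerHour = 3600 / intervalSeconds;
        return (long) statefulOperators * parallelism * checkpointsPerHour;
    }

    // Bytes written per hour if every checkpoint rewrites the full state.
    static long bytesPerHour(long totalStateBytes, long intervalSeconds) {
        return totalStateBytes * (3600 / intervalSeconds);
    }
}
```

For example, 4 stateful operators at parallelism 32 with a 60-second interval give 7,680 state files per hour in this model, and 20 GiB of state checkpointed in full every minute amounts to 1,200 GiB written per hour.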

Understanding how Flink manages and persists its state is the next step to optimizing your fault tolerance strategy.
