Incremental checkpointing in Flink with RocksDB isn’t just a performance tweak. It fundamentally changes what a checkpoint writes: only the state files created since the previous checkpoint get uploaded, not the whole darn thing.

Let’s see it in action. Imagine a simple Flink job that counts words from a stream.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class IncrementalCheckpointingExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

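        // Checkpoint every 5 seconds (the interval is in milliseconds)
        env.enableCheckpointing(5000);

        // Use RocksDB as the state backend; the constructor argument `true` is what
        // enables incremental checkpointing. THIS IS THE KEY.
        // EmbeddedRocksDBStateBackend is the Flink 1.13+ class (flink-statebackend-rocksdb);
        // older versions use new RocksDBStateBackend("hdfs:///flink/checkpoints", true).
        env.setStateBackend(new org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend(true));

        // Durable location for checkpoint files
        env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints");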

        DataStream<String> text = env.socketTextStream("localhost", 9999);

        DataStream<Tuple2<String, Integer>> counts = text
            .flatMap(new LineSplitter())
            .keyBy(value -> value.f0) // key by the word; positional keyBy(0) is deprecated
            .sum(1);

        counts.print();

        env.execute("Incremental Checkpointing Example");
    }

    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.toLowerCase().split("\\W+")) {
                if (!word.isEmpty()) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        }
    }
}

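When this job runs, Flink periodically checkpoints its state. With incremental checkpointing enabled, Flink does not copy the entire RocksDB database to HDFS (or wherever you’ve configured your checkpoint storage) each time. Instead, it uploads only the RocksDB data files that have appeared since the last completed checkpoint. These files are RocksDB’s SSTables (sorted string tables, stored as `.sst` files). Files that an earlier checkpoint already uploaded are not uploaded again; the new checkpoint simply references them. Every incremental checkpoint is therefore still a complete description of the state, one that happens to share files with its predecessors. On recovery, Flink does not start from a “full” checkpoint and replay deltas on top of it: it downloads every file referenced by the latest completed checkpoint and opens RocksDB on those files.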
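To make the “every checkpoint is complete, but shares files” idea concrete, here is a toy model in plain Java. It is a sketch under heavy assumptions, not Flink’s implementation: the class `ManifestModel` and its methods are hypothetical, SSTables are represented only by their names, and failed or concurrent checkpoints are ignored.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/**
 * Hypothetical toy model (not Flink code): each checkpoint is a complete
 * manifest of SSTable names, but only names never seen before are uploaded.
 */
public class ManifestModel {

    // Every SSTable name that has ever been uploaded to checkpoint storage.
    private final Set<String> uploaded = new HashSet<>();
    // Checkpoint id -> the complete set of files that checkpoint references.
    private final Map<Long, Set<String>> manifests = new LinkedHashMap<>();

    /** Takes a checkpoint of the given live SSTables; returns the files that had to be uploaded. */
    public Set<String> checkpoint(long id, List<String> liveSstFiles) {
        Set<String> newlyUploaded = new TreeSet<>();
        for (String file : liveSstFiles) {
            if (uploaded.add(file)) { // add() is true only for names not uploaded before
                newlyUploaded.add(file);
            }
        }
        manifests.put(id, new TreeSet<>(liveSstFiles));
        return newlyUploaded;
    }

    /** Files a restore from this checkpoint must download: the whole manifest, not a delta. */
    public Set<String> filesToRestore(long id) {
        return new TreeSet<>(manifests.get(id));
    }

    public static void main(String[] args) {
        ManifestModel model = new ManifestModel();
        System.out.println(model.checkpoint(1, List.of("000007.sst", "000008.sst")));
        // prints [000007.sst, 000008.sst]
        System.out.println(model.checkpoint(2, List.of("000007.sst", "000008.sst", "000011.sst")));
        // prints [000011.sst]
        System.out.println(model.filesToRestore(2));
        // prints [000007.sst, 000008.sst, 000011.sst]
    }
}
```

Checkpoint 2 uploads only `000011.sst`, yet restoring from it still needs all three files.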

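The problem this solves is large state. As your Flink job processes more data and its state grows, full checkpoints become a massive I/O bottleneck: every checkpoint re-uploads the entire state, so checkpoint duration grows with total state size even when only a small fraction of it changed. With terabytes of state, slow checkpoints force you to checkpoint less often, and a longer interval means more input has to be replayed from the source after a failure. Incremental checkpointing shrinks each upload to roughly the data written since the previous checkpoint, which makes checkpoints faster and lets you take them more often. It does not shrink the amount of state to restore, though: recovery still downloads every file of the latest checkpoint, so restore time keeps scaling with total state size.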
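Some back-of-envelope arithmetic shows why the savings land on the checkpoint side rather than the restore side. The numbers below (2 TB of state, 50 GB of new SSTables per interval, 500 MB/s to checkpoint storage) are illustrative assumptions, not measurements, and the hypothetical `transferSeconds` helper ignores everything except raw transfer time.

```java
/**
 * Back-of-envelope estimate with assumed numbers: seconds spent moving data
 * for one checkpoint and for one restore. Ignores CPU, latency and parallelism.
 */
public class CheckpointCostEstimate {

    /** Transfer time in seconds for `megabytes` at `bandwidthMbPerSec`. */
    public static double transferSeconds(double megabytes, double bandwidthMbPerSec) {
        return megabytes / bandwidthMbPerSec;
    }

    public static void main(String[] args) {
        double totalStateMb = 2_000_000;      // assumed: ~2 TB of RocksDB files in total
        double newSstMbPerInterval = 50_000;  // assumed: ~50 GB of new SSTables per interval
        double bandwidthMbPerSec = 500;       // assumed: 500 MB/s to checkpoint storage

        double fullCheckpoint = transferSeconds(totalStateMb, bandwidthMbPerSec);
        double incrementalCheckpoint = transferSeconds(newSstMbPerInterval, bandwidthMbPerSec);
        // A restore downloads every file the latest checkpoint references, in both modes.
        double restore = transferSeconds(totalStateMb, bandwidthMbPerSec);

        System.out.printf("full checkpoint:        %.0f s%n", fullCheckpoint);
        System.out.printf("incremental checkpoint: %.0f s%n", incrementalCheckpoint);
        System.out.printf("restore (either mode):  %.0f s%n", restore);
    }
}
```

Under these assumptions a full checkpoint spends about 4,000 seconds transferring data versus about 100 seconds for an incremental one, while a restore moves the full 2 TB either way.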

Internally, RocksDB is a log-structured merge-tree (LSM tree). Writes go first to an in-memory memtable, which is periodically flushed to disk as an SSTable; background compaction later merges SSTables into new files and deletes the old ones. An SSTable is never modified after it is written, and every file gets a unique name. Flink’s RocksDB state backend builds on exactly this property. When a checkpoint is triggered, Flink flushes the memtables and creates a native RocksDB checkpoint (a local snapshot of the database files, hard-linked where possible), then lists the SSTables in it. Because SSTables are immutable, a file name that appeared in the last completed checkpoint always refers to the same contents, so Flink only has to compare file names, not checksums or sizes. SSTables that were already uploaded are referenced through their existing handles, and only new SSTables are uploaded. The small metadata files RocksDB needs to reopen the database (such as MANIFEST, CURRENT and OPTIONS) are uploaded with every checkpoint. The checkpoint metadata then records handles to all of these files, shared and new alike.
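The upload decision can be sketched as a pure function over file names. This is a simplified illustration, not Flink’s internal API: `IncrementalUploadPlanner`, `Plan` and the handle strings are made up, and “is it an SSTable?” is approximated by the `.sst` suffix.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical sketch: match SSTables by name against the last completed
 * checkpoint; everything else (new SSTables, metadata files) is uploaded.
 */
public class IncrementalUploadPlanner {

    /** Outcome of planning one checkpoint. */
    public static final class Plan {
        public final List<String> upload = new ArrayList<>();     // copy to checkpoint storage
        public final Map<String, String> reuse = new TreeMap<>(); // file name -> existing remote handle
    }

    /**
     * @param localFiles         files in the local native RocksDB checkpoint directory
     * @param previouslyUploaded SSTable name -> remote handle from the last completed checkpoint
     */
    public static Plan plan(List<String> localFiles, Map<String, String> previouslyUploaded) {
        Plan plan = new Plan();
        for (String file : localFiles) {
            boolean isSst = file.endsWith(".sst");
            if (isSst && previouslyUploaded.containsKey(file)) {
                // Immutable file with a known name: same bytes, so just reference the old upload.
                plan.reuse.put(file, previouslyUploaded.get(file));
            } else {
                // New SSTable, or a metadata file (MANIFEST, CURRENT, OPTIONS) that is always re-sent.
                plan.upload.add(file);
            }
        }
        return plan;
    }

    public static void main(String[] args) {
        Map<String, String> previous = Map.of(
                "000007.sst", "remote-handle-1",   // placeholder handles, not real paths
                "000008.sst", "remote-handle-2");
        List<String> local = List.of("000007.sst", "000008.sst", "000012.sst",
                "MANIFEST-000013", "CURRENT", "OPTIONS-000015");
        Plan plan = plan(local, previous);
        System.out.println("upload: " + plan.upload);
        // prints upload: [000012.sst, MANIFEST-000013, CURRENT, OPTIONS-000015]
        System.out.println("reuse:  " + plan.reuse.keySet());
        // prints reuse:  [000007.sst, 000008.sst]
    }
}
```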

The primary levers you control are:

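  • Enabling Incremental Checkpointing: Pass `true` when constructing the RocksDB state backend (`new EmbeddedRocksDBStateBackend(true)`, or `new RocksDBStateBackend(path, true)` before Flink 1.13), or set `state.backend.incremental: true` in the Flink configuration. Without it, the RocksDB backend takes full checkpoints.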
  • Checkpoint Interval: env.enableCheckpointing(5000) sets how often checkpoints are triggered, in milliseconds. A shorter interval means more frequent incremental uploads, each with a smaller delta; a longer interval means fewer uploads, each potentially larger.
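  • State Backend Configuration: The RocksDB state backend is crucial. Incremental checkpointing as described here is implemented by Flink’s RocksDB backend, which exploits the fact that RocksDB’s SSTables are immutable; RocksDB itself does not upload anything.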
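  • Checkpoint Storage Location: The checkpoint URI (hdfs:///flink/checkpoints in the example) determines where checkpoint files are written. It must be durable, reachable from the JobManager and every TaskManager, and fast enough for uploads during checkpoints and downloads during recovery.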
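To get a feel for the interval trade-off, here is a rough calculation. It assumes, hypothetically, that the job produces new SSTable data at a steady 2 MB/s, and it ignores compaction; `IntervalTradeoff` and its methods exist only for this illustration.

```java
/**
 * Rough sketch of the checkpoint-interval trade-off. Assumes a steady rate of
 * new SSTable data and ignores compaction (hypothetical model, not Flink code).
 */
public class IntervalTradeoff {

    /** Approximate MB uploaded by one incremental checkpoint. */
    public static double deltaMbPerCheckpoint(double newSstMbPerSec, long intervalMillis) {
        return newSstMbPerSec * intervalMillis / 1000.0;
    }

    /** Number of checkpoints triggered per hour at the given interval. */
    public static long checkpointsPerHour(long intervalMillis) {
        return 3_600_000L / intervalMillis;
    }

    public static void main(String[] args) {
        double rateMbPerSec = 2.0; // assumed steady rate of new SSTable data
        for (long interval : new long[] {5_000, 60_000, 300_000}) {
            System.out.printf("interval %6d ms: %4d checkpoints/h, ~%.0f MB each%n",
                    interval, checkpointsPerHour(interval),
                    deltaMbPerCheckpoint(rateMbPerSec, interval));
        }
    }
}
```

Under that assumption the total upload volume per hour is the same at every interval (about 7,200 MB); only the number and size of uploads change. In practice compaction breaks the symmetry: SSTables that are written and compacted away between two checkpoints are never uploaded, so a longer interval can upload somewhat less in total.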

A subtle but critical aspect of incremental checkpointing in RocksDB is the role of compaction and garbage collection. RocksDB continuously runs background compactions that merge SSTables and drop obsolete data. Compaction always writes new files, so from Flink’s point of view its output is new data that must be uploaded at the next checkpoint, even if the logical state barely changed. Conversely, the SSTables that compaction replaced simply stop appearing in newer checkpoints. Flink tracks how many retained checkpoints reference each uploaded file and deletes a file from checkpoint storage only once no retained checkpoint references it. Flink cannot filter compaction output out of a checkpoint, because those files now are the state. This is why aggressive or badly tuned compaction can make some incremental checkpoints much larger than the amount of state actually modified, and why incremental checkpoint sizes tend to fluctuate.
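The cleanup side can be modeled with a reference count per file. `SharedFileRegistry` below is a hypothetical, heavily simplified stand-in for Flink’s own bookkeeping (its `SharedStateRegistry` handles many more cases); it only shows the counting idea. The retention count of 1 mirrors Flink’s default of keeping only the latest completed checkpoint. The example also shows compaction at work: checkpoint 2 consists entirely of new files.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/**
 * Hypothetical model of shared-file cleanup: each retained checkpoint holds a
 * reference to every file it uses; a file becomes deletable at count zero.
 */
public class SharedFileRegistry {

    private final int retainedCheckpoints;
    private final Map<String, Integer> refCounts = new HashMap<>();
    private final Deque<List<String>> retained = new ArrayDeque<>();

    public SharedFileRegistry(int retainedCheckpoints) {
        this.retainedCheckpoints = retainedCheckpoints;
    }

    /** Registers a completed checkpoint; returns files no retained checkpoint references any more. */
    public Set<String> complete(List<String> files) {
        for (String f : files) {
            refCounts.merge(f, 1, Integer::sum);
        }
        retained.addLast(files);
        Set<String> deletable = new TreeSet<>();
        while (retained.size() > retainedCheckpoints) {
            for (String f : retained.removeFirst()) { // subsume the oldest checkpoint
                if (refCounts.merge(f, -1, Integer::sum) == 0) {
                    refCounts.remove(f);
                    deletable.add(f);
                }
            }
        }
        return deletable;
    }

    public static void main(String[] args) {
        SharedFileRegistry registry = new SharedFileRegistry(1);
        // Checkpoint 1: two flushed SSTables.
        System.out.println(registry.complete(List.of("000007.sst", "000008.sst")));
        // prints []
        // Checkpoint 2: compaction merged 7 and 8 into 12, and a flush produced 13.
        System.out.println(registry.complete(List.of("000012.sst", "000013.sst")));
        // prints [000007.sst, 000008.sst]
    }
}
```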

The next thing you’ll likely want to understand is how to monitor the size of these incremental checkpoints and how they affect your checkpoint storage costs.
