Flink state is surprisingly resilient to version upgrades, but only if you follow a specific upgrade path and understand how its internal serialization and snapshotting mechanisms work.

Let’s see Flink state in action. Imagine a simple Flink job that counts words from a Kafka stream and stores the counts in a ValueState.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class WordCountStateful {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Configure Kafka consumer (replace with your Kafka details)
        // ...

        DataStream<String> textStream = env.fromElements(
                "flink is awesome",
                "state compatibility is key",
                "upgrade with care",
                "flink is great"
        );

        DataStream<Tuple2<String, Long>> wordCounts = textStream
                .flatMap(new Tokenizer())
                .keyBy(word -> word) // Key by the word itself; keyBy(0) only works on tuple streams
                .process(new CountWithState());

        wordCounts.print();

        env.execute("Stateful Word Count");
    }

    public static class Tokenizer implements FlatMapFunction<String, String> {
        @Override
        public void flatMap(String value, Collector<String> out) {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (!token.isEmpty()) {
                    out.collect(token);
                }
            }
        }
    }

    public static class CountWithState extends ProcessFunction<String, Tuple2<String, Long>> {
        private transient ValueState<Long> countState;

        @Override
        public void open(Configuration parameters) {
            ValueStateDescriptor<Long> descriptor =
                    new ValueStateDescriptor<>("word-count", Types.LONG);
            countState = getRuntimeContext().getState(descriptor);
        }

        @Override
        public void processElement(String word, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
            Long currentCount = countState.value();
            if (currentCount == null) {
                currentCount = 0L;
            }
            currentCount++;
            countState.update(currentCount);
            out.collect(Tuple2.of(word, currentCount));
        }
    }
}

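When this job runs, the function registers a ValueStateDescriptor named "word-count" of type Long. For each unique word (e.g., "flink"), Flink maintains a separate ValueState value holding its count. Once checkpointing is enabled (for example with env.enableCheckpointing(...), which the example above omits), Flink snapshots this state periodically; a savepoint is the same kind of snapshot, triggered manually. During an upgrade, Flink restores the state from such a snapshot into the new job version.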

The core problem in stateful upgrades is ensuring that the new Flink job can correctly read and interpret the state data saved by the old Flink job. This involves understanding how Flink serializes your state objects and how it manages state backends (like RocksDB or the filesystem).

The golden rule for Flink state compatibility is: never change the TypeInformation of your state if you want to stay compatible across upgrades without explicit migration. If you have ValueState<Integer> and change it to ValueState<Long> under the same state name, restoration will fail: the savepoint holds data written by the Integer serializer, and the new Long serializer is not compatible with it. This applies to primitives and complex types alike.
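To make the Integer versus Long mismatch concrete, here is a minimal plain-JDK sketch. It is an analogy, not Flink's serializer code: the class and method names are made up for illustration, and it only mirrors the fact that a fixed-width int encoding occupies 4 bytes while a long needs 8. In a real job Flink normally rejects the change earlier, when it compares the new serializer with the one recorded in the savepoint.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;

// Plain-JDK analogy (hypothetical names): bytes written for an int cannot be read back as a long.
class ByteLayoutDemo {

    // Encodes the value as a fixed-width int: 4 bytes, big-endian.
    static byte[] writeAsInt(int value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            new DataOutputStream(bytes).writeInt(value);
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the data back with the matching int decoder.
    static int readAsInt(byte[] data) {
        try {
            return new DataInputStream(new ByteArrayInputStream(data)).readInt();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Tries to decode 8 bytes as a long; returns null when the data is too short.
    static Long tryReadAsLong(byte[] data) {
        try {
            return new DataInputStream(new ByteArrayInputStream(data)).readLong();
        } catch (EOFException e) {
            return null; // fewer than 8 bytes were stored
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] stored = writeAsInt(42);
        System.out.println("bytes written: " + stored.length);
        System.out.println("read as int:   " + readAsInt(stored));
        System.out.println("read as long:  " + tryReadAsLong(stored));
    }
}
```

The same bytes round-trip cleanly through the int decoder and cannot be decoded as a long at all, which is why the old data has to be read with the old type and converted explicitly.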

The primary mechanism Flink uses for state compatibility is its state snapshotting and restoration process. When you upgrade, you typically stop the old job with a savepoint, then submit the new job and point it at that savepoint (for example with flink run -s <savepointPath>, or the execution.savepoint.path option), and Flink attempts to restore the state. Restoration succeeds if the operators in the new job can be matched to those in the savepoint (which is why stable operator IDs set with uid() matter), the state descriptors (names and types) match those of the old job, and the underlying serialization formats are compatible.

Here’s the critical part about upgrading: the order matters. First, take a savepoint from the job while it is still running on the old Flink version, and stop the job. Then upgrade the Flink runtime on your cluster. Finally, submit your job, rebuilt against the new Flink version, and restore it from that savepoint. This ensures that the savepoint is created by the old version and read by the new version, allowing Flink’s compatibility layer to handle any internal format changes.

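The most common pitfall is changing the TypeInformation of your state. For instance, if you have a MapState<String, MyObject> and decide to change MyObject to MyObjectV2 (even if MyObjectV2 has a constructor that accepts MyObject’s fields), Flink treats it as a different type: the serializer recorded in the savepoint was created for MyObject, nothing tells Flink how to turn those bytes into MyObjectV2, and the restore is likely to fail.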

The key to successful upgrades is meticulous state descriptor management. You can have multiple state descriptors with different names. If you need to change the type of a state, the recommended approach is to introduce a new state descriptor with a new name, keep the old descriptor registered so the restored data stays readable, migrate the data from the old state to the new state within your Flink job, and then eventually remove the old state descriptor. Keyed state can only be read while an element or timer for a key is being processed, so the copy has to happen per key (for example in processElement()), not in open(), where no key is set.

For example, if you have ValueState<Integer> oldCounter and want to change it to ValueState<Long> newCounter:

  1. Register both the old and the new state in open():

    private transient ValueState<Integer> oldCounterState;
    private transient ValueState<Long> newCounterState;

    @Override
    public void open(Configuration parameters) throws Exception {
        // ... other initializations

        // Same name and type as in the savepoint, so the restored data stays readable
        ValueStateDescriptor<Integer> oldCounterDescriptor =
            new ValueStateDescriptor<>("word-count", Types.INT); // Assuming old state was INT
        oldCounterState = getRuntimeContext().getState(oldCounterDescriptor);

        ValueStateDescriptor<Long> newCounterDescriptor =
            new ValueStateDescriptor<>("new-word-count", Types.LONG);
        newCounterState = getRuntimeContext().getState(newCounterDescriptor);
    }

  2. Migrate lazily in processElement(), where a key is in scope:

    @Override
    public void processElement(String word, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
        Integer oldCounterValue = oldCounterState.value();
        if (oldCounterValue != null) {
            newCounterState.update(oldCounterValue.longValue());
            oldCounterState.clear(); // Required: otherwise every element would reset the new count
        }
        // ... continue with newCounterState only
    }

  3. Use newCounterState going forward.

Because the migration runs lazily, a key is migrated only when its next element arrives; keys that never receive another element keep their value in the old state. Keep the old descriptor registered until that is acceptable for your job.
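The per-key behaviour of such a migration can be modelled outside Flink. The following is a minimal sketch in plain Java, assuming two in-memory maps as stand-ins for the old and new keyed state; the class and method names are hypothetical and nothing here is Flink API. It shows that a key moves to the new state only when an element for it arrives.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of per-key migration; the maps stand in for Flink keyed state.
class LazyMigrationModel {
    final Map<String, Integer> oldCounts = new HashMap<>(); // models "word-count" (Integer)
    final Map<String, Long> newCounts = new HashMap<>();    // models "new-word-count" (Long)

    // Called once per incoming word, like processing one element for that key.
    long increment(String word) {
        Integer old = oldCounts.remove(word); // read and clear the old entry in one step
        if (old != null) {
            newCounts.put(word, old.longValue()); // carry the old count over
        }
        return newCounts.merge(word, 1L, Long::sum); // then count the new element
    }

    public static void main(String[] args) {
        LazyMigrationModel model = new LazyMigrationModel();
        model.oldCounts.put("flink", 2);
        model.oldCounts.put("state", 1);

        System.out.println(model.increment("flink")); // prints 3
        System.out.println(model.oldCounts);          // prints {state=1}
    }
}
```

The entry for "state" stays in the old map because no element for that key was processed, which is the situation to keep in mind before dropping the old descriptor.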

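If you need schema evolution on your state objects (e.g., adding new fields to a POJO), rely on Flink’s built-in state schema evolution, which supports POJO and Avro types and is not specific to RocksDB. For that to work, the class must be one that Flink recognizes as a POJO: a public class with a public no-argument constructor whose fields are public or have getters and setters. Types that fall back to Kryo serialization cannot be evolved this way. Fields can be added or removed, but the class name and the declared types of existing fields must stay the same. However, the safest bet is to avoid changing the fundamental types.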
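As an illustration of a state class that is shaped for evolution, here is a minimal sketch in plain Java. WordStats and its fields are hypothetical, and looksLikeFlinkPojo is a made-up helper that only approximates Flink's POJO rules with reflection (it ignores details such as "is" getters for booleans); it is not Flink's own type extraction.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

class PojoRulesDemo {

    // Hypothetical state type. Version 1 had `word` and `count`;
    // `lastSeenMillis` is the field added in version 2.
    public static class WordStats {
        public String word;
        public long count;
        public long lastSeenMillis; // new field: takes the Java default (0) for old entries

        public WordStats() {} // public no-argument constructor
    }

    // Counterexample: no no-argument constructor and a private final field.
    public static class NotAPojo {
        private final long count;

        public NotAPojo(long count) {
            this.count = count;
        }
    }

    // Rough approximation of the POJO rules, for illustration only.
    static boolean looksLikeFlinkPojo(Class<?> type) {
        if (!Modifier.isPublic(type.getModifiers())) {
            return false;
        }
        try {
            if (!Modifier.isPublic(type.getDeclaredConstructor().getModifiers())) {
                return false;
            }
        } catch (NoSuchMethodException e) {
            return false; // no no-argument constructor
        }
        for (Field field : type.getDeclaredFields()) {
            int mods = field.getModifiers();
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || field.isSynthetic()) {
                continue;
            }
            boolean publicField = Modifier.isPublic(mods) && !Modifier.isFinal(mods);
            if (!publicField && !hasGetterAndSetter(type, field)) {
                return false;
            }
        }
        return true;
    }

    // Looks for public getX() and setX(T) methods matching the field.
    static boolean hasGetterAndSetter(Class<?> type, Field field) {
        String name = field.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        try {
            type.getMethod("get" + suffix);
            type.getMethod("set" + suffix, field.getType());
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(looksLikeFlinkPojo(WordStats.class));
        System.out.println(looksLikeFlinkPojo(NotAPojo.class));
    }
}
```

Keeping the class name and the types of word and count unchanged while only adding lastSeenMillis is the kind of change the built-in evolution is meant to handle; renaming the class or changing count to another type is not.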

The mistake to watch for is a direct type change like ValueState<Integer> to ValueState<Long> under the same state name, without a migration path. Integer and Long state is not handled by Kryo; Flink uses dedicated serializers for these types, and on restore it checks the new serializer against the one recorded in the savepoint. When the two are incompatible, the job fails during state restoration or when the state is first registered, typically with a StateMigrationException.

When upgrading, check that the new Flink version can read the savepoint’s format. A savepoint created by Flink 1.15 can generally be restored by Flink 1.16, but a savepoint created by 1.16 cannot be restored by 1.15. Savepoints only move forward: take them on the older version and restore them on the newer one, and consult the compatibility table in the Flink upgrade documentation before skipping several versions.

The next hurdle you’ll likely encounter after successfully upgrading state is managing the lifecycle of your savepoints, specifically ensuring you don’t accumulate too many and that they are stored reliably.
