Migrating Flink jobs across versions with savepoints depends more on understanding how state is represented internally than on the job code itself.

Let’s see this in action. Imagine you have a Flink job, MyStreamingJob, running on Flink 1.15. It’s processing Kafka messages, performing a windowed aggregation, and writing to Elasticsearch.

// Flink 1.15 Job Code (simplified)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // Checkpoint every 5 seconds

DataStream<String> kafkaStream = env.addSource(new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), kafkaProps));

DataStream<Tuple2<String, Long>> aggregatedStream = kafkaStream
    .map(new MyMapper())
    .keyBy(0)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .aggregate(new MyAggregator())
    .map(new MyResultMapper());

aggregatedStream.addSink(new ElasticsearchSink<>(esConfig));

JobGraph jobGraph = env.getStreamGraph().getJobGraph();
// ... submit jobGraph ...

Now, you want to upgrade to Flink 1.18. The primary mechanism for this is savepoints. A savepoint is a consistent, point-in-time snapshot of your job’s state, covering keyed and operator state as well as source positions such as Kafka offsets. You take a savepoint on the old version, stop the job, and restore from it on the new version.

Taking a Savepoint (Flink 1.15):

You’d typically trigger this via the Flink CLI or the REST API.

# Using Flink CLI (the target directory is a positional argument)
./flink savepoint <job-id> file:///path/to/savepoints/my-job-1.15

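This creates a subdirectory such as savepoint-xxxxx-yyyyyy under /path/to/savepoints/my-job-1.15 containing a _metadata file and the state files. The CLI prints the full path, which is what you pass when restoring.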
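The REST route mentioned above can be sketched in Java as follows. This is a minimal sketch rather than an official client: the class and method names are hypothetical, and it assumes the JobManager REST endpoint is reachable at an address such as http://localhost:8081 (substitute your own). It posts to /jobs/<job-id>/savepoints with a target-directory field, which is how Flink’s REST API triggers a savepoint.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical helper: triggers a savepoint through the JobManager REST API. */
final class SavepointTrigger {

    /** JSON body for POST /jobs/{jobId}/savepoints; the job keeps running. */
    static String requestBody(String targetDirectory) {
        // Assumes the directory contains no characters that need JSON escaping.
        return "{\"target-directory\": \"" + targetDirectory + "\", \"cancel-job\": false}";
    }

    /** Builds the trigger request without sending it. */
    static HttpRequest buildRequest(String restBase, String jobId, String targetDirectory) {
        return HttpRequest.newBuilder()
                .uri(URI.create(restBase + "/jobs/" + jobId + "/savepoints"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(requestBody(targetDirectory)))
                .build();
    }

    /** Sends the request and returns the raw JSON response body. */
    static String trigger(String restBase, String jobId, String targetDirectory) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<String> response = client.send(
                buildRequest(restBase, jobId, targetDirectory),
                HttpResponse.BodyHandlers.ofString());
        return response.body();
    }
}
```

The trigger call is asynchronous: the response carries a request-id, and polling GET /jobs/<job-id>/savepoints/<request-id> until the status is COMPLETED yields the final savepoint location.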

Restoring from a Savepoint (Flink 1.18):

When submitting your Flink 1.18 job, you tell it to restore from the existing savepoint.

# Using Flink CLI (the yarn-application target requires the run-application action)
./flink run-application \
  --target yarn-application \
  -s file:///path/to/savepoints/my-job-1.15/savepoint-xxxxx-yyyyyy \
  -d \
  my-streaming-job-1.18.jar

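The -s flag specifies the savepoint path, and -d submits the job in detached mode. A separate flag, -n (or --allowNonRestoredState), tells Flink to skip savepoint state that it cannot map to any operator in the new job; add it only when you are willing to lose that state.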

The Core Problem: State Schema Evolution

The most common reason for migration failure isn’t that your code is incompatible (though that can happen), but that the internal representation of the state has changed. Flink serializes operator state (like the state within your MyAggregator or Kafka offsets in FlinkKafkaConsumer) into files. If the format of this serialized state changes between Flink versions, the new Flink runtime won’t know how to deserialize the old state.
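To make the failure mode concrete, here is a toy illustration in plain Java. It does not use Flink’s serializers; the layouts, class name, and method names are all hypothetical. It shows a reader for a newer layout running off the end of bytes written in an older layout, and one common mitigation: prefixing each record with a layout version so the reader can branch.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;

/** Toy model of a state layout changing between two releases. Not Flink code. */
final class StateFormatDemo {

    /** Old layout: a single long holding the running count. */
    static byte[] writeV1(long count) {
        return ByteBuffer.allocate(Long.BYTES).putLong(count).array();
    }

    /** New layout: count followed by sum. On old bytes the second read underflows. */
    static long[] readV2(byte[] data) {
        ByteBuffer in = ByteBuffer.wrap(data);
        long count = in.getLong();
        long sum = in.getLong(); // throws BufferUnderflowException on V1 bytes
        return new long[] {count, sum};
    }

    /** Returns true when the new reader fails on bytes written by the old writer. */
    static boolean newReaderFailsOnOldBytes(long count) {
        try {
            readV2(writeV1(count));
            return false;
        } catch (BufferUnderflowException e) {
            return true;
        }
    }

    /** Mitigation: write a version tag first; only version 2 and later carry the sum. */
    static byte[] writeTagged(int version, long count, long sum) {
        ByteBuffer out = ByteBuffer.allocate(Integer.BYTES + 2 * Long.BYTES);
        out.putInt(version).putLong(count);
        if (version >= 2) {
            out.putLong(sum);
        }
        byte[] result = new byte[out.position()];
        out.flip();
        out.get(result);
        return result;
    }

    /** Reads either layout; records written before version 2 get a default sum of 0. */
    static long[] readTagged(byte[] data) {
        ByteBuffer in = ByteBuffer.wrap(data);
        int version = in.getInt();
        long count = in.getLong();
        long sum = version >= 2 ? in.getLong() : 0L;
        return new long[] {count, sum};
    }
}
```

The untagged reader has no way to tell which layout it is looking at, which is why the failure surfaces as a low-level read error rather than a clear message. The tagged variant trades a few bytes per record for the ability to read old data.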

Common Causes of Migration Failure:

  1. Internal State Serialization Format Changes: This is the most frequent culprit. Flink’s internal serializers for state can evolve. For example, how a MapState or ListState is stored might change.

    • Diagnosis: Look for state restoration failures in the JobManager log, for example a StateMigrationException or a deserialization error reporting incompatible serializers or types. The Flink UI will often show the job failing during startup with a state restoration error.
    • Fix: First check whether the state can be migrated by keeping the state’s type and serializer compatible with what the old job wrote. If it cannot, the fallback is to discard it. Note that --allowNonRestoredState (or -n) on its own does not skip state that fails to deserialize: it only lets Flink drop savepoint state that maps to no operator in the new job. To discard the state of an operator that still exists, such as MyAggregator, give that operator a new uid() so its old state becomes unmapped, then restore with -n. The job will start, but the aggregator begins with empty state, resetting its progress for that operator.
    • Why it works: It’s a controlled way to acknowledge that some state is unrecoverable and should be re-initialized. This is acceptable for state where losing it is a minor inconvenience (e.g., a cache), but problematic for critical state like Kafka offsets or window accumulators.
  2. Operator ID Changes: Flink uses operator IDs to uniquely identify state. If you refactor your job and change the order of operators or add/remove operators without explicitly assigning stable IDs, the IDs might change, leading Flink to believe the state belongs to a new operator.

    • Diagnosis: The restore fails with a message along the lines of "Cannot map checkpoint/savepoint state for operator X to the new program, because the operator is not available in the new program", even though the operator still exists in your code.
    • Fix: Use uid() to assign stable, unique IDs to every stateful operator in both your old and new Flink jobs. uid() is called on the operator as it is created, for example env.addSource(...).uid("kafka-source") and .aggregate(new MyAggregator()).uid("window-aggregation"). Ensure these UIDs are identical in the Flink 1.15 and Flink 1.18 codebases wherever state is managed. If the 1.15 job was deployed without UIDs, its state is stored under auto-generated IDs derived from the job graph, so leave the topology untouched during the upgrade.
    • Why it works: uid() explicitly ties a Flink operator to a specific state identifier. When restoring, Flink looks for state associated with that UID. If the UIDs match, it correctly maps the savepoint state to the new operator instance.
  3. Dependency Version Mismatches: Your job might depend on external libraries (like Kafka client, Avro, Protobuf). If the versions of these libraries used in the Flink 1.15 job’s environment differ significantly from those in the Flink 1.18 environment, the serialization/deserialization of data types managed by these libraries can break.

    • Diagnosis: ClassNotFoundException or NoClassDefFoundError during state restoration, or errors related to specific data formats (e.g., Avro schema evolution issues).
    • Fix: Build the new job against Flink 1.18 so that Flink’s own modules (flink-connector-kafka, flink-avro, flink-parquet, etc.) match the 1.18 runtime, and keep third-party libraries whose classes or schemas end up inside state (Avro, Protobuf, your own POJOs) serialization-compatible with what the Flink 1.15 job wrote. Make sure every class that was serialized into the savepoint is still on the new job’s classpath under the same fully qualified name.
    • Why it works: State often contains serialized objects from these libraries. If the library versions differ, the bytes representing the object might be interpreted differently or not at all by the new runtime, leading to deserialization failures.
  4. Checkpointing Configuration Differences: While less common for direct savepoint restoration failure, significant changes in checkpointing intervals or modes between versions can subtly affect savepoint consistency or lead to issues immediately after restore if not aligned.

    • Diagnosis: Jobs might appear to restore but then fail shortly after due to inconsistent state or missed checkpoints. Or, the savepoint itself might be taken with a different configuration than expected.
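    • Fix: Ensure checkpointing is enabled and configured similarly (e.g., interval, timeout, storage location) in both Flink versions. For Flink 1.18, env.enableCheckpointing(5000); should be present, and the state backend should be compatible or explicitly reconfigured if necessary. Note that RocksDBStateBackend and FsStateBackend were deprecated in Flink 1.13 in favor of EmbeddedRocksDBStateBackend and HashMapStateBackend, so the upgrade is a good moment to switch to the newer classes.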
    • Why it works: Checkpoints are the foundation upon which savepoints are built. Inconsistent checkpointing configurations can lead to a savepoint that doesn’t fully represent a consistent job state, or the new job might not be able to continue the checkpointing process effectively.
  5. Custom Serializers: If you’ve implemented custom TypeSerializers for your state, any changes in their logic or the underlying data structures they serialize will break savepoint restoration.

    • Diagnosis: Errors specific to your custom serializer, such as ClassCastException or deserialization failures within your custom logic.
    • Fix: Update your custom serializer to be compatible with the Flink version’s serialization framework. If the state structure it manages has changed, implement a migration path through the serializer’s TypeSerializerSnapshot so that old bytes can still be read. If the custom state can be reset instead, discard it by assigning the operator a new uid() and restoring with --allowNonRestoredState.
    • Why it works: Custom serializers are responsible for the byte-level representation of your state. If the serializer’s internal logic or the data it operates on changes, the old serialized bytes will become unreadable.
  6. Operator Refactoring/Replacement: If you replaced an operator with a different one (even if it seems functionally equivalent), Flink won’t automatically map the state.

    • Diagnosis: Similar to operator ID changes, Flink won’t find state for the new operator, or it might find state for an operator that no longer exists.
    • Fix: The most robust solution is to use uid() to preserve the operator ID across refactoring if the state is meant to be preserved and the new operator declares the same state. If you are genuinely replacing an operator and want to discard its state, use --allowNonRestoredState. If you need to migrate state from an old operator type to a new one, you’ll need custom migration logic, typically an offline rewrite of the savepoint with Flink’s State Processor API or a two-step migration process.
    • Why it works: Flink’s state management is tied to operator UIDs and their associated state descriptors. Replacing an operator without preserving the UID means Flink sees it as a new operator, and its state is lost unless explicitly migrated or discarded.
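The UID matching rule that several of these causes share can be sketched as a toy model in plain Java. This is not Flink’s implementation; the class, method, and UID strings are hypothetical, and state is reduced to a string per operator. It captures two behaviors described above: savepoint state that matches no operator fails the restore unless allowNonRestoredState is set, and operators with no matching state start empty.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model of matching savepoint state to operators by uid. Not Flink code. */
final class UidMatchingDemo {

    /**
     * Returns the state each operator of the new job would start with.
     * Operators without an entry in the savepoint start empty (null here).
     * Savepoint entries that match no operator cause a failure unless
     * allowNonRestoredState is true, in which case they are dropped.
     */
    static Map<String, String> restore(Map<String, String> savepointStateByUid,
                                       Set<String> newJobUids,
                                       boolean allowNonRestoredState) {
        List<String> unmatched = new ArrayList<>();
        for (String uid : savepointStateByUid.keySet()) {
            if (!newJobUids.contains(uid)) {
                unmatched.add(uid);
            }
        }
        if (!unmatched.isEmpty() && !allowNonRestoredState) {
            throw new IllegalStateException(
                    "Cannot map savepoint state for operators " + unmatched);
        }
        Map<String, String> restored = new LinkedHashMap<>();
        for (String uid : newJobUids) {
            restored.put(uid, savepointStateByUid.get(uid)); // null when nothing matches
        }
        return restored;
    }
}
```

In this model, renaming "window-agg" to "window-agg-v2" makes the restore throw. Setting the flag lets it proceed, but the renamed operator starts with no state, which mirrors the trade-off of --allowNonRestoredState.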

Once the restore succeeds, the next problems you’ll typically hit come from API changes and deprecations in Flink 1.18 that your job code has not yet adopted, or from subtle timing differences in event processing.
