The most surprising truth about schema evolution in Flink is that it’s less about the serialization format (Avro or Protobuf) and more about the order in which you apply changes and how Flink’s internal state management handles those changes.
Let’s see this in action. Imagine a simple Flink streaming job that reads user events, enriches them, and writes them to a sink.
```java
import java.io.IOException;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.formats.avro.AvroDeserializationSchema;
import org.apache.flink.formats.avro.AvroSerializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Assumes two classes generated at build time in the same package:
//   UserAvro     - generated by the Avro compiler from the User schema (extends SpecificRecordBase)
//   UserProtobuf - generated by protoc from the User .proto message
public class SchemaEvolutionExample {

    static final String BOOTSTRAP_SERVERS = "localhost:9092";
    static final String GROUP_ID = "schema-evolution-group";

    // Flink has no built-in DataStream schema for Protobuf classes, so wrap the generated parser.
    public static class UserProtobufDeserializer extends AbstractDeserializationSchema<UserProtobuf> {
        @Override
        public UserProtobuf deserialize(byte[] message) throws IOException {
            return UserProtobuf.parseFrom(message);
        }
    }

    static <T> KafkaSource<T> kafkaSource(String topic, DeserializationSchema<T> deserializer) {
        return KafkaSource.<T>builder()
                .setBootstrapServers(BOOTSTRAP_SERVERS)
                .setTopics(topic)
                .setGroupId(GROUP_ID)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(deserializer)
                .build();
    }

    static <T> KafkaSink<T> kafkaSink(String topic, SerializationSchema<T> serializer) {
        return KafkaSink.<T>builder()
                .setBootstrapServers(BOOTSTRAP_SERVERS)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(topic)
                        .setValueSerializationSchema(serializer)
                        .build())
                // EXACTLY_ONCE uses Kafka transactions: it needs a transactional ID prefix,
                // checkpointing, and a transaction timeout the broker accepts (15 min here).
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix(topic + "-tx")
                .setProperty("transaction.timeout.ms", "900000")
                .build();
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // Kafka transactions are committed on checkpoints

        // --- Scenario 1: Starting with Avro ---
        env.fromSource(
                        kafkaSource("user-events-avro", AvroDeserializationSchema.forSpecific(UserAvro.class)),
                        WatermarkStrategy.noWatermarks(),
                        "user-events-avro")
                .map(user -> user) // placeholder for enrichment logic
                .returns(UserAvro.class)
                .sinkTo(kafkaSink("enriched-events-avro", AvroSerializationSchema.forSpecific(UserAvro.class)));

        // --- Scenario 2: Evolving to Protobuf (conceptually) ---
        // In a real migration you would stop the old job with a savepoint, deploy a new one,
        // and have a compatibility strategy for the Kafka topics. Both pipelines share one
        // job here only to show the transition side by side.
        SerializationSchema<UserProtobuf> protobufSerializer = UserProtobuf::toByteArray;
        env.fromSource(
                        kafkaSource("user-events-protobuf", new UserProtobufDeserializer()),
                        WatermarkStrategy.noWatermarks(),
                        "user-events-protobuf")
                .map(user -> user) // placeholder for enrichment logic
                .returns(UserProtobuf.class) // not a Flink POJO, so Flink serializes it with Kryo
                .sinkTo(kafkaSink("enriched-events-protobuf", protobufSerializer));

        env.execute("Schema Evolution Transition Job");
    }
}
```
The core problem Flink aims to solve with schema evolution is maintaining state across job restarts and upgrades when the data schema changes. Flink writes state with the `TypeSerializer` derived from each state’s type (for an Avro-generated class, Flink’s Avro serializer), not with the Kafka (de)serialization schemas shown above, but both usually describe the same record type. If the new job’s serializer can’t read the bytes the old job wrote, Flink can’t restore its own state and the job fails.
How Flink Manages State and Schema Evolution
Flink’s state backends (such as `HashMapStateBackend` and `EmbeddedRocksDBStateBackend`, formerly `RocksDBStateBackend`) store and retrieve operator state. When you upgrade a Flink job by stopping it with a savepoint and restarting the new version from that savepoint, especially if the upgrade changes data types or serialization formats, Flink must load the old state using the new job code. This is where schema evolution becomes critical.
The key is backward and forward compatibility.
- Backward Compatibility: The new job code must be able to read data written by the old job code.
- Forward Compatibility: The old job code must be able to read data written by the new job code.
For schema evolution you typically want the new schema to be backward compatible, so the new job can read old data and restored state, and, during a transition period, forward compatible, so jobs still running the old schema can read what the new job writes.
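Avro can check both directions directly. The sketch below uses Avro’s own `SchemaCompatibility` API (from the `org.apache.avro:avro` library, not from Flink) on a hypothetical v1 and v2 of a `User` schema; "backward" here means the v2 reader can read v1 data, and "forward" means the v1 reader can read v2 data.

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;

public class CompatibilityCheck {
    // Hypothetical v1 of the User schema.
    static final String V1 = "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
            + "{\"name\":\"name\",\"type\":\"string\"},"
            + "{\"name\":\"age\",\"type\":\"int\"}]}";

    // Hypothetical v2: adds a nullable city with a default.
    static final String V2 = "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
            + "{\"name\":\"name\",\"type\":\"string\"},"
            + "{\"name\":\"age\",\"type\":\"int\"},"
            + "{\"name\":\"city\",\"type\":[\"null\",\"string\"],\"default\":null}]}";

    /** True if data written with {@code writer} can be read with {@code reader}. */
    static boolean canRead(Schema reader, Schema writer) {
        return SchemaCompatibility.checkReaderWriterCompatibility(reader, writer).getType()
                == SchemaCompatibilityType.COMPATIBLE;
    }

    static Schema parse(String json) {
        // A fresh parser per schema: one Parser instance refuses to redefine the name "User".
        return new Schema.Parser().parse(json);
    }

    public static void main(String[] args) {
        Schema v1 = parse(V1);
        Schema v2 = parse(V2);
        System.out.println("backward (v2 reads v1 data): " + canRead(v2, v1));
        System.out.println("forward  (v1 reads v2 data): " + canRead(v1, v2));
    }
}
```

Both lines print `true`. Changing `age` from `int` to `string` instead would make the check fail, because Avro has no promotion from `int` to `string`.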
Avro vs. Protobuf for Schema Evolution
Both Avro and Protobuf are designed with schema evolution in mind.
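- Avro: Uses a schema (`.avsc` files) to define data. It has strong support for schema evolution (adding, removing, and reordering fields) as long as Avro’s schema resolution rules are followed: fields are matched by name, and added or removed fields need default values. Flink’s `AvroDeserializationSchema` and `AvroSerializationSchema` use these schemas for Kafka records, and Flink’s Avro type serializer applies the same rules to Avro-typed state.
- Protobuf: Uses `.proto` files. Fields are identified by unique numbers (tags) rather than names. This makes it inherently backward and forward compatible for many common evolution scenarios (adding optional fields, reordering fields as long as tags are preserved). Flink ships a `protobuf` format for the Table API and SQL; in the DataStream API, a small `DeserializationSchema` that calls the generated class’s `parseFrom` inherits the same tag-based compatibility.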
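To see Avro’s resolution rules at work outside Flink, this sketch writes a record with a hypothetical v1 `User` schema and decodes it with a v2 reader schema that reorders the fields and adds a nullable `city`. It uses only the Avro library’s generic API; the key detail is that `GenericDatumReader` is given both the writer’s and the reader’s schema.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class AvroResolutionDemo {
    // Hypothetical writer schema (v1).
    static final Schema WRITER_V1 = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
            + "{\"name\":\"name\",\"type\":\"string\"},"
            + "{\"name\":\"age\",\"type\":\"int\"}]}");

    // Hypothetical reader schema (v2): fields reordered, nullable city added with a default.
    static final Schema READER_V2 = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
            + "{\"name\":\"age\",\"type\":\"int\"},"
            + "{\"name\":\"name\",\"type\":\"string\"},"
            + "{\"name\":\"city\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

    static byte[] writeV1(String name, int age) throws IOException {
        GenericRecord user = new GenericData.Record(WRITER_V1);
        user.put("name", name);
        user.put("age", age);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(WRITER_V1).write(user, encoder);
        encoder.flush();
        return out.toByteArray();
    }

    static GenericRecord readAsV2(byte[] bytes) throws IOException {
        // Binary Avro carries no schema, so the reader must be told which schema wrote the bytes.
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(WRITER_V1, READER_V2);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
        return reader.read(null, decoder);
    }

    public static void main(String[] args) throws IOException {
        GenericRecord user = readAsV2(writeV1("alice", 30));
        System.out.println(user.get("name") + ", " + user.get("age") + ", city=" + user.get("city"));
    }
}
```

Running `main` prints `alice, 30, city=null`: fields are matched by name despite the new order, and the missing `city` takes its default.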
Common Pitfalls and How to Handle Them
Incompatible Schema Changes
- Diagnosis: A newly deployed job version fails with a deserialization error, often mentioning a missing field or an unexpected type.
- Cause: Renaming a field, changing a primitive type (e.g., `int` to `string`), or removing a required field without proper handling.
- Fix (Avro): Keep field names and types compatible between the writer’s and the reader’s schema. To rename a field, add the old name to the new field’s `aliases`. To change a type, use one of Avro’s allowed promotions (e.g., `int` to `long`). Only remove fields that have a default value, and give every new field a default (typically by making it nullable).
- Example Avro schema change (adding a nullable `city` field):
```json
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"},
    {"name": "city", "type": ["null", "string"], "default": null}
  ]
}
```
- Why it works: Avro’s schema resolution lets the reader’s schema differ from the writer’s schema, as long as the two are compatible under Avro’s rules. The `"default": null` lets the new schema read older data that has no `city`, and readers still on the old schema can read newer data because Avro skips writer fields that the reader’s schema doesn’t declare.
- Fix (Protobuf): Never reuse field tags. Add new fields as `optional` or `repeated`, never `required`. If you must change a field’s type, make sure the old and new types are wire-compatible and consider the implications for existing data. Prefer marking an unused field `[deprecated = true]` over removing it; if you do remove one, list its tag number and name in a `reserved` statement so neither can be reused.
- Example Protobuf `.proto` change (proto2 syntax, adding `city` with tag 3):
```protobuf
syntax = "proto2";

message User {
  required string name = 1;
  required int32 age = 2;
  optional string city = 3; // Added field
}
```
- Why it works: Protobuf identifies fields by tag. The new `city` field with tag `3` doesn’t interfere with existing fields, and older clients that don’t know tag 3 treat it as an unknown field and skip it.
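The tag-based skipping can be seen without any generated classes. This sketch uses protobuf-java’s low-level `CodedOutputStream` and `CodedInputStream` to encode a v2 `User` (tags 1 and 2 plus the new tag 3) and decode it with a hand-written v1 reader that only knows tags 1 and 2. The reader is hypothetical and for illustration only; generated parsers skip unknown tags in the same way, and current versions also keep their bytes so they survive re-serialization.

```java
import com.google.protobuf.CodedInputStream;
import com.google.protobuf.CodedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class ProtobufUnknownFieldDemo {

    /** Encodes a v2 User by hand: name = 1, age = 2, and the new city = 3. */
    static byte[] encodeV2(String name, int age, String city) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        CodedOutputStream out = CodedOutputStream.newInstance(bytes);
        out.writeString(1, name);
        out.writeInt32(2, age);
        out.writeString(3, city);
        out.flush();
        return bytes.toByteArray();
    }

    /** A v1 reader that only knows tags 1 and 2 and skips everything else. */
    static String decodeV1(byte[] data) throws IOException {
        CodedInputStream in = CodedInputStream.newInstance(data);
        String name = null;
        int age = 0;
        int skipped = 0;
        while (true) {
            int tag = in.readTag();
            if (tag == 0) {
                break; // end of message
            }
            // The field number is the tag without its low 3 bits (the wire type).
            switch (tag >>> 3) {
                case 1: name = in.readString(); break;
                case 2: age = in.readInt32(); break;
                default:
                    in.skipField(tag); // unknown to v1, e.g. city = 3
                    skipped++;
            }
        }
        return name + "," + age + ",skipped=" + skipped;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(decodeV1(encodeV2("alice", 30, "Berlin")));
    }
}
```

Running `main` prints `alice,30,skipped=1`: the v1 reader recovers the fields it knows and steps over tag 3.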
State Serialization Mismatch
- Diagnosis: The job fails on startup while restoring from a checkpoint or savepoint, typically with a `StateMigrationException` or another state deserialization error.
- Cause: Your job’s state (e.g., keyed state, operator state) was serialized with an older schema, but the job code expects a newer one. This often happens when you change the type of a field that is part of your state value; changing the type of the state key is not supported at all.
- Fix (General): Flink’s state is tied to the `TypeInformation`, and therefore the `TypeSerializer`, of each state. If that type changes compatibly, Flink can migrate the state, but its built-in state schema evolution covers only POJO and Avro types. Switching to an unrelated type (e.g., from `UserAvro` to `UserProtobuf`) or making an incompatible change cannot be migrated automatically, so you need to discard or rewrite the state.
- Manual state reset: Start the new job without restoring from the old savepoint or checkpoint, or use Flink’s State Processor API to read the old savepoint, convert the state, and write a new savepoint for the new job to start from.
- Why it works: Without the old state, the new job starts with a clean slate and never has to deserialize the old bytes. This is a last resort, as it means losing all previous state unless you carried it over with the State Processor API.
- Fix (built-in state schema evolution): For POJO and Avro state types, Flink migrates state automatically on restore when the change is compatible: adding or removing POJO fields, or Avro changes that follow Avro’s resolution rules. Make sure the state’s `TypeInformation` really is a POJO or Avro type; types that fall back to Kryo don’t support schema evolution.
- Why it works: Flink stores a snapshot of the old serializer’s configuration alongside the state. On restore, the new serializer checks its compatibility against that snapshot and, if migration is needed, Flink reads the state with the old serializer and rewrites it with the new one.
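A minimal sketch of keyed state whose type can evolve, assuming a hypothetical `UserProfile` POJO and a hypothetical operator ID `count-per-user`. The state descriptor is given explicit POJO `TypeInformation`, and the operator gets a stable `uid` so its state maps back to it after an upgrade. Under those assumptions, adding a field to `UserProfile` later and restoring from a savepoint would go through Flink’s POJO state migration rather than failing.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class EvolvableStateJob {

    /** Hypothetical state type; a valid Flink POJO (public class, public no-arg constructor, public fields). */
    public static class UserProfile {
        public String name;
        public long eventCount;

        public UserProfile() {}
    }

    /** Counts events per user; the running profile lives in keyed ValueState. */
    public static class CountPerUser extends KeyedProcessFunction<String, String, UserProfile> {
        private transient ValueState<UserProfile> profileState;

        @Override
        public void processElement(String name, Context ctx, Collector<UserProfile> out) throws Exception {
            if (profileState == null) {
                // Obtained lazily so the sketch doesn't depend on a version-specific open(...) signature.
                profileState = getRuntimeContext().getState(
                        new ValueStateDescriptor<>("profile", TypeInformation.of(UserProfile.class)));
            }
            UserProfile profile = profileState.value();
            if (profile == null) {
                profile = new UserProfile();
                profile.name = name;
            }
            profile.eventCount++;
            profileState.update(profile);
            out.collect(profile);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("alice", "bob", "alice")
                .keyBy(user -> user)
                .process(new CountPerUser())
                .uid("count-per-user") // stable operator ID so savepoint state maps back here
                .map(p -> p.name + "=" + p.eventCount)
                .print();
        env.execute("Evolvable state sketch");
    }
}
```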
Kafka Consumer/Producer Configuration
- Diagnosis: After a schema change, records are read or written incorrectly, or the Kafka source or sink fails with deserialization errors.
- Cause: The schema the old job’s producer used to write records differs from what the new job’s consumer expects, or vice versa.
- Fix (Avro): Make sure the consumer and producer use compatible Avro schemas. Avro’s binary encoding doesn’t carry the schema, so Flink’s plain `AvroDeserializationSchema` decodes every record with the one schema you configure and can’t resolve differences on its own. To let writer and reader schemas diverge, use a Schema Registry: Flink’s registry-aware formats (e.g., `ConfluentRegistryAvroDeserializationSchema`) look up each record’s writer schema and resolve it against the reader schema. Also ensure the topic’s subject is registered with the correct schema versions.
- Example: If the producer writes with schema v2 while the consumer reads with v1, this works only if v2 is forward compatible (the v1 reader can resolve v2 data). If the consumer reads with v2 while the producer still writes v1, v2 must be backward compatible (it can read v1 data).
- Why it works: Kafka acts as the durable buffer. Flink’s connectors read from and write to Kafka. Compatibility must hold between the writer’s schema and the reader’s schema for each hop.
- Fix (Protobuf): Similar to Avro, ensure consistent use of Protobuf schemas. Protobuf’s tag-based system offers strong compatibility. The main concern is ensuring the correct `.proto` file is used to generate the Java classes for both serialization and deserialization.
- Why it works: Protobuf’s field tags ensure that data can be parsed even if the reader doesn’t have all the fields defined in the writer’s schema, as long as the tags are consistent.
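A sketch of a registry-aware Avro consumer, assuming the `flink-avro-confluent-registry` and Kafka connector modules are on the classpath, a Confluent-compatible Schema Registry at the hypothetical URL `http://localhost:8081`, and producers that write the registry wire format (a schema ID in front of each record). The job reads with a v2 reader schema while older records may still have been written with v1.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RegistryAwareConsumer {
    // Hypothetical v2 reader schema; writers may still be on v1 (no city field).
    static final String READER_SCHEMA = "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
            + "{\"name\":\"name\",\"type\":\"string\"},"
            + "{\"name\":\"age\",\"type\":\"int\"},"
            + "{\"name\":\"city\",\"type\":[\"null\",\"string\"],\"default\":null}]}";

    // Hypothetical registry endpoint.
    static final String REGISTRY_URL = "http://localhost:8081";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Schema readerSchema = new Schema.Parser().parse(READER_SCHEMA);

        KafkaSource<GenericRecord> source = KafkaSource.<GenericRecord>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("user-events-avro")
                .setGroupId("schema-evolution-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                // Reads the schema ID in each record, fetches that writer schema from the
                // registry, and resolves it against readerSchema (a missing city becomes null).
                .setValueOnlyDeserializer(
                        ConfluentRegistryAvroDeserializationSchema.forGeneric(readerSchema, REGISTRY_URL))
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "user-events-avro")
                .map(user -> user.get("name") + " lives in " + user.get("city"))
                .print();
        env.execute("Registry-aware Avro consumer");
    }
}
```

The design choice is to keep the reader schema in the job and the writer schema in the registry, so producers can move from v1 to v2 without the consumer being redeployed in lockstep.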
Flink’s Internal Serialization
- Diagnosis: Complex Flink jobs with custom serializers or user-defined types fail on restore if their serialization logic doesn’t account for schema evolution.
- Cause: Changes to custom `TypeInformation` or `TypeSerializer` implementations that don’t handle versioning.
- Fix: A custom serializer must implement its own versioning through its `TypeSerializerSnapshot`, which decides on restore whether the new serializer can read the old state as-is, needs a migration, or is incompatible. Often it’s best to stick to Flink’s built-in serializers for data managed by Flink, namely the POJO serializer and Flink’s Avro serializer. Protobuf-generated classes are not Flink POJOs and fall back to Kryo by default, which doesn’t support state schema evolution.
- Why it works: Flink’s core serializers are designed to work with Flink’s state management and checkpointing. Custom solutions introduce complexity and potential points of failure if not carefully designed for evolution.
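A quick way to see which serializer Flink will pick for a type is to inspect its `TypeInformation`. In this sketch `UserProfile` is a hypothetical POJO and `OpaqueUser` is a hypothetical stand-in for a class Flink can’t analyze as a POJO (no public no-arg constructor and a private field without accessors, much like a Protobuf-generated message). The first maps to `PojoTypeInfo`; the second maps to `GenericTypeInfo`, which means Kryo.

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;

public class SerializerCheck {

    /** Valid Flink POJO: public class, public no-arg constructor, public fields. */
    public static class UserProfile {
        public String name;
        public int age;

        public UserProfile() {}
    }

    /** Not a POJO: no no-arg constructor and a private field without getter/setter. */
    public static class OpaqueUser {
        private final String name;

        public OpaqueUser(String name) {
            this.name = name;
        }
    }

    static String describe(Class<?> clazz) {
        TypeInformation<?> info = TypeInformation.of(clazz);
        if (info instanceof PojoTypeInfo) {
            return "POJO";
        }
        if (info instanceof GenericTypeInfo) {
            return "GENERIC (Kryo)";
        }
        return info.getClass().getSimpleName();
    }

    public static void main(String[] args) {
        System.out.println(describe(UserProfile.class));
        System.out.println(describe(OpaqueUser.class));
    }
}
```

Running `main` prints `POJO` and then `GENERIC (Kryo)`. A state type that reports `GENERIC (Kryo)` is a signal to switch to a POJO or Avro type before relying on state schema evolution.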
Order of Deployment
- Diagnosis: During a migration, you might see errors if consumers and producers are upgraded at different times, leading to a period of incompatibility.
- Cause: The producer is updated to a new schema, but the consumer is still running the old code, or vice-versa.
- Fix: Plan your deployment carefully. A common strategy is:
- Deploy a new producer version that writes data compatible with the old consumer’s schema (e.g., add nullable fields).
- Deploy a new consumer version that can read data from the old producer’s schema and the new producer’s schema (e.g., using schema registry for lookup).
- Once both producer and consumer are updated and running stably, deploy a new producer that uses the new schema fully.
- Finally, deploy a new consumer that expects the new schema.
- Why it works: This phased approach ensures that at no point is data written by one version unreadable by the other. It requires careful coordination and often the use of a schema registry.
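One way to enforce a phased rollout is to gate every new schema version before deploying it. This sketch uses Avro’s `SchemaValidatorBuilder` with the mutual-read strategy, which requires the candidate to read, and be readable by, every earlier version, so old and new jobs can safely run side by side. The schema versions are hypothetical; a Schema Registry configured for `FULL_TRANSITIVE` compatibility performs a comparable check on the server.

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.SchemaValidationException;
import org.apache.avro.SchemaValidator;
import org.apache.avro.SchemaValidatorBuilder;

public class RolloutGate {
    private static final String PREFIX = "{\"type\":\"record\",\"name\":\"User\",\"fields\":[";

    // Hypothetical schema history.
    static final String V1 = PREFIX
            + "{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"}]}";
    static final String V2 = PREFIX
            + "{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"},"
            + "{\"name\":\"city\",\"type\":[\"null\",\"string\"],\"default\":null}]}";
    // Candidate that changes age from int to string: not mutually readable with v1/v2.
    static final String V3_BAD = PREFIX
            + "{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"string\"},"
            + "{\"name\":\"city\",\"type\":[\"null\",\"string\"],\"default\":null}]}";

    // Every new version must read, and be readable by, all previous versions.
    private static final SchemaValidator MUTUAL_READ_ALL =
            new SchemaValidatorBuilder().mutualReadStrategy().validateAll();

    static boolean safeToDeploy(String candidateJson, List<String> historyJson) {
        List<Schema> history = new ArrayList<>();
        for (String json : historyJson) {
            history.add(new Schema.Parser().parse(json)); // fresh parser per version
        }
        try {
            MUTUAL_READ_ALL.validate(new Schema.Parser().parse(candidateJson), history);
            return true;
        } catch (SchemaValidationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("v2 after v1: " + safeToDeploy(V2, List.of(V1)));
        System.out.println("v3 after v1, v2: " + safeToDeploy(V3_BAD, List.of(V1, V2)));
    }
}
```

Running `main` prints `true` for v2 and `false` for the bad v3, so only v2 would be allowed into the rollout.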
The one thing most people don’t realize is that Flink’s state backend is deeply intertwined with the data types and their serializers. When you change a data type that is part of your keyed state, Flink doesn’t just update the type definition; it must be able to deserialize the old bytes representing the old type into the new type. This is precisely why schema evolution on the data types themselves is critical, and why Flink’s state migration capabilities are so powerful when compatible changes are made to Avro (or POJO) state types; Protobuf-typed state goes through Kryo by default and gets no such migration. Renames are the classic trap: Flink matches POJO fields by name, so renaming a POJO field is treated as removing one field and adding another, and the old value is lost. For Avro, the rename must be declared with `aliases` in the schema, or Flink won’t know to map the old field to the new one.
The next problem you’ll likely encounter is managing schema versions across multiple Flink jobs that might interact, or dealing with schema evolution in external systems like databases or data lakes that Flink writes to.