The most surprising truth about schema evolution in Flink is that it’s less about the serialization format (Avro or Protobuf) and more about the order in which you apply changes and how Flink’s internal state management handles those changes.

Let’s see this in action. Imagine a simple Flink streaming job that reads user events, enriches them, and writes them to a sink.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.formats.avro.AvroDeserializationSchema;
import org.apache.flink.formats.avro.AvroSerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.IOException;

// UserAvro is assumed to be generated by the Avro compiler (it extends SpecificRecordBase) and
// UserProtobuf by protoc (it extends GeneratedMessageV3), both in the same package as this class.
// Hand-written placeholder classes would not compile: AvroDeserializationSchema.forSpecific
// requires a SpecificRecord, and the Protobuf code below calls parseFrom/toByteArray.
public class SchemaEvolutionExample {

    private static final String BOOTSTRAP = "localhost:9092";

    // The DataStream API has no ready-made Protobuf DeserializationSchema, so wrap the generated
    // parser. AbstractDeserializationSchema infers the produced type from the generic parameter.
    public static class UserProtobufDeserializer extends AbstractDeserializationSchema<UserProtobuf> {
        @Override
        public UserProtobuf deserialize(byte[] message) throws IOException {
            return UserProtobuf.parseFrom(message); // InvalidProtocolBufferException is an IOException
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Exactly-once Kafka sinks commit their transactions on checkpoints.
        env.enableCheckpointing(10_000);

        // --- Scenario 1: Starting with Avro ---
        KafkaSource<UserAvro> avroSource = KafkaSource.<UserAvro>builder()
                .setBootstrapServers(BOOTSTRAP)
                .setTopics("user-events-avro")
                .setGroupId("schema-evolution-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(AvroDeserializationSchema.forSpecific(UserAvro.class))
                .build();

        DataStream<UserAvro> userStreamAvro =
                env.fromSource(avroSource, WatermarkStrategy.noWatermarks(), "user-events-avro");

        // Dummy enrichment logic
        DataStream<UserAvro> enrichedStreamAvro = userStreamAvro.map(user -> {
            // Add some enrichment logic here
            return user; // For simplicity, returning the same user
        });

        enrichedStreamAvro.sinkTo(exactlyOnceSink(
                "enriched-events-avro", AvroSerializationSchema.forSpecific(UserAvro.class), "enriched-avro"));

        // --- Scenario 2: Evolving to Protobuf (conceptually) ---
        // In a real migration you would stop the old job (ideally with a savepoint), deploy a new
        // one, and keep a compatibility strategy for the Kafka topics. Both pipelines share one job
        // here only to show the two sets of (de)serializers side by side.
        KafkaSource<UserProtobuf> protobufSource = KafkaSource.<UserProtobuf>builder()
                .setBootstrapServers(BOOTSTRAP)
                .setTopics("user-events-protobuf")
                .setGroupId("schema-evolution-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new UserProtobufDeserializer())
                .build();

        DataStream<UserProtobuf> userStreamProtobuf =
                env.fromSource(protobufSource, WatermarkStrategy.noWatermarks(), "user-events-protobuf");

        // Protobuf messages are not POJOs, so Flink falls back to Kryo for them between operators;
        // consider registering a Protobuf-aware Kryo serializer (e.g. chill-protobuf) for this type.
        DataStream<UserProtobuf> enrichedStreamProtobuf = userStreamProtobuf.map(user -> {
            // Add some enrichment logic here
            return user; // For simplicity, returning the same user
        });

        SerializationSchema<UserProtobuf> protobufSerializer = UserProtobuf::toByteArray;
        enrichedStreamProtobuf.sinkTo(exactlyOnceSink(
                "enriched-events-protobuf", protobufSerializer, "enriched-protobuf"));

        env.execute("Schema Evolution Transition Job");
    }

    // Builds an exactly-once KafkaSink that writes only record values to the given topic.
    private static <T> KafkaSink<T> exactlyOnceSink(
            String topic, SerializationSchema<T> valueSerializer, String transactionalIdPrefix) {
        return KafkaSink.<T>builder()
                .setBootstrapServers(BOOTSTRAP)
                .setRecordSerializer(KafkaRecordSerializationSchema.<T>builder()
                        .setTopic(topic)
                        .setValueSerializationSchema(valueSerializer)
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix(transactionalIdPrefix)
                // Must not exceed the broker's transaction.max.timeout.ms (15 minutes by default).
                .setProperty("transaction.timeout.ms", "900000")
                .build();
    }
}

The core problem schema evolution solves in Flink is keeping state usable across job restarts and upgrades when the data schema changes. Flink serializes state with the serializer derived from the state’s type; when that type is an Avro-generated class, Flink uses an Avro-based serializer, so the schema that describes your Kafka records also governs the bytes in your checkpoints and savepoints. If the new job’s state serializer can’t interpret what the old job wrote, Flink can’t restore its own state and the job fails.

Flink’s state backends (such as HashMapStateBackend and EmbeddedRocksDBStateBackend, the successor to RocksDBStateBackend) store and retrieve operator state. When you upgrade a job by restoring it from a savepoint, especially if the upgrade changes data types or serialization formats, the new job code must be able to load the state that the old code wrote. This is where schema evolution becomes critical.
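To make “load the old state using the new job code” concrete: Flink stores a snapshot of the writing serializer with every checkpoint and savepoint and consults it on restore. The plain-Java sketch below is a hypothetical, much-simplified stand-in for that mechanism (the class, methods, and byte layout are illustrative, not Flink APIs): each state entry starts with a version number, and the v2 reader decodes both the v1 and v2 layouts, filling city with a default for old entries.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical, simplified model of versioned state bytes. This is NOT Flink's
// TypeSerializer/TypeSerializerSnapshot API; it only shows new code decoding
// bytes that old code wrote.
public class VersionedUserState {
    static final int CURRENT_VERSION = 2;

    // v2 of the state type: "city" was added after v1 state already existed.
    static final class User {
        final String name;
        final int age;
        final String city; // null when unknown
        User(String name, int age, String city) {
            this.name = name;
            this.age = age;
            this.city = city;
        }
    }

    // Layout the OLD job (v1) wrote: version header, name, age.
    static byte[] writeV1(String name, int age) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(1);
            out.writeUTF(name);
            out.writeInt(age);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Layout the NEW job (v2) writes: version header, name, age, optional city.
    static byte[] write(User u) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(CURRENT_VERSION);
            out.writeUTF(u.name);
            out.writeInt(u.age);
            out.writeBoolean(u.city != null);
            if (u.city != null) out.writeUTF(u.city);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // New code accepts both layouts; v1 bytes get the default (null) for city.
    static User read(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            int version = in.readInt();
            String name = in.readUTF();
            int age = in.readInt();
            if (version == 1) return new User(name, age, null);
            if (version == 2) return new User(name, age, in.readBoolean() ? in.readUTF() : null);
            throw new IllegalStateException("unknown state version " + version);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        User restored = read(writeV1("alice", 30));
        System.out.println(restored.name + " " + restored.age + " " + restored.city); // alice 30 null
        System.out.println(read(write(new User("bob", 41, "Berlin"))).city);        // Berlin
    }
}
```

Real Flink serializers make the same decision through their snapshot’s compatibility check rather than an inline version switch, but the requirement is identical: the new reader must understand the old layout.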

The key is backward and forward compatibility.

  • Backward Compatibility: The new job code must be able to read data written by the old job code.
  • Forward Compatibility: The old job code must be able to read data written by the new job code.

For a schema change, you typically need backward compatibility, so the new job can read the data (and state) the old job wrote, and, while old and new jobs run side by side during a transition, forward compatibility as well, so the old job can read what the new one writes.
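Both directions can be checked mechanically once you model a schema as a list of fields. The sketch below is a hypothetical toy, loosely following Avro’s name-based rules (fields matched by name, defaults filling gaps, writer-only fields skipped, and only one promotion, int to long); real Avro resolution handles many more cases. Backward compatibility means “new reader, old writer”; forward compatibility means “old reader, new writer”.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of Avro-style compatibility: fields matched by name, defaults fill
// gaps, and one numeric promotion (int -> long). Hypothetical and far simpler
// than Avro's real resolution rules.
public class CompatCheck {
    static final class Field {
        final String name;
        final String type;
        final boolean hasDefault;
        Field(String name, String type, boolean hasDefault) {
            this.name = name;
            this.type = type;
            this.hasDefault = hasDefault;
        }
    }

    static boolean promotable(String writerType, String readerType) {
        return writerType.equals(readerType)
                || (writerType.equals("int") && readerType.equals("long"));
    }

    // Can a reader using `reader` decode data written with `writer`?
    static boolean canRead(List<Field> writer, List<Field> reader) {
        Map<String, Field> written = new HashMap<>();
        for (Field f : writer) written.put(f.name, f);
        for (Field r : reader) {
            Field w = written.get(r.name);
            if (w == null) {
                if (!r.hasDefault) return false;      // nothing to fill the field with
            } else if (!promotable(w.type, r.type)) {
                return false;                          // e.g. int -> string
            }
        }
        return true;                                   // writer-only fields are skipped
    }

    // Backward: new code reads old data. Forward: old code reads new data.
    static boolean backward(List<Field> oldSchema, List<Field> newSchema) {
        return canRead(oldSchema, newSchema);
    }

    static boolean forward(List<Field> oldSchema, List<Field> newSchema) {
        return canRead(newSchema, oldSchema);
    }

    public static void main(String[] args) {
        List<Field> v1 = List.of(new Field("name", "string", false), new Field("age", "int", false));
        // v2 adds "city" with a default: compatible in both directions.
        List<Field> v2 = List.of(new Field("name", "string", false), new Field("age", "int", false),
                new Field("city", "string", true));
        System.out.println(backward(v1, v2) + " " + forward(v1, v2)); // true true
        // v3 drops "age", which had no default: old readers can no longer fill it.
        List<Field> v3 = List.of(new Field("name", "string", false));
        System.out.println(backward(v1, v3) + " " + forward(v1, v3)); // true false
    }
}
```

Adding a field with a default passes both checks, while dropping a field that had no default is only backward compatible, which is exactly the distinction the transition period depends on.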

Avro vs. Protobuf for Schema Evolution

Both Avro and Protobuf are designed with schema evolution in mind.

  • Avro: Defines data with a schema (.avsc files, written in JSON). Reader and writer schemas are matched by field name, so fields can be added, removed, or reordered as long as Avro’s schema resolution rules are followed; in practice, that mostly means giving added and removed fields default values. Flink’s AvroDeserializationSchema and AvroSerializationSchema build on this.
  • Protobuf: Uses .proto files. Fields are identified by unique numbers (tags) rather than names, which makes many common changes (adding optional fields, reordering fields) backward and forward compatible as long as tags are never changed or reused. Flink’s protobuf format (flink-protobuf) targets the Table API and SQL; in the DataStream API you typically wrap the generated class’s parseFrom and toByteArray methods in a DeserializationSchema and SerializationSchema.
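Tag-based identification is easiest to see at the byte level. The sketch below hand-encodes a message in the Protobuf wire format, where each field is prefixed by a varint key of (field number << 3) | wire type, and decodes it with an “old” reader that only knows tags 1 and 2. It is an illustration only: real code should use protoc-generated classes, and the class and method names here are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hand-rolled illustration of the Protobuf wire format; use protoc-generated
// classes in real code. Names here are hypothetical.
public class ProtoTagsDemo {
    static void writeVarint(ByteArrayOutputStream out, long value) {
        while ((value & ~0x7FL) != 0) {
            out.write((int) ((value & 0x7F) | 0x80)); // 7 payload bits + continuation bit
            value >>>= 7;
        }
        out.write((int) value);
    }

    static long readVarint(ByteArrayInputStream in) {
        long result = 0;
        for (int shift = 0; shift < 64; shift += 7) {
            int b = in.read();
            if (b < 0) throw new IllegalStateException("truncated varint");
            result |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) return result;
        }
        throw new IllegalStateException("varint too long");
    }

    static void writeString(ByteArrayOutputStream out, int field, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeVarint(out, (field << 3) | 2); // wire type 2: length-delimited
        writeVarint(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    static void writeInt(ByteArrayOutputStream out, int field, int value) {
        writeVarint(out, (field << 3) | 0); // wire type 0: varint
        writeVarint(out, value);
    }

    // Skips a field this reader doesn't know, based only on its wire type.
    static void skip(ByteArrayInputStream in, int wireType) {
        switch (wireType) {
            case 0: readVarint(in); break;            // varint
            case 1: in.skip(8); break;                // 64-bit
            case 2: in.skip(readVarint(in)); break;   // length-delimited
            case 5: in.skip(4); break;                // 32-bit
            default: throw new IllegalStateException("unsupported wire type " + wireType);
        }
    }

    // An "old" reader built before city (tag 3) existed: knows tags 1 and 2 only.
    static String readV1(byte[] data) {
        ByteArrayInputStream in = new ByteArrayInputStream(data);
        String name = "";
        int age = 0;
        while (in.available() > 0) {
            long key = readVarint(in);
            int field = (int) (key >>> 3);
            int wireType = (int) (key & 7);
            if (field == 1 && wireType == 2) {
                byte[] utf8 = new byte[(int) readVarint(in)];
                in.read(utf8, 0, utf8.length);
                name = new String(utf8, StandardCharsets.UTF_8);
            } else if (field == 2 && wireType == 0) {
                age = (int) readVarint(in);
            } else {
                skip(in, wireType); // unknown tag: skipped, not an error
            }
        }
        return name + "/" + age;
    }

    public static void main(String[] args) {
        ByteArrayOutputStream newMessage = new ByteArrayOutputStream(); // written with the NEW schema
        writeString(newMessage, 1, "ann");
        writeInt(newMessage, 2, 30);
        writeString(newMessage, 3, "Oslo");
        System.out.println(readV1(newMessage.toByteArray())); // ann/30
    }
}
```

Because the key carries the wire type, a reader can skip a field it has never heard of without knowing its schema; Avro, by contrast, needs the writer’s schema to know where an unknown field ends.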

Common Pitfalls and How to Handle Them

  1. Incompatible Schema Changes:

    • Diagnosis: A newly deployed job version fails with a deserialization error, often mentioning a missing field or an unexpected type.
    • Cause: Renaming a field, changing a primitive type (e.g., int to string), or removing a required field without proper handling.
    • Fix (Avro): Keep field names and types consistent between writer and reader schemas. To rename a field, list its old name in the new schema’s aliases. To change a type, use one of Avro’s permitted promotions (e.g., int to long). Only remove a field that has a default value; readers still on the old schema fill the missing field from that default.
      • Example Avro schema change (adding a nullable field):
        {
          "type": "record",
          "name": "User",
          "fields": [
            {"name": "name", "type": "string"},
            {"name": "age", "type": "int"},
            {"name": "city", "type": ["null", "string"], "default": null, "doc": "Added field"}
          ]
        }
        
      • Why it works: Avro’s schema resolution lets the reader’s schema differ from the writer’s, as long as the two are compatible under Avro’s rules. The "default": null lets the new schema read older data that has no city (for a union, the default must match the first branch, which is why "null" comes first). Readers still on the old schema simply skip city in newer data, because writer fields the reader doesn’t declare are ignored.
    • Fix (Protobuf): Never change or reuse field tags. Add new fields as optional or repeated (in proto3, singular fields are optional by default), never as required. If you must change a field’s type, use a wire-compatible change (e.g., int32 to int64) and consider how existing data will be read. Prefer deprecating fields over removing them; if you do remove one, mark its tag number and name as reserved so they can’t be reused.
      • Example Protobuf .proto change:
        syntax = "proto2";

        message User {
          required string name = 1;
          required int32 age = 2;
          optional string city = 3; // Added field
        }
        
      • Why it works: Protobuf identifies fields by tag. The new city field with tag 3 doesn’t interfere with existing fields: older readers treat tag 3 as an unknown field instead of failing, and newer readers parsing older data simply see city as unset.
  2. State Serialization Mismatch:

    • Diagnosis: The job fails while restoring from a savepoint or checkpoint with a state-related exception, for example a StateMigrationException reporting that the new state serializer is incompatible with the old one.
    • Cause: Your job’s state (keyed or operator state) was written with an older schema, but the new job code expects a newer one. This typically happens when you change the type of a state value. Changing the type of a state key is not supported by Flink’s state schema evolution at all, so key types must stay the same.
    • Fix (General): Flink ties state to the serializer derived from the state’s TypeInformation. Its built-in state schema evolution covers POJO and Avro types; types that fall back to Kryo (which by default includes Protobuf-generated classes) cannot evolve, and replacing the state type outright (e.g., UserAvro with UserProtobuf) is an incompatible change that Flink will not migrate. In those cases you have to convert or reset the state yourself.
      • Manual State Reset: Start the new job without restoring the old savepoint or checkpoint (for example, don’t pass -s <savepointPath> to flink run), so it begins with empty state. Alternatively, use Flink’s State Processor API to read the old savepoint and write a new one with converted state.
      • Why it works: By clearing the state, the new job starts with a clean slate, avoiding the deserialization issue. This is a last resort, as it means losing all previous state.
    • Fix (Flink 1.8+, POJO or Avro state types): For compatible changes, Flink migrates state automatically when the new job restores from a savepoint. Make sure the state’s TypeInformation is actually inferred as a POJO or Avro type (or set it explicitly) so that it doesn’t fall back to Kryo.
      • Why it works: Each checkpoint and savepoint stores a snapshot of the serializer that wrote it, including the old schema, so on restore Flink can read the old bytes with the old schema and convert them into the new type.
  3. Kafka Consumer/Producer Configuration:

    • Diagnosis: Data isn’t read or written correctly after a schema change, or Kafka-related errors appear.
    • Cause: The Kafka topic schema used by the producer in the old job is different from what the consumer in the new job expects, or vice-versa.
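    • Fix (Avro): Make sure the consumer and producer use compatible Avro schemas. Plain Avro binary doesn’t carry its schema, so AvroDeserializationSchema decodes every message with the one schema you give it (from the generated class, or via forGeneric) and assumes the writer used that same schema. To consume data written with other schema versions, use a schema registry: ConfluentRegistryAvroDeserializationSchema looks up each message’s writer schema and resolves it against your reader schema. Configure the registry’s compatibility setting for the topic’s subject so that incompatible versions can’t be registered in the first place.
      • Example: If the producer writes with schema v2 while the consumer still reads with v1, this works only if the change is forward compatible (old reader, new data). If the consumer reads with v2 while the producer still writes v1, the change must be backward compatible (new reader, old data). In both cases the consumer needs the writer’s schema, which the registry lookup provides.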
      • Why it works: Kafka acts as the durable buffer. Flink’s connectors read from and write to Kafka. The compatibility must hold between the writer’s schema and the reader’s schema for each hop.
    • Fix (Protobuf): Similar to Avro, ensure consistent use of Protobuf schemas. Protobuf’s tag-based system offers strong compatibility. The main concern is ensuring the correct .proto file is used to generate the Java classes for both serialization and deserialization.
      • Why it works: Protobuf’s field tags ensure that data can be parsed even if the reader doesn’t have all the fields defined in the writer’s schema, as long as the tags are consistent.
  4. Flink’s Internal Serialization:

    • Diagnosis: Complex Flink jobs with custom serializers or user-defined types might fail if their internal serialization logic doesn’t account for schema evolution.
    • Cause: Changes to custom TypeInformation or TypeSerializer implementations that don’t handle versioning.
    • Fix: Custom serializers must carry their own versioning through a TypeSerializerSnapshot, whose resolveSchemaCompatibility method tells Flink whether old state can be read as-is, must be migrated, or is incompatible. Often it’s best to stick to Flink’s built-in serializers (the POJO serializer, or the Avro serializer Flink uses for Avro-generated types) for state managed by Flink.
      • Why it works: Flink’s core serializers are designed to work with Flink’s state management and checkpointing. Custom solutions introduce complexity and potential points of failure if not carefully designed for evolution.
  5. Order of Deployment:

    • Diagnosis: During a migration, you might see errors if consumers and producers are upgraded at different times, leading to a period of incompatibility.
    • Cause: The producer is updated to a new schema, but the consumer is still running the old code, or vice-versa.
    • Fix: Plan your deployment carefully. A common strategy is:
      1. Deploy a new producer version that writes data compatible with the old consumer’s schema (e.g., add nullable fields).
      2. Deploy a new consumer version that can read data written with both the old and the new producer schema (e.g., using a schema registry for lookup).
      3. Once both producer and consumer are updated and running stably, deploy a new producer that uses the new schema fully.
      4. Finally, deploy a new consumer that expects the new schema.
      • Why it works: This phased approach ensures that at no point is data written by one version unreadable by the other. It requires careful coordination and often the use of a schema registry.
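The phased plan rests on one invariant: after every step, consumers must be able to read every schema version still unread in the topic. The sketch below checks a deployment plan against that invariant. The Step class, the canRead predicate, and the assumption that versions are consecutive integers are illustrative simplifications, not part of any Flink or Kafka API; in practice canRead would come from your schema registry’s compatibility check.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical rollout checker; not a Flink or Kafka API. canRead(writer, reader)
// stands in for a real compatibility check, such as a schema registry's.
public class RolloutCheck {
    static final class Step {
        final String label;
        final int producerVersion; // schema version producers write after this step
        final int consumerVersion; // schema version consumers read with after this step
        final int oldestUnread;    // oldest schema version still unread in the topic
        Step(String label, int producerVersion, int consumerVersion, int oldestUnread) {
            this.label = label;
            this.producerVersion = producerVersion;
            this.consumerVersion = consumerVersion;
            this.oldestUnread = oldestUnread;
        }
    }

    /** Returns the label of the first step where a consumer meets data it cannot read, or null. */
    static String firstBrokenStep(List<Step> plan, BiPredicate<Integer, Integer> canRead) {
        for (Step s : plan) {
            // Versions are assumed to be consecutive integers, with oldestUnread <= producerVersion.
            for (int writer = s.oldestUnread; writer <= s.producerVersion; writer++) {
                if (!canRead.test(writer, s.consumerVersion)) {
                    return s.label;
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // v2 removes a field that had no default: v2 readers handle v1 data, but
        // v1 readers cannot handle v2 data (backward- but not forward-compatible).
        BiPredicate<Integer, Integer> backwardOnly = (writer, reader) -> reader >= writer;

        List<Step> producersFirst = Arrays.asList(
                new Step("producers -> v2", 2, 1, 1),
                new Step("consumers -> v2", 2, 2, 1));
        List<Step> consumersFirst = Arrays.asList(
                new Step("consumers -> v2", 1, 2, 1),
                new Step("producers -> v2", 2, 2, 1));

        System.out.println(firstBrokenStep(producersFirst, backwardOnly)); // producers -> v2
        System.out.println(firstBrokenStep(consumersFirst, backwardOnly)); // null
    }
}
```

With a backward-only change, the producers-first plan breaks at its first step while the consumers-first plan passes. For a forward-only change the outcome flips, provided consumers switch only after the v1 records have been drained; this mirrors the usual schema-registry guidance of upgrading consumers first for backward-compatible changes and producers first for forward-compatible ones.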

The one thing most people don’t realize is that Flink’s state backend is deeply intertwined with the data types and their serializers. When you change a data type that is part of your keyed state, Flink doesn’t just update the type definition; it must be able to deserialize the old bytes representing the old type into the new type. This is precisely why schema evolution on the data types themselves is critical, and why Flink’s state migration is so valuable when you make compatible changes to Avro schemas or POJO types. Renames are the classic trap: fields are matched by name, so if you rename a field without recording the old name in the schema (for Avro, as an alias), Flink sees one field removed and another added, and won’t map the old value across.
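The rename trap can be reproduced with a toy resolver that matches fields by name. In the hypothetical sketch below (plain Java; ReaderField and resolve are illustrative, not library APIs), a field renamed without an alias silently falls back to its default, while declaring the old name as an alias, the mechanism Avro provides for renames, carries the stored value over.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy name-based field resolution with optional aliases (hypothetical, not a library API).
public class RenameDemo {
    static final class ReaderField {
        final String name;
        final List<String> aliases;  // former names this field may have been written under
        final Object defaultValue;   // used when no written field matches
        ReaderField(String name, List<String> aliases, Object defaultValue) {
            this.name = name;
            this.aliases = aliases;
            this.defaultValue = defaultValue;
        }
    }

    static Map<String, Object> resolve(Map<String, Object> written, List<ReaderField> reader) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (ReaderField f : reader) {
            Object value = f.defaultValue;
            if (written.containsKey(f.name)) {
                value = written.get(f.name);
            } else {
                for (String alias : f.aliases) {
                    if (written.containsKey(alias)) {
                        value = written.get(alias);
                        break;
                    }
                }
            }
            result.put(f.name, value);
        }
        return result; // written fields with no match are dropped
    }

    public static void main(String[] args) {
        Map<String, Object> oldState = new LinkedHashMap<>();
        oldState.put("name", "alice");
        oldState.put("age", 30);

        // "age" renamed to "ageYears" with no alias: the stored value is lost.
        List<ReaderField> renamed = Arrays.asList(
                new ReaderField("name", Collections.emptyList(), ""),
                new ReaderField("ageYears", Collections.emptyList(), 0));
        System.out.println(resolve(oldState, renamed)); // {name=alice, ageYears=0}

        // Same rename, with the old name declared as an alias: the value carries over.
        List<ReaderField> aliased = Arrays.asList(
                new ReaderField("name", Collections.emptyList(), ""),
                new ReaderField("ageYears", Collections.singletonList("age"), 0));
        System.out.println(resolve(oldState, aliased)); // {name=alice, ageYears=30}
    }
}
```

Note that the first case raises no error at all: the job restores “successfully” with a zeroed field, which is why renames deserve an explicit review step before any upgrade.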

The next problem you’ll likely encounter is managing schema versions across multiple Flink jobs that might interact, or dealing with schema evolution in external systems like databases or data lakes that Flink writes to.
