Serialization is often the bottleneck you’re hitting in Flink. Flink avoids Java’s slow built-in ObjectOutputStream by using its own TypeSerializers, but any type it can’t analyze falls back to Kryo, and tuning Kryo alongside those TypeSerializers is the way to unlock performance.
Let’s see Flink serialize a simple POJO using Kryo, then we’ll dive into why and how.
```java
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class KryoSerializationDemo {

    public static class MyPojo {
        public String name;
        public int value;

        // Flink's POJO rules require a public no-arg constructor
        public MyPojo() {}

        public MyPojo(String name, int value) {
            this.name = name;
            this.value = value;
        }

        @Override
        public String toString() {
            return "MyPojo{name='" + name + "', value=" + value + '}';
        }
    }

    public static void main(String[] args) throws IOException {
        // 1. Get type information. MyPojo satisfies Flink's POJO rules, so this is a
        //    PojoTypeInfo (a real job would serialize it with PojoSerializer by default).
        TypeInformation<MyPojo> pojoTypeInfo = TypeExtractor.getForClass(MyPojo.class);
        System.out.println("Recognized as POJO: " + (pojoTypeInfo instanceof PojoTypeInfo));

        // 2. Configure Kryo. Registering the class lets Kryo write a small integer ID
        //    instead of the fully qualified class name.
        ExecutionConfig config = new ExecutionConfig();
        config.registerKryoType(MyPojo.class);
        // Build a KryoSerializer explicitly to look at Kryo in isolation
        KryoSerializer<MyPojo> kryoSerializer = new KryoSerializer<>(MyPojo.class, config);

        // 3. Create and serialize an object
        MyPojo myObject = new MyPojo("TestName", 123);
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(baos);
        kryoSerializer.serialize(myObject, outputView);
        outputView.flush();
        byte[] serializedData = baos.toByteArray();
        System.out.println("Serialized data length: " + serializedData.length);
        // In a real Flink job, these bytes would be sent over the network or written to state.

        // 4. Deserialize. In Flink this happens implicitly on the receiving end;
        //    here we simulate it.
        ByteArrayInputStream bais = new ByteArrayInputStream(serializedData);
        DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(bais);
        MyPojo deserializedObject = kryoSerializer.deserialize(inputView);
        System.out.println("Deserialized object: " + deserializedObject);
    }
}
```
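This code snippet shows the core of it: getting TypeInformation, configuring a KryoSerializer by registering your POJO class, and then calling serialize and deserialize. The printed serialized data length is the number to watch; for a small registered class it is typically much smaller than what Java’s default serialization would produce. Note that because MyPojo satisfies Flink’s POJO rules, a real job would serialize it with Flink’s PojoSerializer, so the demo constructs a KryoSerializer explicitly to show Kryo on its own. It uses the Flink 1.x ExecutionConfig API (registerKryoType and the KryoSerializer(Class, ExecutionConfig) constructor), which has changed in later releases, so check the documentation for your version.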
The problem serialization solves in Flink is distributed state management and data transfer. In a distributed system like Flink, you need to send records between nodes (TaskManagers) and persist state reliably, so the efficiency and correctness of serialization directly affect throughput, latency, and memory usage. Java’s built-in ObjectOutputStream is notoriously verbose and slow, especially for complex object graphs or frequent serialization. Every stream it writes carries a class descriptor (class name, serialVersionUID, field names and types), so when each record is serialized on its own, that metadata is repeated for every record, inflating payloads and CPU cost.
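To make that overhead concrete, here is a minimal JDK-only sketch (no Flink involved) that serializes the same two fields twice: once with ObjectOutputStream and once by writing only the values, which is roughly what a schema-aware serializer does. The class SerializationSizeDemo, its nested Sample class, and its methods are made up for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

final class SerializationSizeDemo {

    // Shaped like MyPojo; implements Serializable only so ObjectOutputStream accepts it
    static class Sample implements Serializable {
        private static final long serialVersionUID = 1L;
        String name;
        int value;

        Sample(String name, int value) {
            this.name = name;
            this.value = value;
        }
    }

    // Java's built-in serialization: a stream header and a class descriptor
    // (class name, serialVersionUID, field names and types) precede the data
    static byte[] javaSerialize(Sample s) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(s);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Schema-aware serialization: writer and reader already agree on the layout,
    // so only the field values are written, in a fixed order
    static byte[] fieldSerialize(Sample s) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(s.name);  // 2-byte length prefix + modified UTF-8 bytes
            out.writeInt(s.value); // 4 bytes
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        Sample s = new Sample("TestName", 123);
        System.out.println("ObjectOutputStream: " + javaSerialize(s).length + " bytes");
        System.out.println("field-by-field:     " + fieldSerialize(s).length + " bytes"); // 14
    }
}
```

The field-by-field version comes to 14 bytes (a 2-byte length prefix, the 8 characters of "TestName", and a 4-byte int). The exact ObjectOutputStream figure depends on details such as the class name, but it is always larger because the class descriptor travels with the data.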
Flink’s ExecutionConfig is where you control serialization. By default, Flink does not use Java serialization for records: its type extraction picks a specialized TypeSerializer for every type it understands (primitives, Tuples, POJOs, and so on) and falls back to Kryo only for types it treats as generic. You can force Kryo even for POJOs with enableForceKryo() and, critically, tell Kryo which custom types it should expect. Registering classes with config.registerKryoType(MyPojo.class) is the most important Kryo tuning step: it lets Kryo assign the class a small integer ID, which it writes instead of the fully qualified class name. Unregistered classes carry their full name in the output instead, and that overhead adds up quickly across many records.
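The effect of registration can be illustrated with a toy model. The sketch below is not Kryo’s actual wire format; ToyClassRegistry is a hypothetical class that identifies each record’s class either by a small ID (if registered) or by its full name, which is the trade-off registerKryoType exploits.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Toy model of class registration; NOT Kryo's real wire format
final class ToyClassRegistry {
    private final Map<Class<?>, Integer> ids = new HashMap<>();

    // Assign the next small integer ID, similar in spirit to registerKryoType
    void register(Class<?> type) {
        ids.putIfAbsent(type, ids.size() + 1);
    }

    // Identify the class before a record's data: an ID if registered, else the full name
    void writeClass(DataOutputStream out, Class<?> type) throws IOException {
        Integer id = ids.get(type);
        if (id != null) {
            out.writeByte(1); // marker: registered
            out.writeInt(id); // Kryo itself writes a variable-length int, often a single byte
        } else {
            out.writeByte(0); // marker: unregistered
            out.writeUTF(type.getName());
        }
    }

    // Bytes spent only on identifying the class of one record
    int classHeaderSize(Class<?> type) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            writeClass(out, type);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        ToyClassRegistry registry = new ToyClassRegistry();
        Class<?> type = java.util.ArrayList.class;
        System.out.println("unregistered: " + registry.classHeaderSize(type) + " bytes"); // 22
        registry.register(type);
        System.out.println("registered:   " + registry.classHeaderSize(type) + " bytes"); // 5
    }
}
```

For java.util.ArrayList the unregistered header is 22 bytes (marker, 2-byte length, 19 name characters) against 5 for the registered one, and that difference is paid on every record.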
Internally, Kryo is highly configurable and picks a serializer per class. For ordinary classes its default is FieldSerializer, which inspects the class’s fields once via reflection, caches them, and then writes only the field values in a fixed order, without field names. Registration on its own does not change which serializer is used (unless you register a custom serializer along with the class); it changes how the class is identified in the stream. Either way, Kryo spends its bytes on the data rather than on repeating the class definition for every record.
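A stripped-down version of the FieldSerializer idea fits in a few dozen lines. ToyFieldSerializer is a hypothetical, JDK-only sketch, not Kryo’s code, and it supports only int and non-null String fields. It reflects over the class once in its constructor; every call after that only moves values.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Toy serializer in the spirit of Kryo's FieldSerializer (NOT its real code):
// reflect once, cache the fields, then write only the field values.
final class ToyFieldSerializer<T> {
    private final Constructor<T> constructor;
    private final List<Field> fields = new ArrayList<>();

    ToyFieldSerializer(Class<T> type) {
        try {
            constructor = type.getDeclaredConstructor(); // requires a no-arg constructor
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(type + " has no no-arg constructor", e);
        }
        constructor.setAccessible(true);
        Field[] declared = type.getDeclaredFields();
        // A fixed order lets writer and reader agree without writing field names
        Arrays.sort(declared, Comparator.comparing(Field::getName));
        for (Field f : declared) {
            int mods = f.getModifiers();
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods)) continue;
            f.setAccessible(true);
            fields.add(f);
        }
    }

    void serialize(T obj, DataOutput out) throws IOException, IllegalAccessException {
        for (Field f : fields) {
            if (f.getType() == int.class) out.writeInt(f.getInt(obj));
            else if (f.getType() == String.class) out.writeUTF((String) f.get(obj)); // null not supported
            else throw new IllegalArgumentException("unsupported field: " + f);
        }
    }

    T deserialize(DataInput in) throws IOException, ReflectiveOperationException {
        T obj = constructor.newInstance();
        for (Field f : fields) {
            if (f.getType() == int.class) f.setInt(obj, in.readInt());
            else if (f.getType() == String.class) f.set(obj, in.readUTF());
            else throw new IllegalArgumentException("unsupported field: " + f);
        }
        return obj;
    }

    // In-memory helpers that wrap the checked exceptions
    byte[] toBytes(T obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try {
            serialize(obj, new DataOutputStream(bytes));
        } catch (IOException | IllegalAccessException e) {
            throw new IllegalStateException(e);
        }
        return bytes.toByteArray();
    }

    T fromBytes(byte[] data) {
        try {
            return deserialize(new DataInputStream(new ByteArrayInputStream(data)));
        } catch (IOException | ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }
}

final class ToyFieldSerializerDemo {
    static class Sample {
        String name;
        int value;
    }

    public static void main(String[] args) {
        ToyFieldSerializer<Sample> serializer = new ToyFieldSerializer<>(Sample.class);
        Sample s = new Sample();
        s.name = "TestName";
        s.value = 123;
        byte[] data = serializer.toBytes(s); // 14 bytes: 2 + 8 for the string, 4 for the int
        Sample copy = serializer.fromBytes(data);
        System.out.println(data.length + " bytes -> " + copy.name + ", " + copy.value);
    }
}
```

The reflection cost is paid once per class rather than once per record, and no field names or class metadata appear in the output.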
The TypeSerializer abstraction (org.apache.flink.api.common.typeutils.TypeSerializer) is Flink’s way of ensuring type safety and efficiency for its internal data structures and operators. KryoSerializer is a general-purpose serializer, but Flink also ships specialized serializers for its core types (Tuples, POJOs, primitives, and so on). When a type has proper TypeInformation, Flink uses these specialized serializers under the hood, and they are generally faster and more memory-efficient than a generic Kryo instance for those types. KryoSerializer itself extends TypeSerializer, which lets Flink use it interchangeably with its built-in serializers.
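The benefit of a shared serializer contract is that callers don’t care which implementation they get. The sketch below models this with a hypothetical ToySerializer interface (far simpler than Flink’s real TypeSerializer, which also covers copying, configuration snapshots, and more), a specialized int serializer, and a generic fallback that plays Kryo’s role. The fallback uses Java serialization only so the sketch has no external dependencies; all names are made up.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical, heavily simplified stand-in for Flink's TypeSerializer contract
interface ToySerializer<T> {
    void serialize(T record, DataOutput out) throws IOException;
    T deserialize(DataInput in) throws IOException;
}

// Specialized serializer: an int is always exactly 4 bytes, no metadata needed
final class ToyIntSerializer implements ToySerializer<Integer> {
    @Override public void serialize(Integer record, DataOutput out) throws IOException { out.writeInt(record); }
    @Override public Integer deserialize(DataInput in) throws IOException { return in.readInt(); }
}

// Generic fallback standing in for Kryo; uses Java serialization to stay dependency-free
final class ToyGenericSerializer<T extends Serializable> implements ToySerializer<T> {
    private final Class<T> type;

    ToyGenericSerializer(Class<T> type) { this.type = type; }

    @Override
    public void serialize(T record, DataOutput out) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
            oos.writeObject(record);
        }
        byte[] payload = bytes.toByteArray();
        out.writeInt(payload.length); // length prefix so the reader knows where the record ends
        out.write(payload);
    }

    @Override
    public T deserialize(DataInput in) throws IOException {
        byte[] payload = new byte[in.readInt()];
        in.readFully(payload);
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(payload))) {
            return type.cast(ois.readObject());
        } catch (ClassNotFoundException e) {
            throw new IOException(e);
        }
    }
}

final class ToySerializers {
    // Callers depend only on the interface, so either serializer can be plugged in
    static <T> int serializedSize(ToySerializer<T> serializer, T value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try {
            serializer.serialize(value, new DataOutputStream(bytes));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    static <T> T roundTrip(ToySerializer<T> serializer, T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            serializer.serialize(value, new DataOutputStream(bytes));
            return serializer.deserialize(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("specialized: " + serializedSize(new ToyIntSerializer(), 42) + " bytes"); // 4
        System.out.println("generic:     " + serializedSize(new ToyGenericSerializer<>(Integer.class), 42) + " bytes");
    }
}
```

The specialized serializer writes 42 in exactly 4 bytes, while the generic one pays for type metadata on every record, which mirrors why Flink prefers its own serializers whenever it has TypeInformation.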
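When tuning, you might find that Kryo isn’t ideal for certain types. For instance, if java.util.Date or java.sql.Timestamp values end up inside a type that Flink hands to Kryo, Kryo’s default handling of them might be inefficient. You can then supply a custom Kryo serializer for that type. In Flink you don’t call Kryo.register(Class<?> type, Serializer<?> serializer, int id) on a Kryo instance yourself; instead you register the serializer on the ExecutionConfig, and Flink applies it to every Kryo instance it creates. registerTypeWithKryoSerializer(type, serializer) registers the class together with its serializer (a serializer instance must be Serializable), while addDefaultKryoSerializer(type, serializerClass) sets the serializer Kryo uses for that class and its subclasses. This gives Flink a highly optimized, Kryo-native serialization strategy for specific problem types.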
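As a JDK-only illustration of why a type-specific serializer pays off, the hypothetical CompactDateCodec below encodes a java.util.Date as nothing but its epoch milliseconds and compares that with Java serialization of the same value. In a real job, the same write and read logic would live in a subclass of Kryo’s Serializer, registered through the ExecutionConfig (for example with registerTypeWithKryoSerializer).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.Date;

// Hypothetical type-specific codec for java.util.Date: only the epoch milliseconds
final class CompactDateCodec {

    static byte[] encode(Date date) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(date.getTime()); // always 8 bytes, no class metadata
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static Date decode(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return new Date(in.readLong());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Baseline: Java's built-in serialization of the same Date
    static byte[] javaSerialize(Date date) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(date);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        Date now = new Date();
        System.out.println("compact: " + encode(now).length + " bytes"); // 8
        System.out.println("java:    " + javaSerialize(now).length + " bytes");
        System.out.println("round trip ok: " + decode(encode(now)).equals(now)); // true
    }
}
```

A codec like this only suits plain Date instances: java.sql.Timestamp carries nanoseconds that a milliseconds-only encoding would drop, which is exactly the kind of detail a type-specific serializer has to get right.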
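The ExecutionConfig also shapes which serialization path each type takes. If Flink’s TypeExtractor recognizes a class as a POJO (a public class, static if nested, with a public no-argument constructor, whose fields are public or reachable through getters and setters), Flink builds a PojoTypeInfo and serializes the class with its own PojoSerializer, which writes each field with that field’s TypeSerializer, so Kryo is only involved for fields whose types Flink can’t analyze. Types Flink cannot analyze at all become GenericTypeInfo and go through Kryo. Registering classes with Kryo therefore matters for those generic types (or for everything, if you call enableForceKryo()); for genuine POJOs, the bigger win is keeping them on the PojoSerializer path. Combining the two, keeping as many types as possible on Flink’s own serializers and registering whatever still falls back to Kryo, is what makes the tuning effective.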
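Because the path a type takes depends on whether Flink accepts it as a POJO, it can help to check the rules up front. The JDK-only sketch below encodes a simplified approximation of the POJO rules from Flink’s documentation (public class, static if nested, public no-argument constructor, every non-static non-transient field either public or exposed through getX/setX). PojoRuleCheck and its example classes are hypothetical; Flink’s actual TypeExtractor applies more rules, so its verdict is what counts.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Simplified approximation of Flink's documented POJO rules; NOT Flink's TypeExtractor
final class PojoRuleCheck {

    static boolean looksLikeFlinkPojo(Class<?> type) {
        int classMods = type.getModifiers();
        if (!Modifier.isPublic(classMods)) return false;
        // Nested classes must be static (no hidden reference to an enclosing instance)
        if (type.getEnclosingClass() != null && !Modifier.isStatic(classMods)) return false;
        try {
            type.getConstructor(); // public no-argument constructor
        } catch (NoSuchMethodException e) {
            return false;
        }
        // Every non-static, non-transient field (inherited ones too) must be public
        // or reachable through a getX/setX pair
        for (Class<?> c = type; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int mods = f.getModifiers();
                if (Modifier.isStatic(mods) || Modifier.isTransient(mods)) continue;
                if (Modifier.isPublic(mods)) continue;
                if (!hasGetterAndSetter(type, f)) return false;
            }
        }
        return true;
    }

    private static boolean hasGetterAndSetter(Class<?> type, Field f) {
        String name = f.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        try {
            Method getter = type.getMethod("get" + suffix);
            type.getMethod("set" + suffix, f.getType());
            return getter.getReturnType() == f.getType();
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    // Example classes for trying the check
    public static class GoodPojo {
        public String name;
        private int value;
        public GoodPojo() {}
        public int getValue() { return value; }
        public void setValue(int value) { this.value = value; }
    }

    public static class NoDefaultConstructor {
        public String name;
        public NoDefaultConstructor(String name) { this.name = name; }
    }

    public static class PrivateFieldNoAccessors {
        private int hidden;
        public PrivateFieldNoAccessors() {}
    }

    public static void main(String[] args) {
        System.out.println(looksLikeFlinkPojo(GoodPojo.class));                // true
        System.out.println(looksLikeFlinkPojo(NoDefaultConstructor.class));    // false
        System.out.println(looksLikeFlinkPojo(PrivateFieldNoAccessors.class)); // false
    }
}
```

In an actual job, calling disableGenericTypes() on the ExecutionConfig is the built-in way to make Flink throw an exception whenever a type would fall back to Kryo, which catches accidental generic types early.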
If you observe that Kryo serialization still produces larger-than-expected output for certain POJOs, complex nested objects or collections that Kryo doesn’t handle optimally by default may be the cause. You can often improve this by either:
- Registering nested types: if a class serialized by Kryo contains another custom class, register that nested class with config.registerKryoType() as well.
- Adding a default Kryo serializer: for types that Kryo doesn’t handle well out of the box (e.g., certain JDK types), you can provide a custom Kryo serializer, for example config.addDefaultKryoSerializer(java.util.Date.class, MyKryoDateSerializer.class);, where MyKryoDateSerializer is your own subclass of Kryo’s Serializer.
The next thing you’ll likely run into is optimizing Flink’s state backends, which heavily rely on efficient serialization.