The Flink JobManager failed to serialize the ExecutionConfig object, preventing it from distributing job execution details to the TaskManagers.
This usually happens because the ExecutionConfig itself, or an object referenced within it, contains non-serializable data. Flink ships the ExecutionConfig using Java serialization, so every object reachable from it must implement `java.io.Serializable`. A single non-serializable field anywhere in that object graph is enough to cause this failure.
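
Because the failure comes from plain Java serialization, you can often reproduce it locally before submitting the job. The following is a minimal sketch that uses only the JDK; `SerializationProbe` and its method name are hypothetical helpers, not part of Flink. It relies on `NotSerializableException` carrying the offending class name as its message, which is the JDK's default behavior.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.Optional;

/** Hypothetical helper: tries Java serialization and reports the first class that is rejected. */
final class SerializationProbe {
    private SerializationProbe() {}

    /**
     * Returns the name of the first class that Java serialization refuses to write,
     * or an empty Optional if the whole object graph serializes.
     */
    static Optional<String> firstNonSerializableClass(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return Optional.empty();
        } catch (NotSerializableException e) {
            // By default the message is the fully qualified name of the offending class.
            return Optional.ofNullable(e.getMessage());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

One way to use it is to pass in the object you are about to hand to the config, for example the value you intend to give to `setGlobalJobParameters()`, and log the returned class name if one is present.
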
Here are the common culprits and how to fix them:
- Non-serializable objects in `ExecutionConfig.setGlobalJobParameters()`: You've likely set custom job parameters using `setGlobalJobParameters()`, and one or more of the values they hold are not serializable. This is the most frequent cause.
  - Diagnosis: Inspect the object returned by `env.getConfig().getGlobalJobParameters()` and the keys and values it contains. Look for any custom classes that don't implement `java.io.Serializable`.
  - Fix: Ensure everything reachable from the object passed to `setGlobalJobParameters()` implements `java.io.Serializable`. If you're passing a `Configuration` object, ensure its contents are serializable. Often, replacing a custom non-serializable object with simple `String` or `Integer` values, or a `HashMap<String, String>` containing serializable data, resolves this.

    ```java
    import java.util.Collections;
    import org.apache.flink.api.java.utils.ParameterTool;

    // Incorrect: a custom parameters object that holds a non-serializable field
    // MyCustomNonSerializableParams params = new MyCustomNonSerializableParams();
    // env.getConfig().setGlobalJobParameters(params);

    // Correct: plain string key-value pairs wrapped in a ParameterTool
    ParameterTool params = ParameterTool.fromMap(
        Collections.singletonMap("my.serializable.param", "some_value")
    );
    env.getConfig().setGlobalJobParameters(params);
    ```
  - Why it works: `ParameterTool` is designed to hold serializable key-value pairs. Once your custom parameters are converted into a serializable format (like a `Map` of strings), Flink can safely serialize and transmit them.
- Non-serializable Kryo serializers registered via `ExecutionConfig.addDefaultKryoSerializer()` or `registerTypeWithKryoSerializer()`: You've registered a custom Kryo serializer instance with the `ExecutionConfig`, and that instance, or something it references, is not serializable.
  - Diagnosis: Examine what `env.getConfig().getDefaultKryoSerializers()` and `env.getConfig().getRegisteredTypesWithKryoSerializers()` return (method names as of Flink 1.x). Look for serializer instances that hold fields whose types do not implement `java.io.Serializable`.
  - Fix: Ensure any Kryo serializer instance you register is fully serializable, or register the serializer by class so that no instance has to be shipped. Alternatively, use Flink's built-in serializers if possible.

    ```java
    // Incorrect: the serializer instance holds a non-serializable field
    // env.getConfig().addDefaultKryoSerializer(MyType.class, new MyNonSerializableSerializer());

    // Correct: register a serializer instance whose fields are all serializable
    env.getConfig().addDefaultKryoSerializer(MyType.class, new MySerializableSerializer());
    // Or register by class, so only the class reference travels with the config
    env.getConfig().addDefaultKryoSerializer(MyType.class, MySerializableSerializer.class);
    ```
  - Why it works: Flink serializes the `ExecutionConfig` to send it to TaskManagers. If a registered serializer instance isn't serializable, Flink cannot send the instructions on how to serialize and deserialize your custom types.
- Parallelism values derived from non-serializable objects in `ExecutionConfig.setParallelism()`: This is an unlikely cause. `setParallelism()` takes a primitive `int`, so the stored value is always serializable. It is still worth ruling out when the value is read from a dynamically constructed, non-serializable object that may also be referenced elsewhere in the job.
  - Diagnosis: Review the code where `setParallelism()` is called. Check that only the resulting `int` is passed on, and that the object it was read from isn't also stored in the config or captured by a function.
  - Fix: Pass plain `int` values, and keep the object they are derived from out of anything Flink has to ship.

    ```java
    // Fine on its own, but check that 'someDynamicObject' isn't kept anywhere else
    // env.getConfig().setParallelism(someDynamicObject.getParallelism());

    // Safer:
    int staticParallelism = 4;
    env.getConfig().setParallelism(staticParallelism);
    ```
  - Why it works: The method expects a simple integer. Reading the value into a local `int` first makes it obvious that no reference to the source object ends up in the config.
- Serializing the `StreamExecutionEnvironment` or `ExecutionEnvironment` instance: In some advanced scenarios, you might be trying to serialize the environment itself, which is not intended.
  - Diagnosis: Look for code that attempts to serialize `env` or `executionEnvironment` objects directly, perhaps through `ObjectOutputStream`.
  - Fix: Do not serialize the environment instance. Instead, serialize only the `ExecutionConfig` if needed, or more commonly, reconstruct the environment on the receiving side and then get its config.
  - Why it works: The environment contains references to runtime components and is not designed for serialization.
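- Custom `TypeInformation` or `TypeSerializerFactory` not serializable: If you're implementing custom `TypeInformation` or `TypeSerializerFactory`, instances must be fully serializable. `TypeInformation` already implements `java.io.Serializable`, so a subclass typically fails because of a non-serializable field rather than a missing interface.
  - Diagnosis: Inspect your custom `TypeInformation` and `TypeSerializerFactory` implementations. Check that the factory implements `java.io.Serializable` and that every non-transient field in both has a serializable type.
  - Fix: Keep only serializable state in these classes, and make sure your `TypeSerializerFactory` implements `java.io.Serializable`.

    ```java
    // Incorrect: TypeInformation is already Serializable, but this field is not
    // public class MyCustomTypeInfo extends TypeInformation<MyType> { private NonSerializableHelper helper; ... }

    // Correct: hold only serializable state (Class objects are serializable)
    public class MyCustomTypeInfo extends TypeInformation<MyType> { private final Class<MyType> typeClass; ... }
    ```
  - Why it works: Flink needs to serialize these definitions to distribute them to TaskManagers so they know how to handle your custom types.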
- Non-serializable fields in functions checked by the closure cleaner (`ExecutionConfig.enableClosureCleaner()`): The closure cleaner is enabled by default. If your UDFs or other closures contain non-serializable fields, this can manifest as a serialization error.
  - Diagnosis: Review your UDFs (e.g., `MapFunction`, `ProcessFunction`) and any other lambda expressions or anonymous classes used. Check if they hold references to non-serializable objects.
  - Fix: Ensure all fields within your closures are serializable. If a field is non-serializable, make it `transient` and initialize it on the TaskManager (for example in `open()`), or redesign the closure so it does not hold that reference.

    ```java
    // class MyMapper extends RichMapFunction<Input, Output> {
    //     // Fine: transient fields are skipped; initialize the helper in open()
    //     private transient NonSerializableObject helper;
    //     // Fails: a non-transient, non-serializable field is serialized with the function
    //     private NonSerializableObject problem;
    //
    //     @Override
    //     public Output map(Input value) throws Exception {
    //         // ... use problem ...
    //     }
    // }
    // Correct approach: make every field either serializable or transient, and
    // initialize transient fields in open() rather than in the constructor.
    ```
  - Why it works: After cleaning a closure, Flink checks that it can be serialized, because the function object is shipped to the TaskManagers. Any non-transient field with a non-serializable type makes that check fail.
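
The transient-field advice in the last item can be illustrated without Flink. The following JDK-only sketch contrasts a class that drags a non-serializable helper along with one that marks the helper `transient` and rebuilds it after the copy. All class names here are hypothetical, and `open()` only stands in for the role a rich function's `open()` plays in a real job.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Hypothetical helper that deliberately does not implement Serializable. */
final class NonSerializableHelper {
    String decorate(String s) {
        return "[" + s + "]";
    }
}

/** Fails to serialize: the helper is a regular field and travels with the object. */
final class BrokenMapper implements Serializable {
    private static final long serialVersionUID = 1L;
    private final NonSerializableHelper helper = new NonSerializableHelper();

    String map(String value) {
        return helper.decorate(value);
    }
}

/** Serializes cleanly: the helper is transient and rebuilt on first use after shipping. */
final class TransientMapper implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String prefix;                    // serializable state, shipped as-is
    private transient NonSerializableHelper helper; // skipped by serialization, null after a copy

    TransientMapper(String prefix) {
        this.prefix = prefix;
    }

    /** Stands in for a rich function's open(): create the helper where it is used. */
    void open() {
        helper = new NonSerializableHelper();
    }

    String map(String value) {
        if (helper == null) {
            open();
        }
        return helper.decorate(prefix + value);
    }
}

final class SerializationRoundTrip {
    private SerializationRoundTrip() {}

    /** Serializes and deserializes an object, roughly what happens when a function is shipped. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T copy(T original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Copying a `TransientMapper` succeeds and the copy still works because the helper is recreated lazily, while copying a `BrokenMapper` fails with a `NotSerializableException` wrapped in an `UncheckedIOException`.
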
After fixing these, you might encounter java.lang.ClassNotFoundException if the TaskManagers cannot find the custom classes you’ve registered (e.g., custom serializers or types) because they are not available on the TaskManager’s classpath.
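
A quick way to catch that follow-up problem early is to check that the class names you registered can be resolved by a given class loader. This is a rough sketch using only the JDK; `ClasspathCheck` is a hypothetical helper, and it only tells you about the class loader you give it, so it is meaningful only when run with a classpath that mirrors what the TaskManagers actually have.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical pre-flight check for classes that must be resolvable at runtime. */
final class ClasspathCheck {
    private ClasspathCheck() {}

    /** Returns the subset of classNames that the given class loader cannot resolve. */
    static List<String> missingClasses(ClassLoader loader, List<String> classNames) {
        List<String> missing = new ArrayList<>();
        for (String name : classNames) {
            try {
                // 'false' avoids running static initializers; only resolution is tested.
                Class.forName(name, false, loader);
            } catch (ClassNotFoundException | LinkageError e) {
                missing.add(name);
            }
        }
        return missing;
    }
}
```

Pass in the fully qualified names of your custom serializers and types. An empty result means the loader could resolve all of them.
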