The Flink JobManager failed to serialize the ExecutionConfig object, preventing it from distributing job execution details to the TaskManagers.

This usually happens because the ExecutionConfig itself, or an object it references, is not serializable. Flink ships the ExecutionConfig to the cluster using Java serialization, so every object reachable from it must implement java.io.Serializable. A single non-serializable, non-transient field anywhere in that object graph causes this failure.
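
To see how one field can break an otherwise Serializable object, here is a minimal sketch in plain Java with no Flink dependency. SerializationProbe, JobSettings, and Connection are hypothetical names invented for this illustration, and the probe only imitates the Java serialization step described above; it is not Flink’s own code path.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

final class SerializationProbe {

    /** Hypothetical resource type that does not implement Serializable. */
    static final class Connection { }

    /** Hypothetical settings class: Serializable itself, but one field is not. */
    static final class JobSettings implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name = "demo";
        final Connection connection = new Connection();
    }

    /**
     * Tries to serialize the candidate with plain Java serialization.
     * Returns null on success, otherwise the message of the
     * NotSerializableException, which names the offending class.
     */
    static String firstNonSerializableClass(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(firstNonSerializableClass("plain string"));
        System.out.println(firstNonSerializableClass(new JobSettings()));
    }
}
```

Running a probe like this against your own parameter or serializer objects before submitting the job can point you to the class that needs attention.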

Here are the common culprits and how to fix them:

  1. Non-Serializable Objects in ExecutionConfig.setGlobalJobParameters(): You’ve likely set custom job parameters using setGlobalJobParameters(), and the parameter object holds one or more fields that are not serializable. This is the most frequent cause.

    • Diagnosis: Inspect the object returned by env.getConfig().getGlobalJobParameters() and everything it references. Look for fields whose classes don’t implement java.io.Serializable.
    • Fix: setGlobalJobParameters() accepts a subclass of ExecutionConfig.GlobalJobParameters, which is itself Serializable, so the problem is almost always a non-serializable field inside your subclass. Make every such field serializable or transient. Often the simplest fix is to replace the custom object with plain String or Integer values, or a HashMap<String, String>, wrapped in a ParameterTool.
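      // Incorrect: the custom parameter class holds a non-serializable field.
      // MyCustomNonSerializableParams params = new MyCustomNonSerializableParams();
      // env.getConfig().setGlobalJobParameters(params);
      
      // Correct:
      // Requires java.util.Collections and
      // org.apache.flink.api.java.utils.ParameterTool (package name as of Flink 1.x).
      ParameterTool params = ParameterTool.fromMap(
          Collections.singletonMap("my.serializable.param", "some_value")
      );
      env.getConfig().setGlobalJobParameters(params);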
      
    • Why it works: ParameterTool extends ExecutionConfig.GlobalJobParameters and stores only String keys and values, so Flink can serialize and transmit it without touching your custom classes.

  2. Non-Serializable Kryo Serializers Registered via ExecutionConfig.addDefaultKryoSerializer() or registerTypeWithKryoSerializer(): You’ve registered a custom Kryo serializer instance with the ExecutionConfig, and that instance holds non-serializable state.

    • Diagnosis: Examine env.getConfig().getDefaultKryoSerializers() and env.getConfig().getRegisteredTypesWithKryoSerializers(), which hold the serializer instances you registered (method names as of Flink 1.x). Look for serializers with fields whose classes do not implement java.io.Serializable.
    • Fix: Register the serializer by class rather than by instance, or make sure every non-transient field of the serializer instance is serializable. The types being serialized do not need to implement Serializable; only their Class objects are stored in the ExecutionConfig. Alternatively, use Flink’s built-in serializers where possible.
      // Risky: registering an instance ships the serializer object itself,
      // so every non-transient field in it must be serializable.
      // env.getConfig().addDefaultKryoSerializer(MyType.class, new MyStatefulSerializer(nonSerializableHelper));
      
      // Safer: register by class; only the Class reference is shipped.
      env.getConfig().addDefaultKryoSerializer(MyType.class, MySerializer.class);
      
    • Why it works: Flink serializes the ExecutionConfig to send it to the TaskManagers. A serializer registered by class travels as a Class reference, while a registered instance is serialized along with all of its fields, so any non-serializable field in it breaks the whole ExecutionConfig.

  3. Objects Used to Compute ExecutionConfig.setParallelism() Arguments: This is rarely the real cause. setParallelism() takes a plain int, which is always serializable. What can go wrong is that the object you read the value from also ends up stored in the config or captured by a function.

    • Diagnosis: Review the code where setParallelism() is called. Check whether the object that supplies the value is also passed to setGlobalJobParameters() or referenced from a UDF.
    • Fix: Read the parallelism into a local int and pass only that value on. Do not keep a reference to the source object anywhere Flink will serialize.
      // Fine on its own: only the int result reaches the ExecutionConfig.
      // The risk is 'someDynamicObject' also being stored or captured elsewhere.
      // env.getConfig().setParallelism(someDynamicObject.getParallelism());
      
      // Clearer:
      int parallelism = 4;
      env.getConfig().setParallelism(parallelism);
      
    • Why it works: An int carries no object references, so nothing non-serializable can ride along with it into the ExecutionConfig.

  4. Non-Serializable StreamExecutionEnvironment or ExecutionEnvironment Instance: You might be serializing the environment itself, either directly or by capturing it in a function, which is not supported.

    • Diagnosis: Look for code that writes env or executionEnvironment to an ObjectOutputStream, stores it in a field of a UDF or of your job parameters, or references it from a lambda.
    • Fix: Do not serialize the environment instance. Serialize only the ExecutionConfig if needed, or more commonly, reconstruct the environment on the receiving side and then get its config.
    • Why it works: The environment holds references to runtime components and does not implement Serializable.
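
  5. Custom TypeInformation or TypeSerializerFactory With Non-Serializable State: TypeInformation already implements java.io.Serializable, so a custom subclass inherits the interface. It still fails to serialize if any of its non-transient fields is not serializable. A custom TypeSerializerFactory must likewise be serializable, including its fields.

    • Diagnosis: Inspect the fields of your custom TypeInformation and TypeSerializerFactory implementations, and check that each field’s class implements java.io.Serializable.
    • Fix: Keep only serializable state in these classes. Mark helper objects transient, or rebuild them on demand.
      // Incorrect: Serializable by inheritance, but this field is not.
      // public class MyCustomTypeInfo extends TypeInformation<MyType> { private NonSerializableHelper helper; ... }
      
      // Correct: keep only serializable fields (Class objects are serializable).
      public class MyCustomTypeInfo extends TypeInformation<MyType> { private final Class<MyType> typeClass; ... }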
      
    • Why it works: Flink needs to serialize these definitions to distribute them to TaskManagers so they know how to handle your custom types.

  6. Non-Serializable Fields in Functions Checked by the Closure Cleaner: The closure cleaner is enabled by default. It removes unneeded references to enclosing classes, but it cannot fix a UDF or lambda that genuinely holds a non-serializable field, and that surfaces as a serialization error.

    • Diagnosis: Review your UDFs (e.g., MapFunction, ProcessFunction) and any other lambda expressions or anonymous classes used. Check if they hold references to non-serializable objects.
    • Fix: Ensure all fields within your closures are serializable. If a field is not, mark it transient and initialize it in open(), or redesign the closure so it does not hold that reference.
      // Potentially problematic:
      // class MyMapper extends RichMapFunction<Input, Output> {
      //     private transient NonSerializableObject helper; // Fine: skipped during serialization; create it in open()
      //     private NonSerializableObject problem; // Fails: non-transient and not serializable
      //     @Override
      //     public Output map(Input value) throws Exception {
      //         // ... use problem ...
      //     }
      // }
      
      // Correct approach: make each field serializable, or mark it transient and initialize it in open() on the TaskManager.
      
    • Why it works: Flink serializes each function to ship it to the TaskManagers, and the closure cleaner checks that this is possible. Any field that is neither serializable nor transient makes that step fail.
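
The transient-field advice in cause 6 can be sketched in plain Java without Flink. Mapper and Helper are hypothetical stand-ins for a user function and its non-serializable helper, and the lazy helper() accessor plays the role that open() would normally play in a Flink RichFunction; treat it as an illustration of the pattern under those assumptions, not as Flink code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Locale;

final class TransientHelperSketch {

    /** Hypothetical non-serializable helper (think client, connection, or parser). */
    static final class Helper {
        String normalize(String s) { return s.trim().toLowerCase(Locale.ROOT); }
    }

    /** Stand-in for a user function: serialized, shipped, then invoked. */
    static final class Mapper implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String prefix;     // serializable configuration travels with the object
        private transient Helper helper; // skipped by serialization, so it may be non-serializable

        Mapper(String prefix) { this.prefix = prefix; }

        /** Rebuilds the helper on first use after deserialization. */
        private Helper helper() {
            if (helper == null) {
                helper = new Helper();
            }
            return helper;
        }

        String map(String value) { return prefix + helper().normalize(value); }
    }

    /** Serializes and deserializes the mapper, as shipping it to a worker would. */
    static Mapper roundTrip(Mapper mapper) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(mapper);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Mapper) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        Mapper original = new Mapper("id-");
        original.map("warm-up");              // helper is now set on the original
        Mapper shipped = roundTrip(original); // still serializes because helper is transient
        System.out.println(shipped.map("  ABC "));
    }
}
```

Removing the transient keyword from the helper field makes roundTrip() fail with a NotSerializableException, which mirrors the failure described in this cause.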

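For cause 1, the step of reducing a custom parameter object to plain strings can be sketched as follows. ConnectionSettings and the key names are hypothetical, chosen only for this example; in a real job the resulting map would be handed to ParameterTool.fromMap() as in the snippet under cause 1.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

final class JobParameterFlattening {

    /** Hypothetical settings holder; deliberately NOT Serializable. */
    static final class ConnectionSettings {
        final String host;
        final int port;
        final Thread heartbeat = new Thread(() -> { }); // a resource that cannot be serialized

        ConnectionSettings(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    /** Copies only the plain values into a String map, which is serializable. */
    static Map<String, String> toParameterMap(ConnectionSettings settings) {
        Map<String, String> params = new HashMap<>();
        params.put("connection.host", settings.host);
        params.put("connection.port", Integer.toString(settings.port));
        return params;
    }

    /** Serializes and deserializes the map to show it survives the round trip. */
    @SuppressWarnings("unchecked")
    static Map<String, String> roundTrip(Map<String, String> params) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(params);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Map<String, String>) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> params = toParameterMap(new ConnectionSettings("localhost", 9092));
        System.out.println(roundTrip(params));
    }
}
```

The non-serializable parts of the settings object stay on the client; each task can rebuild whatever it needs from the string values.
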
After fixing these, you might encounter java.lang.ClassNotFoundException if the TaskManagers cannot find the custom classes you’ve registered (e.g., custom serializers or types). Make sure those classes are packaged in the job JAR or are otherwise available on the TaskManager’s classpath.
