The Flink JobManager failed because it couldn’t allocate the requested managed memory from the TaskManagers, indicating a mismatch between configured memory fractions across the cluster.

The core issue is that Flink’s managed memory, used for efficient data buffering and sorting, is a shared resource. Its allocation is governed by fractions defined in the Flink configuration. When these fractions are inconsistent between the JobManager and TaskManagers, or even between different TaskManagers, the system breaks down. The JobManager, trying to coordinate tasks, requests memory from TaskManagers. If the TaskManagers report having less available managed memory than the JobManager expects based on its own configuration, the JobManager flags this as an error and halts. This often manifests as org.apache.flink.runtime.operators.coordination.OperatorCoordinator$NotEnoughManagedMemoryException.
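
To make the effect of a mismatched fraction concrete, here is a minimal sketch that derives each TaskManager's managed memory pool from its settings. It assumes the Flink 1.10+ memory model, in which the fraction applies to total Flink memory. The host names, sizes, and the helper managed_memory_mb are hypothetical and exist only for illustration.

```python
def managed_memory_mb(total_flink_memory_mb: float, managed_fraction: float) -> float:
    """Managed memory as a fraction of total Flink memory (assumed 1.10+ model)."""
    if not 0.0 <= managed_fraction <= 1.0:
        raise ValueError("managed_fraction must be between 0 and 1")
    return total_flink_memory_mb * managed_fraction


# Hypothetical cluster: same total Flink memory, but tm-3 carries a stale fraction.
task_managers = {
    "tm-1": {"flink_size_mb": 4096, "managed_fraction": 0.4},
    "tm-2": {"flink_size_mb": 4096, "managed_fraction": 0.4},
    "tm-3": {"flink_size_mb": 4096, "managed_fraction": 0.25},
}

managed = {
    name: managed_memory_mb(cfg["flink_size_mb"], cfg["managed_fraction"])
    for name, cfg in task_managers.items()
}
smallest = min(managed, key=managed.get)

for name, mb in managed.items():
    print(f"{name}: {mb:.0f} MB managed memory")
print(f"smallest pool: {smallest}")
```

The node with the smallest pool is the one most likely to fall short when a job needs more managed memory per slot than it can offer.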

Here are the common culprits and how to fix them:

  1. Inconsistent taskmanager.memory.managed.fraction across TaskManagers:

    • Diagnosis: On each TaskManager, check its flink-conf.yaml. Look for the taskmanager.memory.managed.fraction setting. If you have multiple TaskManagers, they must have the same value for this setting.
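    • Fix: Ensure taskmanager.memory.managed.fraction is identical in the flink-conf.yaml on all TaskManager nodes. For example, set taskmanager.memory.managed.fraction: 0.4 (the default, meaning 40% of the TaskManager’s total Flink memory is reserved for managed memory; since Flink 1.10 this memory is allocated off-heap, not from the JVM heap). Identical fractions only yield identical pool sizes if taskmanager.memory.flink.size (or taskmanager.memory.process.size) is also the same on every node.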
    • Why it works: This guarantees that every TaskManager reports its managed memory capacity consistently to the JobManager, preventing the JobManager from being surprised by a TaskManager offering less memory than anticipated.
  2. JobManager-side configuration that disagrees with the TaskManagers:

    • Diagnosis: The JobManager process has no managed memory pool of its own. Its memory options (jobmanager.memory.process.size, jobmanager.memory.flink.size, jobmanager.memory.heap.size, jobmanager.memory.off-heap.size) do not include a managed fraction, and jobmanager.memory.managed.fraction is not a documented Flink option. What matters is the copy of the taskmanager.memory.* settings in flink-conf.yaml on the JobManager node: compare taskmanager.memory.managed.fraction (and taskmanager.memory.managed.size, if set) there with the values on the TaskManagers.
    • Fix: The error usually stems from TaskManagers not having enough managed memory, so the TaskManager values are the ones to get right. Remove any jobmanager.memory.managed.fraction entry to avoid confusion, and distribute one identical flink-conf.yaml to every node so that the JobManager and TaskManagers agree, e.g., taskmanager.memory.managed.fraction: 0.4 everywhere. This matters most in deployments where the JobManager launches TaskManagers itself (such as YARN or native Kubernetes), because there the JobManager-side configuration is used to size the workers.
    • Why it works: This prevents the JobManager from having an internal expectation of managed memory availability that doesn’t match the reality reported by the workers.
  3. Misunderstanding what taskmanager.memory.managed.fraction is a fraction of:

    • Diagnosis: People often assume the fraction applies to the JVM heap. Since Flink 1.10, managed memory is allocated off-heap, and taskmanager.memory.managed.fraction is a fraction of total Flink memory (taskmanager.memory.flink.size, or the value derived from taskmanager.memory.process.size). If taskmanager.memory.flink.size is set to 1g and taskmanager.memory.managed.fraction is 0.4, then 0.4 * 1024MB ≈ 410MB is available for managed memory. An explicit taskmanager.memory.managed.size, if set, overrides the fraction.
    • Fix: Re-evaluate your total memory allocation. If you need more managed memory, either increase taskmanager.memory.flink.size, increase taskmanager.memory.managed.fraction (within reasonable limits, ensuring enough memory remains for the task heap, network buffers, and framework memory), or set taskmanager.memory.managed.size directly. For example, if you need about 800MB of managed memory, taskmanager.memory.flink.size: 2g with taskmanager.memory.managed.fraction: 0.4 yields roughly 819MB. Pushing the fraction to 0.8 on a 1g TaskManager would leave too little for everything else, and Flink may refuse to start with such a configuration.
    • Why it works: Directly adjusts the available pool of memory Flink can use for its internal buffering and sorting operations.
  4. Mixing current memory options with legacy ones from older Flink versions:

    • Diagnosis: Flink 1.10 introduced a new TaskManager memory model (the JobManager followed in 1.11). In it, managed memory is configured via taskmanager.memory.managed.fraction (a fraction of total Flink memory) or taskmanager.memory.managed.size. Older versions used taskmanager.memory.fraction and taskmanager.memory.size together with taskmanager.heap.size. Keys prefixed with flink.memory.managed. are not documented Flink options. If you’re migrating or have a mixed environment, leftover or misspelled keys can cause confusion.
    • Fix: For Flink 1.10+, stick to taskmanager.memory.managed.fraction or taskmanager.memory.managed.size, and remove legacy or unrecognized keys. Remember that the fraction applies to total Flink memory, not just the heap. For example, taskmanager.memory.managed.fraction: 0.6 means 60% of total Flink memory is for managed memory. Ensure this is applied consistently across all flink-conf.yaml files. If you’re on a version older than 1.10, use the legacy options for that version.
    • Why it works: Aligns the configuration mechanism to the Flink version in use, ensuring memory is accounted for correctly.
  5. External Memory Management Tools Interfering:

    • Diagnosis: If you’re using external tools (like Kubernetes memory limits, Docker memory limits, or custom JVM wrappers) that impose memory caps on the Flink JVM process, these caps might be lower than what Flink expects based on its taskmanager.memory.* configuration.
    • Fix: Ensure that any external memory limit set for the Flink JVM process is at least as high as the total process memory Flink intends to use (total Flink memory, which includes heap, managed, and network memory, plus JVM metaspace and JVM overhead). The simplest way is to set taskmanager.memory.process.size to the container limit and let Flink derive the rest. As a rough example, if everything other than managed memory needs about 4GB and taskmanager.memory.managed.fraction is 0.6, total Flink memory must be about 4GB / (1 - 0.6) = 10GB, and the container limit must be higher still to cover metaspace and overhead.
    • Why it works: Prevents the operating system or container runtime from killing the Flink process or denying it memory before Flink itself can manage its internal allocations.
  6. Insufficient total memory (taskmanager.memory.flink.size or taskmanager.memory.process.size):

    • Diagnosis: Even if the fraction is correct, if the total memory it applies to is too small, the absolute amount of managed memory available will be insufficient for the job’s needs.
    • Fix: Increase taskmanager.memory.flink.size (or taskmanager.memory.process.size in containerized setups) on all TaskManagers. For instance, change taskmanager.memory.flink.size: 4g to taskmanager.memory.flink.size: 8g.
    • Why it works: Provides a larger pool from which the managed memory fraction can be drawn, increasing the absolute amount of memory available.
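
Several of the checks above come down to comparing the same keys across every node's flink-conf.yaml. The following is a minimal sketch of such a comparison, assuming the configuration is in the flat "key: value" form and that the file contents have already been collected per host. The host names, the sample contents, and the helpers parse_flat_conf and find_mismatches are hypothetical, not part of Flink.

```python
from collections import defaultdict

# Keys whose values should normally agree across nodes (Flink 1.10+ names).
KEYS_TO_COMPARE = (
    "taskmanager.memory.managed.fraction",
    "taskmanager.memory.managed.size",
    "taskmanager.memory.flink.size",
    "taskmanager.memory.process.size",
)


def parse_flat_conf(text: str) -> dict:
    """Parse flat 'key: value' lines, skipping blanks and '#' comments.

    Simplification: anything after a '#' is treated as a comment.
    """
    conf = {}
    for raw in text.splitlines():
        line = raw.split("#", 1)[0].strip()
        if not line or ":" not in line:
            continue
        key, value = line.split(":", 1)
        conf[key.strip()] = value.strip()
    return conf


def find_mismatches(confs_by_host: dict) -> dict:
    """Return {key: {value: [hosts]}} for each compared key with differing values."""
    mismatches = {}
    for key in KEYS_TO_COMPARE:
        hosts_by_value = defaultdict(list)
        for host, conf in confs_by_host.items():
            hosts_by_value[conf.get(key, "<unset>")].append(host)
        if len(hosts_by_value) > 1:
            mismatches[key] = dict(hosts_by_value)
    return mismatches


# Hypothetical file contents gathered from three nodes.
example = {
    "jobmanager": "taskmanager.memory.flink.size: 4g\ntaskmanager.memory.managed.fraction: 0.4\n",
    "tm-1": "taskmanager.memory.flink.size: 4g\ntaskmanager.memory.managed.fraction: 0.4\n",
    "tm-2": "taskmanager.memory.flink.size: 4g\ntaskmanager.memory.managed.fraction: 0.25  # stale\n",
}

confs = {host: parse_flat_conf(text) for host, text in example.items()}
report = find_mismatches(confs)
for key, groups in report.items():
    print(key, groups)
```

An empty report means the compared keys agree everywhere. A key that is set on some hosts and missing on others shows up under the placeholder value <unset>.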

After fixing these, you might next encounter heartbeat timeouts, usually logged as a java.util.concurrent.TimeoutException with a message like "Heartbeat of TaskManager with id <id> timed out", if the network latency between JobManager and TaskManagers is too high or if TaskManagers are overloaded and can’t respond to heartbeats in time.
