The Flink JobManager has determined that your job cannot be scheduled and run, usually because a critical component required for its operation is missing or misconfigured.

Insufficient Task Slots

Diagnosis: Open the Flink Web UI overview page and compare "Available Task Slots" with "Total Task Slots". If the available count is zero, or lower than the parallelism your job requests, the scheduler has nowhere to place the job’s tasks.

Cause: The JobManager doesn’t have enough available task slots on the TaskManagers to accommodate all the tasks in your job. Flink distributes job parallelism across these slots.

Fix:

  1. Increase Task Manager Capacity: Modify flink-conf.yaml on your TaskManagers (or via your deployment configuration) to increase taskmanager.memory.process.size and taskmanager.numberOfTaskSlots. Restart your TaskManagers.
    taskmanager.memory.process.size: 4096m
    taskmanager.numberOfTaskSlots: 4
    
    This allocates more memory to each TaskManager and gives each one more slots for tasks, allowing your job to be scheduled.
  2. Scale Out Task Managers: Add more TaskManager instances to your cluster. Each new instance contributes its own taskmanager.numberOfTaskSlots slots to the total, spreading the load across more machines.

Why it works: More task slots directly translate to more capacity for Flink to deploy your job’s tasks.
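
The slot check can also be scripted. The sketch below assumes the JSON shape returned by the JobManager REST endpoint /overview (fields slots-total and slots-available). The function name slot_shortfall and the sample payload are illustrative and not part of Flink.

```python
import json


def slot_shortfall(overview: dict, requested_parallelism: int) -> int:
    """Return how many more free slots the job needs (0 if it already fits)."""
    available = int(overview["slots-available"])
    return max(0, requested_parallelism - available)


# Sample payload shaped like a /overview response; the values are made up.
sample = json.loads('{"taskmanagers": 2, "slots-total": 8, "slots-available": 3}')

print(slot_shortfall(sample, 16))  # 13 slots short
print(slot_shortfall(sample, 2))   # 0, the job fits
```

A non-zero result tells you how many slots to add, either by raising taskmanager.numberOfTaskSlots or by starting more TaskManagers.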

Incorrect Parallelism Configuration

Diagnosis: Examine your Flink job’s submission configuration or code for the parallelism.default setting. Compare this to the number of available task slots in your cluster.

Cause: The job’s declared parallelism (how many parallel instances of each operator it wants) is higher than the available task slots in the cluster, or the cluster is configured with a very low default parallelism that doesn’t match the job’s needs.

Fix:

  1. Adjust Job Parallelism: When submitting the job, set the desired parallelism:
    ./bin/flink run -p 16 /path/to/your/job.jar
    
    Replace 16 with the appropriate number of parallel instances.
  2. Adjust Cluster Default Parallelism: In flink-conf.yaml, set parallelism.default to a reasonable value for your cluster.
    parallelism.default: 8
    
    This ensures new jobs pick up a sensible default parallelism if not explicitly set.

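Why it works: Flink needs to map each of your job’s parallel subtasks to a task slot. With the default slot sharing, a job needs as many slots as its highest operator parallelism, so a job that asks for a parallelism of 100 cannot run on a cluster with only 50 slots. These settings align the job’s requirements with the cluster’s capacity.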
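
To make the slot arithmetic concrete, here is a minimal sketch. It assumes every operator stays in the default slot sharing group, in which case the job needs as many slots as its highest operator parallelism. The operator names and helper functions are hypothetical.

```python
def required_slots(operator_parallelism: dict) -> int:
    """Slots needed when all operators share the default slot sharing group."""
    return max(operator_parallelism.values())


def fits(operator_parallelism: dict, task_managers: int, slots_per_tm: int) -> bool:
    """True if the cluster's total slot count covers the job's requirement."""
    return required_slots(operator_parallelism) <= task_managers * slots_per_tm


# Hypothetical job: three operators with different parallelism settings.
job = {"source": 4, "map": 16, "sink": 2}

print(required_slots(job))                        # 16
print(fits(job, task_managers=2, slots_per_tm=4))  # False: only 8 slots
print(fits(job, task_managers=4, slots_per_tm=4))  # True: 16 slots
```

If the job places operators in separate slot sharing groups, the requirement is higher than this estimate.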

Missing or Incorrect JAR Files

Diagnosis: Check the Flink UI’s "Jobs" view. Hover over the job name or click on it. Look for errors related to class loading or missing dependencies.

Cause: The Flink JobManager or TaskManagers cannot find the JAR file containing your job code, or it’s corrupted, or it’s missing crucial dependencies that were expected to be bundled.

Fix:

  1. Verify JAR Path: Ensure the JAR file path provided during job submission (./bin/flink run /path/to/your/job.jar) is correct and readable on the machine where you run the command. The client uploads the JAR to the cluster from there.
  2. Bundle Dependencies: If your job relies on external libraries, ensure they are included in the fat JAR or are available in the Flink classpath. Use a build tool like Maven or Gradle with the appropriate plugins (e.g., maven-shade-plugin) to create an executable JAR with all dependencies.
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    
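  3. Check Flink Classpath: For libraries that are not bundled, place them in the lib directory of the Flink distribution on every node and restart the cluster, or pass them at submission time with the -C (--classpath) option of flink run, which takes a URL that must be reachable from all nodes.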

Why it works: Flink needs the actual code to execute. If it can’t load the job’s classes or its dependencies, it cannot instantiate and run the job.
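
Before resubmitting, it can help to confirm that the JAR actually contains the class Flink fails to load. The following sketch relies on the fact that a JAR is a ZIP archive. The function name is illustrative, and com.example.MyJob in the usage note is a placeholder class name.

```python
import zipfile


def jar_contains_class(jar, class_name: str) -> bool:
    """Check whether a fully qualified class name has an entry in the JAR.

    `jar` may be a filesystem path or an open binary file-like object.
    """
    # Class com.example.MyJob is stored as com/example/MyJob.class.
    entry = class_name.replace(".", "/") + ".class"
    with zipfile.ZipFile(jar) as archive:
        return entry in archive.namelist()
```

For example, jar_contains_class("/path/to/your/job.jar", "com.example.MyJob") returns False when the class was left out of the fat JAR, which points to a packaging problem rather than a cluster problem.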

Network Connectivity Issues

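Diagnosis: Check the Flink logs for IOException, ConnectException, or messages indicating that the JobManager cannot reach the TaskManagers, or vice versa. Use ping to confirm basic reachability, then test the specific Flink ports with telnet from the JobManager host (e.g., telnet <taskmanager-ip> 6122, if taskmanager.rpc.port is pinned to 6122).

Cause: The JobManager cannot communicate with the TaskManagers (or vice versa) on the required Flink ports. Only the JobManager RPC port has a fixed default (jobmanager.rpc.port: 6123). The TaskManager RPC port (taskmanager.rpc.port), the TaskManager data port (taskmanager.data.port), and the Blob Server port (blob.server.port) default to 0, meaning a random free port, unless your deployment pins them. Firewalls, incorrect IP configurations, or network routing problems can all block this traffic.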

Fix:

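  1. Open Firewall Ports: Pin the randomly assigned ports to fixed values in flink-conf.yaml (for example taskmanager.data.port: 6121, taskmanager.rpc.port: 6122, and blob.server.port: 6124), then ensure those ports and 6123 (JobManager RPC) are open between the JobManager and all TaskManagers.
    # Example using iptables on Linux; the port numbers assume the pinned values above
    sudo iptables -A INPUT -p tcp --dport 6121 -j ACCEPT
    sudo iptables -A INPUT -p tcp --dport 6122 -j ACCEPT
    sudo iptables -A INPUT -p tcp --dport 6123 -j ACCEPT
    sudo iptables -A INPUT -p tcp --dport 6124 -j ACCEPT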
    
  2. Verify IP Addresses: In flink-conf.yaml (for standalone), ensure jobmanager.rpc.address and taskmanager.host (if explicitly set) point to the correct, resolvable IP addresses or hostnames.
  3. Check DNS Resolution: Ensure all nodes can resolve each other’s hostnames if you’re using hostnames instead of IPs.

Why it works: Flink’s distributed architecture relies on constant communication. If the JobManager can’t instruct TaskManagers to start tasks, or if TaskManagers can’t report back, the job remains unscheduled.
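
Where telnet is not installed, the same reachability test can be done with a few lines of Python. This is a minimal sketch using only the standard library. The function name is illustrative, and the host and port you pass in depend on how your cluster’s ports are configured.

```python
import socket


def port_is_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Try to open a TCP connection; True means something accepted it."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS resolution failures.
        return False
```

Run it from the JobManager host against each TaskManager, and from a TaskManager host against the JobManager (for example port_is_reachable("<jobmanager-host>", 6123)). A False result in only one direction usually points to a firewall rule rather than a routing problem.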

Flink Version Mismatch

Diagnosis: Compare the Flink version your job was built against (the Flink dependency version in your pom.xml or build.gradle) and the version of the flink client used for submission (./bin/flink --version) with the version your cluster runs, which the Web UI shows. In the logs, look for errors such as NoSuchMethodError, ClassNotFoundException, or InvalidClassException ("local class incompatible").

Cause: You are trying to run a job compiled with one Flink version against a Flink cluster running a different, incompatible version. This can lead to subtle classloading issues or API incompatibilities.

Fix:

  1. Align Versions: Ensure your job JAR is compiled against the exact same Flink version as your cluster is running.
    • Build Tool: Update the Flink dependency in your pom.xml or build.gradle:
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-java</artifactId>
          <version>1.15.4</version> <!-- Use your cluster's version -->
      </dependency>
      
    • Submission: Use the flink executable that matches your cluster version for submitting jobs.

Why it works: Flink’s internal APIs and data structures evolve. Using a JAR compiled for one version with a cluster of another can cause these components to fail to interact correctly.
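
The comparison itself is simple enough to automate in a build or deployment check. The sketch below classifies a pair of version strings. The function names and the three result labels are illustrative. Reading the cluster version from the flink-version field of the REST endpoint /config is an assumption you should verify against your Flink release.

```python
def parse_version(version: str) -> tuple:
    """Turn '1.15.4' or '1.15-SNAPSHOT' into a tuple of integers."""
    numeric = version.split("-")[0]
    return tuple(int(part) for part in numeric.split("."))


def compare_versions(job_version: str, cluster_version: str) -> str:
    """Classify how the job's Flink version relates to the cluster's."""
    job, cluster = parse_version(job_version), parse_version(cluster_version)
    if job == cluster:
        return "match"
    if job[:2] == cluster[:2]:
        return "patch-differs"  # same major.minor, different patch release
    return "mismatch"


print(compare_versions("1.15.4", "1.15.4"))  # match
print(compare_versions("1.15.2", "1.15.4"))  # patch-differs
print(compare_versions("1.14.6", "1.15.4"))  # mismatch
```

Only "match" satisfies the advice above. The "patch-differs" label is there so that a check can warn instead of failing outright if you choose to tolerate patch-level differences.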

Resource Allocation (Memory/CPU) Issues

Diagnosis: Examine the Flink UI’s "Task Managers" page and open the individual TaskManagers to see their memory metrics. Look for TaskManagers that consistently show high memory or CPU utilization, or that restart frequently. Check system-level metrics on the TaskManager hosts as well.

Cause: TaskManagers do not have sufficient memory or CPU resources allocated to them by the underlying operating system or container orchestrator (like Kubernetes). This causes Flink’s JVMs to struggle, leading to garbage collection pauses, OutOfMemory errors (even if not explicitly shown), or simply making the TaskManager unresponsive.

Fix:

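  1. Increase JVM Heap: In flink-conf.yaml, adjust taskmanager.memory.task.heap.size (Flink 1.10 and later; older releases used taskmanager.heap.size).
    taskmanager.memory.task.heap.size: 2048m
    
    This directly increases the Java heap reserved for your job’s operators and user code in the TaskManager process. If taskmanager.memory.process.size is also set, it must be large enough to hold this heap plus the other memory components.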
  2. Adjust Managed Memory: If using managed memory, ensure taskmanager.memory.managed.size is adequate.
  3. Container/VM Resources: If running in containers (e.g., Docker, Kubernetes), increase the CPU and memory limits assigned to the TaskManager pods/containers.
  4. System Resources: On bare metal, ensure the host machine has enough free RAM and CPU capacity.

Why it works: Flink tasks, especially stateful ones or those with high throughput, consume significant memory and CPU. If the TaskManager process itself is starved for resources, it cannot reliably start or run tasks, leading to the "not runnable" state.
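
To see how much of a TaskManager’s process memory actually ends up as task heap, the following sketch reproduces the derivation under stated assumptions: the memory model introduced in Flink 1.10, with JVM metaspace at 256 MB, JVM overhead at 10% of process memory (clamped to 192 MB to 1 GB), managed memory at 40% and network memory at 10% of total Flink memory (network clamped to 64 MB to 1 GB), and 128 MB each for framework heap and framework off-heap. These defaults differ between releases, so treat the numbers as an estimate and check the values for your version.

```python
def clamp(value: float, low: float, high: float) -> float:
    """Restrict value to the range [low, high]."""
    return max(low, min(value, high))


def task_heap_mb(process_mb: float) -> float:
    """Estimate task heap (MB) derived from taskmanager.memory.process.size."""
    metaspace = 256.0                                  # assumed default
    overhead = clamp(0.1 * process_mb, 192.0, 1024.0)  # assumed fraction and bounds
    flink_total = process_mb - metaspace - overhead
    managed = 0.4 * flink_total                        # assumed managed fraction
    network = clamp(0.1 * flink_total, 64.0, 1024.0)   # assumed fraction and bounds
    framework = 128.0 + 128.0                          # framework heap + off-heap
    return flink_total - managed - network - framework


print(round(task_heap_mb(1728)))     # 384
print(round(task_heap_mb(4096), 1))  # 1459.2
```

Under these assumptions, well under half of the process size is left for task heap, which is why raising only the container limit without revisiting the Flink memory options may not relieve heap pressure.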

Once these are fixed, the next error you are likely to encounter is a CheckpointException, which appears when your state backend is misconfigured or unreachable.
