Flink’s native Kubernetes integration lets you ditch separate ZooKeeper clusters for HA, but getting it right means understanding how the JobManager high availability truly functions.

Let’s see Flink running on Kubernetes.

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: my-flink-app
spec:
  # The operator expects the image as a single string, not a repository/tag object.
  image: flinkci/flink:1.19.0
  imagePullPolicy: IfNotPresent
  flinkVersion: v1_19
  flinkConfiguration:
    # Values in flinkConfiguration are strings, so numbers are quoted.
    taskmanager.numberOfTaskSlots: "1"
    high-availability: kubernetes
    high-availability.storage-dir: file:///opt/flink/ha/
    kubernetes.cluster-id: my-flink-cluster
    # The restart strategy is ordinary Flink configuration, not a field of the job spec.
    restart-strategy.type: failure-rate
    restart-strategy.failure-rate.max-failures-per-interval: "3"
    restart-strategy.failure-rate.failure-rate-interval: 5 min
    restart-strategy.failure-rate.delay: 60 s
  serviceAccount: flink
  jobManager:
    # resource.memory sets the JobManager process memory.
    resource:
      memory: "1024m"
      cpu: 1
  taskManager:
    # resource.memory sets the TaskManager process memory.
    resource:
      memory: "1024m"
      cpu: 1
    replicas: 2
  job:
    jarURI: local:///opt/flink/examples/streaming/WordCount.jar
    entryClass: org.apache.flink.streaming.examples.wordcount.WordCount
    parallelism: 2

This FlinkDeployment resource tells the Flink Kubernetes Operator to spin up a Flink cluster. The high-availability: kubernetes setting is the key. Instead of relying on an external system like ZooKeeper to elect a leader and track HA state, Flink uses Kubernetes’ own primitives. Leader election runs against the Kubernetes API: each JobManager competes for a lock held in a ConfigMap, and the one that acquires it becomes the leader. The bulkier recovery metadata, such as submitted job graphs and the metadata of completed checkpoints, is written to the durable location defined by high-availability.storage-dir, and the ConfigMaps hold pointers to it. When the leader pod dies, Kubernetes replaces it. The replacement, or a standby JobManager if you run one, acquires the lock and uses that metadata to recover the job. The kubernetes.cluster-id tells Flink which HA ConfigMaps belong to this cluster.

The core problem Flink HA solves is ensuring that a Flink job continues to run even if the JobManager process or its underlying pod fails. Without HA, a JobManager crash would mean the job stops entirely, and you’d have to manually restart it, likely losing state. Flink’s HA, whether using ZooKeeper or Kubernetes, provides fault tolerance for the control plane. It ensures that there’s always a single, active JobManager responsible for coordinating the TaskManagers and recovering from failures. With the Kubernetes backend there is no separate HA component to deploy: the HA services run inside each JobManager process, and the Kubernetes API server holds the leader lock.

The high-availability.storage-dir is where Flink persists the HA metadata that is too large to keep in Kubernetes objects, such as job graphs and the metadata of completed checkpoints. It must point to durable storage that every JobManager pod can reach: typically an object store or distributed filesystem (S3, HDFS), or a shared volume mounted from a PersistentVolumeClaim (PVC). A local path such as the file:///opt/flink/ha/ used above only works if such a volume is mounted there; otherwise the data disappears with the pod. Leader election itself does not depend on this directory, but without it a newly elected leader has nothing to recover from, so the job cannot be restored after a failure.
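To make the file:///opt/flink/ha/ path from the example durable, the directory has to be backed by a shared volume. The fragment below is a minimal sketch of one way to do that with the operator’s top-level podTemplate, which applies to both JobManager and TaskManager pods. The claim name flink-ha-pvc and the volume name flink-ha are hypothetical, and the PVC is assumed to already exist with an access mode that lets every JobManager pod mount it (ReadWriteMany if pods can land on different nodes).

```yaml
# Sketch: mount a pre-existing PVC at the HA storage path.
# "flink-ha-pvc" and "flink-ha" are hypothetical names.
spec:
  flinkConfiguration:
    high-availability.storage-dir: file:///opt/flink/ha/
  podTemplate:
    spec:
      containers:
        # The operator merges pod templates into the container with this name.
        - name: flink-main-container
          volumeMounts:
            - name: flink-ha
              mountPath: /opt/flink/ha
      volumes:
        - name: flink-ha
          persistentVolumeClaim:
            claimName: flink-ha-pvc
```

Pointing high-availability.storage-dir at an object store instead avoids the shared-volume requirement, at the cost of adding the matching filesystem plugin to the image.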

The kubernetes.cluster-id is a unique identifier that Flink uses to isolate its HA metadata within Kubernetes. When Flink starts up, it creates its HA ConfigMaps, including the one that carries the leader lock, with names derived from this cluster ID. This prevents HA conflicts if you have multiple Flink clusters running within the same Kubernetes namespace. It ensures that only the JobManagers belonging to a specific Flink cluster interact with the correct HA state.

The HA service itself is handled by Flink’s internal HA component. When high-availability: kubernetes is configured, Flink doesn’t spin up a separate HA DaemonSet or StatefulSet. Instead, each JobManager pod acts as a candidate for the leader. The candidates compete for a lock that Flink’s Kubernetes HA backend stores as an annotation on a ConfigMap, and the holder has to keep renewing it. The JobManager that holds the lock is the leader. What happens when the leader pod fails depends on how many JobManagers you run. With a single JobManager, Kubernetes replaces the pod, and the new pod acquires the lock once the old holder’s claim has expired. With standby JobManagers, a standby notices that the lock is no longer being renewed and takes over leadership, while the replaced pod rejoins as a standby.
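The standby scenario can be sketched as a small change to the FlinkDeployment: raise jobManager.replicas above 1 and, if needed, tune how quickly a lost leader is detected. The three leader-election keys below are Flink configuration options; the values shown are what I understand the defaults to be, so treat them as an assumption and check them against the documentation for your Flink version.

```yaml
# Sketch: one active JobManager plus one standby.
spec:
  flinkConfiguration:
    high-availability: kubernetes
    # How long a lock claim stays valid without renewal (assumed default).
    high-availability.kubernetes.leader-election.lease-duration: 15 s
    # How long the leader keeps trying to renew before giving up leadership (assumed default).
    high-availability.kubernetes.leader-election.renew-deadline: 15 s
    # How often candidates retry acquiring or renewing the lock (assumed default).
    high-availability.kubernetes.leader-election.retry-period: 5 s
  jobManager:
    replicas: 2
```

Shorter durations speed up failover but make the cluster more sensitive to brief API server hiccups, so lowering them is a trade-off rather than a free win.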

When a JobManager fails and a new leader takes over, it needs to recover the state of the Flink job. The new leader reads the job graph and the pointer to the latest completed checkpoint from the HA metadata (the ConfigMaps plus the files in high-availability.storage-dir), then restores the job from that checkpoint. The checkpoint and savepoint data itself lives in the configured checkpoint storage (e.g., S3, HDFS, or a shared filesystem), not in the HA storage directory, and the current leader’s identity lives in the ConfigMap, not in either location. All three have to survive a pod restart for recovery to work.
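As a rough illustration of that separation, the fragment below keeps HA metadata and checkpoint data in different locations. The bucket my-bucket and its paths are hypothetical, and an s3:// URI assumes the matching S3 filesystem plugin is available in the image.

```yaml
# Sketch: HA metadata and job state stored separately.
# "my-bucket" and all paths are hypothetical.
spec:
  flinkConfiguration:
    # Job graphs and checkpoint metadata pointers used for JobManager recovery.
    high-availability: kubernetes
    high-availability.storage-dir: s3://my-bucket/flink/ha
    # The actual checkpoint and savepoint data of the job.
    state.checkpoints.dir: s3://my-bucket/flink/checkpoints
    state.savepoints.dir: s3://my-bucket/flink/savepoints
    # Without periodic checkpoints there is no state to recover from.
    execution.checkpointing.interval: 60 s
```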

If your Flink job is stuck in a PENDING state after a JobManager failure, it’s often because the HA metadata in the high-availability.storage-dir is corrupted or inaccessible. This can happen if the underlying persistent storage is not correctly mounted or if there are permission issues. You’ll need to inspect the logs of the JobManager pods and the status of the persistent volume associated with the high-availability.storage-dir.

The fact that Flink uses Kubernetes ConfigMaps under the hood for its HA leader election is often overlooked. While you configure high-availability: kubernetes, Flink’s HA module translates this into interactions with the Kubernetes API, specifically creating, watching, and updating ConfigMaps. The lock is an annotation on a ConfigMap rather than a dedicated Lease object, even though the related configuration options talk about lease durations. It acts as a distributed lock, allowing Flink to manage leader election and ensure only one JobManager is active at a time. You don’t interact with these ConfigMaps directly, but understanding their role helps debug leader election issues, and it explains why the JobManager’s service account needs permission to manage ConfigMaps.
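Because leader election goes through the Kubernetes API, missing RBAC permissions are a common reason for it to fail. The manifest below is a minimal sketch of a Role and RoleBinding that let the flink service account from the example manage ConfigMaps in its namespace. The name flink-ha-configmaps is hypothetical, the service account is assumed to live in the same namespace as the manifest, and an operator installation may already provide an equivalent role.

```yaml
# Sketch: allow the "flink" service account to manage HA ConfigMaps.
# "flink-ha-configmaps" is a hypothetical name.
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: flink-ha-configmaps
rules:
  - apiGroups: [""]
    resources: ["configmaps"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: flink-ha-configmaps
subjects:
  - kind: ServiceAccount
    name: flink
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: flink-ha-configmaps
```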
