Flink’s ZooKeeper-based High Availability (HA) mode for the JobManager means that if the active JobManager dies, a standby instance can take over and recover the running jobs, so they continue after only a short interruption.
Let’s see it in action. Imagine a simple Flink job whose source emits an ever-increasing counter and whose downstream operator sums those numbers in batches of 100.
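```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class HighAvailabilityDemo {
    // Emits 0, 1, 2, ... and checkpoints its counter so a restarted job
    // resumes from the last completed checkpoint instead of from 0
    public static class CountingSource implements SourceFunction<Long>, CheckpointedFunction {
        private volatile boolean running = true;
        private long counter = 0;
        private transient ListState<Long> counterState;

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            while (running) {
                // Emit under the checkpoint lock so snapshots see a consistent counter
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(counter++);
                }
                Thread.sleep(100); // Emit every 100 ms
            }
        }

        @Override
        public void cancel() {
            running = false;
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            counterState.clear();
            counterState.add(counter);
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            counterState = context.getOperatorStateStore()
                    .getListState(new ListStateDescriptor<>("counter", Long.class));
            for (Long restored : counterState.get()) {
                counter = restored; // Empty on a fresh start, one value after a restore
            }
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // HA itself is configured in flink-conf.yaml; checkpoints provide the
        // state that a newly elected JobManager restores the job from
        env.enableCheckpointing(5000); // Checkpoint every 5 seconds
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        DataStream<Long> source = env.addSource(new CountingSource());
        // Sum each batch of 100 consecutive numbers and print the result
        source.countWindowAll(100).sum(0).print();
        env.execute("HA JobManager Demo");
    }
}
```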
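If you were to run this job without HA and then kill the JobManager process, the job would stop. With HA configured, a standby JobManager, already connected to ZooKeeper as a leader candidate, would be notified once the failed leader’s ZooKeeper session expires, take over leadership, and restart the job from the latest completed checkpoint. You’d see a brief blip, after which the job continues from the state captured in that checkpoint rather than starting over. Records processed after that checkpoint are processed again, so a non-transactional sink such as print() may show some output twice.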
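To make “restart from the last completed checkpoint” concrete, here is a toy model in plain Java. It is not Flink code: CheckpointReplayModel, runWithCrash and the record-based CHECKPOINT_EVERY interval are hypothetical stand-ins (Flink checkpoints on a time interval, not every N records). It shows that state rolls back to the checkpoint, so records emitted between that checkpoint and the crash are emitted again.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (not Flink code): a counter source that snapshots its state
// every CHECKPOINT_EVERY records and, after a crash, restarts from the last snapshot.
public class CheckpointReplayModel {
    static final int CHECKPOINT_EVERY = 5; // hypothetical interval, counted in records

    // Emits records until `crashAfter` of them have been produced, simulates a
    // JobManager failure, then resumes from the last completed checkpoint until
    // the counter reaches `total`. Returns everything that was emitted.
    public static List<Long> runWithCrash(int crashAfter, int total) {
        List<Long> emitted = new ArrayList<>();
        long counter = 0;
        long lastCheckpoint = 0; // value restored on recovery

        // First attempt: runs until the simulated failure
        for (int i = 0; i < crashAfter; i++) {
            emitted.add(counter++);
            if (counter % CHECKPOINT_EVERY == 0) {
                lastCheckpoint = counter; // checkpoint completes
            }
        }

        // Failover: the new leader restores state from the last checkpoint
        counter = lastCheckpoint;
        while (counter < total) {
            emitted.add(counter++);
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(runWithCrash(7, 10));
    }
}
```

Running main prints [0, 1, 2, 3, 4, 5, 6, 5, 6, 7, 8, 9]: the values 5 and 6 reappear because they were emitted after the last checkpoint. Exactly-once mode guarantees that this replay leaves Flink’s internal state consistent; whether downstream systems see duplicates depends on the sink.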
The problem this solves is obvious: single points of failure. In a distributed system like Flink, having a single JobManager means that if it goes down, your entire job graph halts. HA with ZooKeeper provides a mechanism for automatic failover. ZooKeeper acts as a distributed coordination service. Flink JobManagers use it to:
- Elect a Leader: Several JobManager instances can run, but only one (the leader) is active at a time. Each candidate registers an ephemeral znode, and a standard ZooKeeper leader-election recipe ensures that exactly one candidate holds leadership. An ephemeral node lives only as long as the ZooKeeper session that created it, so when the leader crashes, its node is removed once its session expires, and ZooKeeper notifies the remaining candidates so one of them can become the new leader.
- Store Metadata: ZooKeeper also stores small pieces of metadata about the cluster and its jobs, such as the current leader’s address and pointers to the submitted job graphs and the latest completed checkpoints. The larger data those pointers reference is kept in high-availability.storage-dir.
- Coordinate Recovery: When a new leader is elected, it uses the metadata from ZooKeeper to identify the state of the job (e.g., the latest completed checkpoint) and restart the job graph accordingly.
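The election pattern in the first bullet can be sketched with a small in-memory model. This is a toy under stated assumptions, not the ZooKeeper or Curator API: it imitates the common recipe in which each candidate holds an ephemeral sequential node and the lowest sequence number leads, which is roughly how the Apache Curator leader-election recipes that Flink builds on behave. ToyLeaderElection and its methods are hypothetical names, and sessionExpired stands in for the node deletion and watch notification that ZooKeeper itself performs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Toy, in-memory model of ZooKeeper-style leader election (hypothetical,
// not the ZooKeeper or Curator API). Each candidate holds an ephemeral,
// sequential node; the lowest sequence number is the leader.
public class ToyLeaderElection {
    private final TreeMap<Long, String> ephemeralNodes = new TreeMap<>();
    private final List<String> events = new ArrayList<>();
    private long nextSequence = 0;

    // Candidate joins: ZooKeeper would assign an increasing sequence number
    public void register(String candidate) {
        ephemeralNodes.put(nextSequence++, candidate);
        announceLeader();
    }

    // Session expiry (e.g. a crash): the candidate's ephemeral node is deleted
    public void sessionExpired(String candidate) {
        ephemeralNodes.values().remove(candidate);
        announceLeader(); // stands in for the watch notification to other candidates
    }

    // Current leader, or null if no candidate is registered
    public String leader() {
        return ephemeralNodes.isEmpty() ? null : ephemeralNodes.firstEntry().getValue();
    }

    // Records a leadership change only when the leader actually changes
    private void announceLeader() {
        String event = "leader=" + leader();
        if (events.isEmpty() || !events.get(events.size() - 1).equals(event)) {
            events.add(event);
        }
    }

    public List<String> events() {
        return events;
    }

    public static void main(String[] args) {
        ToyLeaderElection zk = new ToyLeaderElection();
        zk.register("jobmanager-a");
        zk.register("jobmanager-b");
        zk.register("jobmanager-c");
        zk.sessionExpired("jobmanager-a"); // the active JobManager crashes
        System.out.println(zk.events());
    }
}
```

Running main prints [leader=jobmanager-a, leader=jobmanager-b]: the second candidate takes over only after the first one’s node disappears, and the third stays on standby.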
To configure this, you’ll need a running ZooKeeper ensemble. Then, in your Flink configuration (flink-conf.yaml), you’ll set up the HA properties:
```yaml
high-availability: zookeeper  # Newer Flink releases also accept high-availability.type
high-availability.storage-dir: hdfs:///flink/recovery  # Must be reachable by every JobManager
high-availability.cluster-id: my_flink_cluster  # A unique identifier for your Flink cluster
high-availability.zookeeper.quorum: zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181  # Your ZooKeeper ensemble
high-availability.zookeeper.path.root: /flink  # The root znode under which Flink stores its data
high-availability.zookeeper.client.retry-wait: 1000  # Initial wait between ZooKeeper connection retries, in ms
high-availability.zookeeper.client.max-retry-attempts: 5  # Connection retries before giving up
```
The high-availability.storage-dir is crucial. This is where the JobManager persists the data it needs for recovery, such as submitted job graphs, user JARs, and completed checkpoint metadata; ZooKeeper only keeps pointers to these files. Every JobManager must be able to read this location, so in a distributed deployment use a shared file system such as HDFS or S3. If you’re just experimenting on a single machine, a local path like file:///tmp/flink/recovery will work, but it won’t survive losing that machine, and /tmp is often cleared on reboot.
The high-availability.cluster-id isolates different Flink clusters that share the same ZooKeeper ensemble (on YARN and native Kubernetes deployments Flink sets it automatically, so you normally leave it unset there). high-availability.zookeeper.quorum is the comma-separated list of your ZooKeeper servers, and high-availability.zookeeper.path.root defines the base znode under which Flink creates its znodes.
When Flink starts in HA mode, each JobManager instance registers as a leader candidate in a subtree of high-availability.zookeeper.path.root that is namespaced by high-availability.cluster-id (with the settings above, under /flink/my_flink_cluster; the exact znode layout differs between Flink versions). The candidate that wins becomes the active JobManager. If the active JobManager fails, its ephemeral node is deleted once its ZooKeeper session expires, and ZooKeeper notifies the other candidates, one of which becomes the new leader. The new leader follows the pointers stored in ZooKeeper to the job graphs and checkpoint metadata in high-availability.storage-dir and resumes the job from the latest completed checkpoint.
A common misconception is that ZooKeeper itself stores all the checkpoint data. It doesn’t. ZooKeeper is for coordination and holds only small pointers: they lead to metadata files in high-availability.storage-dir, which in turn reference the checkpointed state in your checkpoint storage, typically a distributed file system such as HDFS or S3. ZooKeeper’s role is to ensure that only one JobManager is active and that it knows where to find what it needs to recover.
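The pointer-versus-data split can be illustrated with another toy model. Here a HashMap plays the role of ZooKeeper and a temporary directory plays the role of high-availability.storage-dir; PointerRecoveryModel, the znode path and the completedCheckpoint-<id> file name are hypothetical and do not reflect Flink’s actual layout. The point is only that the coordination store holds a short path while the recovery data lives in the file system.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;

// Toy model (not Flink's implementation): ZooKeeper keeps only a small
// pointer, while the recovery data itself is written to the HA storage dir.
public class PointerRecoveryModel {
    // Hypothetical znode path, for illustration only
    static final String LATEST = "/flink/my_flink_cluster/checkpoints/latest";

    // Stands in for ZooKeeper: znode path -> small payload (a file path)
    final Map<String, String> zookeeper = new HashMap<>();
    final Path storageDir;

    PointerRecoveryModel(Path storageDir) {
        this.storageDir = storageDir;
    }

    // The leader writes the metadata to HA storage, then publishes a pointer to it
    void publishCheckpoint(long checkpointId, String metadata) {
        Path file = storageDir.resolve("completedCheckpoint-" + checkpointId);
        try {
            Files.write(file, metadata.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        zookeeper.put(LATEST, file.toString());
    }

    // A newly elected leader follows the pointer to load the actual data
    String recoverLatest() {
        String pointer = zookeeper.get(LATEST);
        if (pointer == null) {
            return null; // nothing to recover from
        }
        try {
            return new String(Files.readAllBytes(Paths.get(pointer)), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        PointerRecoveryModel ha = new PointerRecoveryModel(Files.createTempDirectory("ha-storage"));
        ha.publishCheckpoint(42, "metadata for checkpoint 42");
        System.out.println("ZooKeeper holds: " + ha.zookeeper.get(LATEST));
        System.out.println("Recovered:       " + ha.recoverLatest());
    }
}
```

A newly elected leader needs nothing but the pointer to find the data, which is why losing the storage directory breaks recovery even when ZooKeeper itself is healthy.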
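The next thing you’ll likely run into is how TaskManagers fit in. They don’t need a separate HA setup in ZooKeeper: they use the same high-availability settings to look up the current leader, and TaskManager failures are detected through heartbeats and handled by the job’s restart strategy rather than by leader election.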