The most surprising thing about Flink’s broadcast state is that it’s not just for configuration; it’s a first-class citizen for distributing dynamic, real-time logic across your entire streaming application.

Imagine you have a Flink job processing millions of events per second. Suddenly, a business requirement changes: a set of "allowlisted" user IDs needs to be updated. Without a mechanism like broadcast state, you would typically have to stop and redeploy the Flink job, causing downtime. With broadcast state, you can push the updated list to every parallel instance of your processing operator in near real time, without interrupting the job.

Let’s see this in action. We’ll set up a simple Flink job that counts events, but with a twist: it only counts events whose source IP address is present in a dynamically updated broadcast list of "trusted" IPs.

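First, we need a source of trusted-IP updates. In production this typically comes from an external system such as a Kafka topic, a database, or a REST API. For this example, we’ll simulate it with a SourceFunction. Note that a source cannot write broadcast state itself: it can only emit records. The broadcast state is modified downstream, in the processBroadcastElement method of the function that consumes the broadcast stream.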
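Because the source can only emit records, a removal has to be encoded in the records themselves rather than signalled with a null. Below is a minimal sketch of one such message format. It assumes hypothetical `ADD:<ip>` and `REMOVE:<ip>` strings. The `IpUpdateParser` class and its types are illustrative only and are not part of Flink.

```java
import java.util.Optional;

public class IpUpdateParser {
    // Hypothetical operations an update message can carry
    public enum Op { ADD, REMOVE }

    // One parsed update: what to do, and to which IP
    public static final class IpUpdate {
        public final Op op;
        public final String ip;

        public IpUpdate(Op op, String ip) {
            this.op = op;
            this.ip = ip;
        }
    }

    // Parses "ADD:<ip>" or "REMOVE:<ip>"; returns empty for null, malformed, or unknown messages
    public static Optional<IpUpdate> parse(String message) {
        if (message == null) {
            return Optional.empty();
        }
        int sep = message.indexOf(':');
        if (sep <= 0 || sep == message.length() - 1) {
            return Optional.empty(); // no operation name or no IP after the separator
        }
        String kind = message.substring(0, sep);
        String ip = message.substring(sep + 1);
        switch (kind) {
            case "ADD":
                return Optional.of(new IpUpdate(Op.ADD, ip));
            case "REMOVE":
                return Optional.of(new IpUpdate(Op.REMOVE, ip));
            default:
                return Optional.empty();
        }
    }

    public static void main(String[] args) {
        String[] messages = {"ADD:10.0.0.5", "REMOVE:192.168.1.100", "ADD:", "PING:1.2.3.4"};
        for (String m : messages) {
            System.out.println(m + " -> " + parse(m).map(u -> u.op + " " + u.ip).orElse("ignored"));
        }
    }
}
```

Making removal an explicit message keeps the update self-describing. Every downstream instance can apply it without guessing which IP a null was meant to remove.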

// Imports for both snippets in this example (Flink 1.x DataStream API);
// they belong at the top of the file that contains the enclosing class.
import java.util.Map;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

// Broadcast side: simulates an external source pushing trusted-IP updates.
// A source cannot touch broadcast state; it can only emit records. Each record
// is an update message, "ADD:<ip>" or "REMOVE:<ip>", which processBroadcastElement
// applies downstream. (SourceFunction is deprecated in recent Flink releases in
// favor of the unified Source API; it is used here for brevity.)
public static class TrustedIpBroadcaster implements SourceFunction<String> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        Thread.sleep(5000); // Wait a bit before sending the first IP
        emit(ctx, "ADD:192.168.1.100");
        Thread.sleep(10000);
        emit(ctx, "ADD:10.0.0.5");
        Thread.sleep(15000);
        emit(ctx, "ADD:172.16.0.20"); // Add another trusted IP
        Thread.sleep(20000);
        // Simulate an update: removal is an explicit message naming the IP
        emit(ctx, "REMOVE:192.168.1.100");
        // Keep the source alive until the job is cancelled
        while (running) {
            Thread.sleep(1000);
        }
    }

    private void emit(SourceContext<String> ctx, String update) {
        // Emit under the checkpoint lock, as the SourceFunction contract recommends
        synchronized (ctx.getCheckpointLock()) {
            ctx.collect(update);
        }
        System.out.println("BROADCASTED: " + update);
    }

    @Override
    public void cancel() {
        running = false;
    }
}

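Now, the main processing job. It has two inputs: the stream of incoming events, keyed by IP address, and the stream of trusted-IP updates, which broadcast(descriptor) turns into a BroadcastStream. We join them with connect(...) and handle both sides in a KeyedBroadcastProcessFunction passed to process(...).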
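Conceptually, connecting a keyed stream with a broadcast stream routes data as in the plain-Java model below (no Flink involved). Each event goes to exactly one parallel instance, chosen by its key. Each broadcast update is delivered to every instance, and each instance applies it to its own copy. The `Instance` class and the `hashCode()`-modulo routing are simplifications for illustration; Flink's real key assignment goes through key groups and differs in detail.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class BroadcastRoutingModel {
    // One simulated parallel instance of the process function
    static final class Instance {
        final Set<String> trustedIps = new HashSet<>();   // this instance's copy of the "broadcast state"
        final Map<String, Long> counts = new HashMap<>(); // per-key state, only for keys routed here

        void onBroadcast(String ip) {
            trustedIps.add(ip);
        }

        void onEvent(String ip) {
            if (trustedIps.contains(ip)) {
                counts.merge(ip, 1L, Long::sum);
            }
        }
    }

    final List<Instance> instances = new ArrayList<>();

    BroadcastRoutingModel(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            instances.add(new Instance());
        }
    }

    // Broadcast: every instance receives the update
    void broadcast(String ip) {
        for (Instance inst : instances) {
            inst.onBroadcast(ip);
        }
    }

    // Keyed: exactly one instance receives the event (simplified routing)
    void event(String ip) {
        int idx = Math.floorMod(ip.hashCode(), instances.size());
        instances.get(idx).onEvent(ip);
    }

    public static void main(String[] args) {
        BroadcastRoutingModel job = new BroadcastRoutingModel(2);
        job.broadcast("10.0.0.5");
        job.broadcast("172.16.0.20");
        job.event("10.0.0.5");
        job.event("10.0.0.5");
        job.event("10.0.0.1");
        for (int i = 0; i < job.instances.size(); i++) {
            Instance inst = job.instances.get(i);
            System.out.println("instance " + i + " trusted=" + inst.trustedIps + " counts=" + inst.counts);
        }
    }
}
```

Both instances print the same trusted set. The count for 10.0.0.5 appears on only one of them.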

// Main processing job
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Use parallelism 2 so the broadcast reaches more than one parallel instance
    env.setParallelism(2);

    // 1. Event stream: extract the IP from each event and key the stream by it.
    // KeyedBroadcastProcessFunction requires a keyed stream on the non-broadcast side.
    KeyedStream<String, String> eventStream = env.fromElements(
        "event_from_192.168.1.100",
        "event_from_10.0.0.1",
        "event_from_10.0.0.5",
        "event_from_172.16.0.20",
        "event_from_192.168.1.100",
        "event_from_10.0.0.2"
    ).map(event -> event.startsWith("event_from_")
            ? event.substring("event_from_".length()) // a real job would parse a structured event
            : "unknown")
     .keyBy(ip -> ip);

    // 2. Broadcast state is always a map; describe it with a MapStateDescriptor.
    // Key: the IP address. Value: the IP itself (or richer metadata if needed).
    final MapStateDescriptor<String, String> trustedIpsDescriptor =
        new MapStateDescriptor<>("trusted-ips", StringSerializer.INSTANCE, StringSerializer.INSTANCE);

    // 3. Turn the update stream into a BroadcastStream: every parallel instance
    // of the downstream operator receives every update.
    BroadcastStream<String> trustedIpStream =
        env.addSource(new TrustedIpBroadcaster()).broadcast(trustedIpsDescriptor);

    // 4. Connect the keyed event stream with the broadcast stream and process both sides
    DataStream<String> processedStream = eventStream
        .connect(trustedIpStream)
        .process(new KeyedBroadcastProcessFunction<String, String, String, String>() {
            // Keyed state: Flink scopes it to the current key (the IP)
            private transient MapState<String, Long> trustedIpCounts;

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                MapStateDescriptor<String, Long> ipCountDescriptor =
                    new MapStateDescriptor<>("trusted-ip-counts", StringSerializer.INSTANCE, LongSerializer.INSTANCE);
                this.trustedIpCounts = getRuntimeContext().getMapState(ipCountDescriptor);
            }

            // Called for each element of the keyed event stream; broadcast state is read-only here
            @Override
            public void processElement(String ipAddress, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                ReadOnlyBroadcastState<String, String> trustedIps = ctx.getBroadcastState(trustedIpsDescriptor);

                if (trustedIps.contains(ipAddress)) {
                    // MapState has no getOrDefault; a missing entry is returned as null
                    Long current = trustedIpCounts.get(ipAddress);
                    long newCount = (current == null ? 0L : current) + 1;
                    trustedIpCounts.put(ipAddress, newCount);
                    out.collect("Trusted IP " + ipAddress + " event processed. New count: " + newCount);
                } else {
                    out.collect("Non-trusted IP " + ipAddress + " event ignored.");
                }
            }

            // Called on every parallel instance for each broadcast update.
            // This is the only place where broadcast state may be modified.
            @Override
            public void processBroadcastElement(String update, Context ctx, Collector<String> out) throws Exception {
                BroadcastState<String, String> trustedIps = ctx.getBroadcastState(trustedIpsDescriptor);

                // Apply the update deterministically so all instances end up with identical copies
                if (update.startsWith("ADD:")) {
                    String ip = update.substring("ADD:".length());
                    trustedIps.put(ip, ip);
                } else if (update.startsWith("REMOVE:")) {
                    trustedIps.remove(update.substring("REMOVE:".length()));
                }

                // Print this instance's current copy (useful for debugging)
                System.out.println("BROADCAST UPDATE RECEIVED (" + update + "). Current trusted IPs:");
                for (Map.Entry<String, String> entry : trustedIps.immutableEntries()) {
                    System.out.println("  - " + entry.getKey());
                }
                // A removed IP stops being counted because processElement checks contains().
                // Its existing count is kept; clearing it would require
                // ctx.applyToKeyedState(...) over the count state.
            }
        });

    processedStream.print();
    env.execute("Flink Broadcast State Example");
}

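When you run this, each broadcast update prints the receiving instance’s current trusted set, and events from trusted IPs are counted per IP. Flink does not guarantee any ordering between the two inputs, though. In this toy setup, fromElements emits all six events at startup, while the broadcaster waits five seconds before its first update, so the events will most likely all be reported as non-trusted. To watch the counts change, replace fromElements with a source that keeps producing events, for example env.socketTextStream(host, port) or a Kafka topic. Once events keep arriving, an IP added by the broadcaster is recognized by processElement as soon as the update reaches that instance, without any redeployment.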
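The ordering effect is easy to reproduce without Flink. In the plain-Java model below (a single instance; the `InterleavingModel` name and the `B:`/`E:` input prefixes are hypothetical), the same inputs processed in two different interleavings give different results. An event is counted only if the matching update has already been applied.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class InterleavingModel {
    // Runs one interleaving; "B:<ip>" is a broadcast update, "E:<ip>" an event
    static Map<String, Long> run(List<String> inputs) {
        Set<String> trusted = new HashSet<>();
        Map<String, Long> counts = new HashMap<>();
        for (String in : inputs) {
            String ip = in.substring(2);
            if (in.startsWith("B:")) {
                trusted.add(ip);
            } else if (trusted.contains(ip)) {
                counts.merge(ip, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Events first (like fromElements at startup), update afterwards
        System.out.println(run(List.of("E:10.0.0.5", "E:10.0.0.5", "B:10.0.0.5"))); // prints {}
        // Update first, then events
        System.out.println(run(List.of("B:10.0.0.5", "E:10.0.0.5", "E:10.0.0.5"))); // prints {10.0.0.5=2}
    }
}
```

If events must not be lost while an update is in flight, one option is to buffer them in keyed state until the rule arrives. Whether that is worth the extra state depends on the use case.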

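The core idea is that Flink delivers every element of the broadcast stream to every parallel instance of the KeyedBroadcastProcessFunction. Each instance then applies that element to its own copy of the trusted-ips broadcast state in processBroadcastElement. The instances do not communicate with each other, so their copies stay identical only if the update logic is deterministic and depends only on the incoming element and the current broadcast state. Flink’s documentation also notes that broadcast elements may reach different instances in different orders, so the result should not depend on arrival order either.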
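Plain add/remove messages are order-sensitive. "ADD x" followed by "REMOVE x" leaves x untrusted, but the reverse order leaves it trusted. Since arrival order can differ between instances, one way to make the outcome order-independent is a last-writer-wins rule with tombstones for removals. The sketch below assumes the upstream attaches a unique, increasing version number per IP to each message; that field and the `VersionedTrustedIps` class are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class VersionedTrustedIps {
    // Latest known state per IP: the version that set it and whether it is trusted
    private static final class Entry {
        final long version;
        final boolean trusted;

        Entry(long version, boolean trusted) {
            this.version = version;
            this.trusted = trusted;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();

    // Applies an update only if it is newer than what we hold (assumes versions are unique per IP).
    // Removals are stored as tombstones (trusted = false) so an older ADD cannot resurrect the IP.
    public void apply(String ip, long version, boolean trusted) {
        Entry current = entries.get(ip);
        if (current == null || version > current.version) {
            entries.put(ip, new Entry(version, trusted));
        }
    }

    public Set<String> trustedIps() {
        Set<String> result = new TreeSet<>();
        for (Map.Entry<String, Entry> e : entries.entrySet()) {
            if (e.getValue().trusted) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        VersionedTrustedIps a = new VersionedTrustedIps();
        a.apply("192.168.1.100", 1, true);  // ADD, version 1
        a.apply("192.168.1.100", 2, false); // REMOVE, version 2

        VersionedTrustedIps b = new VersionedTrustedIps();
        b.apply("192.168.1.100", 2, false); // same updates, opposite arrival order
        b.apply("192.168.1.100", 1, true);

        System.out.println(a.trustedIps() + " " + b.trustedIps()); // prints [] []
    }
}
```

The trade-off is that tombstones accumulate. A real deployment would need some policy for expiring them.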

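The KeyedBroadcastProcessFunction is crucial here. It has two processing methods: processElement, called for each element of the keyed event stream, and processBroadcastElement, called for each update from the broadcast stream. processElement receives a ReadOnlyContext and can only read the broadcast state through a ReadOnlyBroadcastState; only processBroadcastElement can modify it. This separation lets you react to incoming data and to external configuration changes independently.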
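In Flink, processElement gets a ReadOnlyBroadcastState, which has no put or remove, while processBroadcastElement gets a writable BroadcastState. Flink therefore enforces the split at compile time. The plain-Java analogy below uses `Collections.unmodifiableMap` instead, which fails at run time. `BroadcastViews` is a hypothetical class meant only to show the idea of one writable and one read-only view over the same data.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class BroadcastViews {
    private final Map<String, String> state = new HashMap<>();

    // Broadcast side: may modify the state (analogous to BroadcastState in processBroadcastElement)
    public Map<String, String> writableView() {
        return state;
    }

    // Element side: read-only view of the same data (analogous to ReadOnlyBroadcastState)
    public Map<String, String> readOnlyView() {
        return Collections.unmodifiableMap(state);
    }

    public static void main(String[] args) {
        BroadcastViews views = new BroadcastViews();
        views.writableView().put("10.0.0.5", "10.0.0.5");
        Map<String, String> ro = views.readOnlyView();
        System.out.println(ro.containsKey("10.0.0.5")); // prints true
        try {
            ro.put("10.0.0.1", "10.0.0.1");
        } catch (UnsupportedOperationException e) {
            System.out.println("element side cannot modify broadcast state");
        }
    }
}
```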

You might be tempted to think of broadcast state as just shared configuration. But it’s more powerful: it’s a mechanism for distributing stateful logic. The trustedIpCounts MapState is keyed state. Flink scopes it to the current key (the IP) and keeps it only on the instance that owns that key, and it is checked against the broadcast state, which every instance holds in full. Each operator therefore maintains its own performance-critical state (the counts) while relying on a dynamically updated source of truth (the trusted IPs). That source of truth stays consistent across instances as long as the broadcast updates are applied deterministically.
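A related design choice is whether removing an IP should also reset its counter. Otherwise the old count resumes if the IP is trusted again later. In Flink, the broadcast side can reach keyed state for all keys on an instance through `Context.applyToKeyedState(...)` in processBroadcastElement. The plain-Java model below collapses one instance into two collections to show the effect of resetting. The `RemoveAndResetModel` class and its methods are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class RemoveAndResetModel {
    private final Set<String> trustedIps = new HashSet<>();   // this instance's copy of the broadcast state
    private final Map<String, Long> counts = new HashMap<>(); // keyed counts held by this instance

    public void onAdd(String ip) {
        trustedIps.add(ip);
    }

    // Removal updates the trusted set AND drops the count kept for that IP
    public void onRemove(String ip) {
        trustedIps.remove(ip);
        counts.remove(ip);
    }

    public void onEvent(String ip) {
        if (trustedIps.contains(ip)) {
            counts.merge(ip, 1L, Long::sum);
        }
    }

    public long count(String ip) {
        return counts.getOrDefault(ip, 0L);
    }

    public static void main(String[] args) {
        RemoveAndResetModel m = new RemoveAndResetModel();
        m.onAdd("192.168.1.100");
        m.onEvent("192.168.1.100");
        m.onEvent("192.168.1.100");
        System.out.println(m.count("192.168.1.100")); // prints 2
        m.onRemove("192.168.1.100");
        m.onEvent("192.168.1.100");                   // ignored: no longer trusted
        System.out.println(m.count("192.168.1.100")); // prints 0
        m.onAdd("192.168.1.100");                     // trusted again later
        m.onEvent("192.168.1.100");
        System.out.println(m.count("192.168.1.100")); // prints 1, counting starts fresh
    }
}
```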

One subtlety often missed is how broadcast state interacts with Flink’s checkpointing. Checkpoint barriers do not overtake broadcast elements, so every instance has applied the same updates when it takes a checkpoint, and every instance writes its own copy of the broadcast state into the checkpoint. If a failure happens and the job restarts, all keyed state and all broadcast state are restored from the same checkpoint, so your dynamic rules are applied consistently after recovery. Also note that broadcast state is always kept in memory at runtime, even with the RocksDB state backend, so size it accordingly.

The next step to explore is how to handle more complex broadcast state structures, like mapping IP ranges to specific processing rules, and how to integrate with external systems like Kafka for robust, fault-tolerant broadcast updates.
