A long-running Flink job can silently consume ever more memory and disk, eventually crashing with OOMs, by continuously accumulating state for keys that are no longer relevant.

Let’s watch a simple Flink job process events and see how its state grows. Imagine we have a MapState to count events per user ID.

// Java example
public class UserCounter extends RichFlatMapFunction<UserEvent, Tuple2<String, Long>> {

    private MapState<String, Long> userCounts;

    @Override
    public void open(Configuration parameters) throws Exception {
        MapStateDescriptor<String, Long> descriptor =
            new MapStateDescriptor<>("userCounts", String.class, Long.class);
        this.userCounts = getRuntimeContext().getMapState(descriptor);
    }

    @Override
    public void flatMap(UserEvent event, Collector<Tuple2<String, Long>> out) throws Exception {
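        // MapState has no getOrDefault(); get() returns null for a missing key
        Long previous = userCounts.get(event.getUserId());
        long currentCount = previous == null ? 0L : previous;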
        userCounts.put(event.getUserId(), currentCount + 1);
        out.collect(Tuple2.of(event.getUserId(), currentCount + 1));
    }
}

If we feed this job UserEvents like {"userId": "alice", "timestamp": 1678886400} and then stop sending events for "alice," her entry in userCounts will persist forever. If new users keep appearing, and old ones eventually disappear from the event stream, the userCounts state will balloon with entries for users who are no longer active. This is the unbounded state growth problem.

The core issue is that Flink’s state backends (like RocksDB) are designed for durability and performance, not automatic garbage collection of state entries based on application logic. When you put something into MapState, ListState, ValueState, etc., it’s there until explicitly removed or the job is reset.
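To make the growth concrete, here is a toy model in plain Java with no Flink involved. A HashMap stands in for the keyed state, and the hypothetical GrowthSketch.simulate method (a name invented for this illustration) feeds it a stream in which each user sends a few events and then never appears again. It is only a sketch of the effect, not of how a state backend stores data.

```java
import java.util.HashMap;
import java.util.Map;

class GrowthSketch {
    /**
     * Feeds totalEvents events into a map-backed counter. Each user sends
     * eventsPerUser consecutive events and then never appears again.
     * Returns the number of entries left in the "state".
     */
    static int simulate(int totalEvents, int eventsPerUser) {
        Map<String, Long> userCounts = new HashMap<>();
        for (int i = 0; i < totalEvents; i++) {
            String userId = "user-" + (i / eventsPerUser);
            userCounts.merge(userId, 1L, Long::sum);
        }
        return userCounts.size();
    }

    public static void main(String[] args) {
        // Only one user is active at any moment, yet every user ever seen is retained.
        System.out.println(simulate(1_000, 10));   // prints 100
        System.out.println(simulate(10_000, 10));  // prints 1000
    }
}
```

The number of retained entries tracks the number of users ever seen, not the number currently active, which is why the state size keeps climbing for as long as the job runs.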

The Problem: State Grows Indefinitely

Consider a job processing user activity. User IDs might appear, generate activity, and then become inactive. If the job doesn’t explicitly clean up state for inactive users, the state backend will keep storing these entries. Eventually, this leads to:

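  1. Memory Exhaustion: With a heap-based state backend, all state lives on the JVM heap. With RocksDB, native memory (block cache, write buffers, indexes) can still exceed the container limit if it is not bounded.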
  2. Disk Space Exhaustion: For RocksDB, the state files can grow to terabytes.
  3. Performance Degradation: Larger state means slower lookups, writes, and checkpoints.
  4. Job Crashes: OutOfMemoryError (OOM) or disk full errors will bring the job down.

Solutions for Unbounded State

The fundamental solution is to implement state TTL (Time-To-Live). Flink provides mechanisms to automatically expire state entries after a configurable period.

1. Using Flink’s State TTL Configuration:

This is the most common and recommended approach. You configure TTL directly on the state descriptor.

  • Diagnosis: Check your Flink job’s state size. You can do this via the Flink Web UI (Metrics tab) for the lastCheckpointSize or by inspecting the state backend directory (e.g., RocksDB’s state folder). Look for a massive and ever-increasing state size.

  • Fix: Modify your state descriptor to include TTL configuration.

    // Java example
    MapStateDescriptor<String, Long> descriptor =
        new MapStateDescriptor<>("userCounts", String.class, Long.class);
    
    // Configure TTL for state entries
    // Time here is org.apache.flink.api.common.time.Time
    StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.days(7)) // State expires 7 days after the last write
        .cleanupInRocksdbCompactFilter(1000) // For RocksDB: drop expired entries during compaction
        .build();
    descriptor.enableTimeToLive(ttlConfig);
    
    // Then use this descriptor in your RichFlatMapFunction as before
    

    In Scala:

    // Scala example
    val descriptor = new MapStateDescriptor[String, Long]("userCounts", Types.STRING, Types.LONG)
    
    // Configure TTL for state entries
    val ttlConfig = StateTtlConfig.newBuilder(Time.days(7)) // State expires 7 days after the last write
      .cleanupInRocksdbCompactFilter(1000) // For RocksDB: drop expired entries during compaction
      .build()
    descriptor.enableTimeToLive(ttlConfig)
    
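  • Why it works: With TTL enabled, Flink stores a last-modified timestamp alongside each state entry (for MapState, each map entry expires independently). By default an expired entry is never returned to your code, even before it has been physically deleted. The RocksDB cleanup strategy, cleanupInRocksdbCompactFilter(), removes expired entries in the background whenever RocksDB compacts its files, so state stops growing once keys go idle. cleanupFullSnapshot() is another option, but it only drops expired entries from full snapshots: it does not shrink local state and does not apply to incremental RocksDB checkpoints, so it does little for unbounded growth.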
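The expiry idea can be modeled in a few lines of plain Java. This is a toy sketch under stated assumptions, not Flink's implementation: TtlMapSketch and its methods are hypothetical names, time is passed in explicitly, and compact() merely stands in for a background compaction pass. It shows the two separate effects: an expired entry is hidden from readers immediately, but only reclaimed when cleanup runs.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

class TtlMapSketch {
    /** A value together with the time it was last written. */
    private static final class Entry {
        final long value;
        final long lastWriteMillis;

        Entry(long value, long lastWriteMillis) {
            this.value = value;
            this.lastWriteMillis = lastWriteMillis;
        }
    }

    private final long ttlMillis;
    private final Map<String, Entry> entries = new HashMap<>();

    TtlMapSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Writing an entry (re)starts its TTL. */
    void put(String key, long value, long nowMillis) {
        entries.put(key, new Entry(value, nowMillis));
    }

    /** Expired entries are hidden from readers even if they are still stored. */
    Long get(String key, long nowMillis) {
        Entry e = entries.get(key);
        if (e == null || isExpired(e, nowMillis)) {
            return null;
        }
        return e.value;
    }

    /** Stand-in for a compaction pass: physically drops expired entries. */
    int compact(long nowMillis) {
        int removed = 0;
        Iterator<Entry> it = entries.values().iterator();
        while (it.hasNext()) {
            if (isExpired(it.next(), nowMillis)) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    int storedEntries() {
        return entries.size();
    }

    private boolean isExpired(Entry e, long nowMillis) {
        return nowMillis - e.lastWriteMillis >= ttlMillis;
    }

    public static void main(String[] args) {
        TtlMapSketch state = new TtlMapSketch(100);
        state.put("alice", 1L, 0);
        state.put("bob", 1L, 80);
        System.out.println(state.get("alice", 50));   // prints 1
        System.out.println(state.get("alice", 120));  // prints null (expired but still stored)
        System.out.println(state.storedEntries());    // prints 2
        System.out.println(state.compact(120));       // prints 1
        System.out.println(state.storedEntries());    // prints 1
    }
}
```

The gap between "hidden" and "reclaimed" is why state size can lag behind the TTL: space is only freed when the cleanup strategy actually visits the entry.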

2. Implementing Manual State Cleanup (Less Ideal):

If you can’t use TTL (e.g., very old Flink versions or specific complex logic), you might need to manually remove state. This is brittle and error-prone.

  • Diagnosis: Same as above – check state growth.

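  • Fix: Track a "last seen" timestamp per key in a separate ValueState. Keyed state is scoped to the key of the current event, so flatMap cannot iterate over other users' entries; cleanup has to be triggered per key, typically by a timer in a KeyedProcessFunction that checks the timestamp and clears the state when it fires. This is complex.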

    // Conceptual Java example (not recommended for production)
    ValueState<Long> lastAccessTime;
    MapState<String, Long> userCounts;
    
    @Override
    public void open(Configuration parameters) throws Exception {
        // ... setup userCounts ...
        ValueStateDescriptor<Long> timeDescriptor =
            new ValueStateDescriptor<>("lastAccessTime", Long.class);
        this.lastAccessTime = getRuntimeContext().getState(timeDescriptor);
    }
    
    @Override
    public void flatMap(UserEvent event, Collector<Tuple2<String, Long>> out) throws Exception {
        long currentTime = System.currentTimeMillis();
        lastAccessTime.update(currentTime); // Update last access for this key
    
        // MapState has no getOrDefault(); get() returns null for a missing key
        Long previous = userCounts.get(event.getUserId());
        long currentCount = previous == null ? 0L : previous;
        userCounts.put(event.getUserId(), currentCount + 1);
    
        // Eviction cannot be done here: flatMap only runs when an event arrives,
        // so an idle key would never be checked. A per-key timer (for example in a
        // KeyedProcessFunction) would compare lastAccessTime with the current time
        // and clear userCounts and lastAccessTime once the key has been idle for 7 days.
    }
    
  • Why it works (conceptually): By tracking access times and actively removing stale entries, you keep the active state set small. However, iterating and removing state efficiently within Flink’s event-driven model is hard without TTL.
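The timer-driven variant of manual cleanup can be sketched outside Flink to show the logic. In a real job this would usually live in a KeyedProcessFunction, with a timer registered per event and the check performed in onTimer; the plain-Java model below only imitates that pattern. TimerCleanupSketch, onEvent, and advanceTo are hypothetical names, and time is supplied by the caller.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

class TimerCleanupSketch {
    /** A pending cleanup check for one user. */
    private static final class Timer {
        final long fireAtMillis;
        final String userId;

        Timer(long fireAtMillis, String userId) {
            this.fireAtMillis = fireAtMillis;
            this.userId = userId;
        }
    }

    private final long inactivityMillis;
    private final Map<String, Long> counts = new HashMap<>();
    private final Map<String, Long> lastSeen = new HashMap<>();
    private final PriorityQueue<Timer> timers =
        new PriorityQueue<>((a, b) -> Long.compare(a.fireAtMillis, b.fireAtMillis));

    TimerCleanupSketch(long inactivityMillis) {
        this.inactivityMillis = inactivityMillis;
    }

    /** Counts the event, records when the user was seen, and schedules a check. */
    long onEvent(String userId, long nowMillis) {
        advanceTo(nowMillis);
        lastSeen.put(userId, nowMillis);
        timers.add(new Timer(nowMillis + inactivityMillis, userId));
        return counts.merge(userId, 1L, Long::sum);
    }

    /** Fires every timer that is due and evicts users that stayed idle. */
    void advanceTo(long nowMillis) {
        while (!timers.isEmpty() && timers.peek().fireAtMillis <= nowMillis) {
            Timer t = timers.poll();
            Long seen = lastSeen.get(t.userId);
            // A newer event supersedes this timer; only the latest timer may evict.
            if (seen != null && seen + inactivityMillis <= t.fireAtMillis) {
                counts.remove(t.userId);
                lastSeen.remove(t.userId);
            }
        }
    }

    int trackedUsers() {
        return counts.size();
    }

    public static void main(String[] args) {
        TimerCleanupSketch job = new TimerCleanupSketch(100);
        job.onEvent("alice", 0);
        System.out.println(job.onEvent("alice", 60)); // prints 2
        job.onEvent("bob", 90);
        job.advanceTo(120);                           // alice's first timer fires but is stale
        System.out.println(job.trackedUsers());       // prints 2
        job.advanceTo(200);                           // both users have now been idle long enough
        System.out.println(job.trackedUsers());       // prints 0
    }
}
```

Even in this reduced form, the bookkeeping (stale timers, two pieces of state to keep in sync) hints at why built-in TTL is usually the less error-prone choice.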

3. Using a Keyed State in a Window (Implicit TTL):

If your aggregation naturally fits into windows, Flink’s windowing mechanism provides implicit TTL.

  • Diagnosis: Same as above.

  • Fix: Re-architect your logic to use windows if possible. For example, instead of a MapState holding all-time counts, count events per user per tumbling, sliding, or session window and let the window hold the state.

    // Example using session windows; the window contents are the only state
    dataStream
        .keyBy(UserEvent::getUserId)
        .window(EventTimeSessionWindows.withGap(Time.minutes(30))) // A session ends after 30 minutes without events
        .process(new ProcessWindowFunction<UserEvent, Tuple2<String, Long>, String, TimeWindow>() {
            @Override
            public void process(String key, Context context, Iterable<UserEvent> elements, Collector<Tuple2<String, Long>> out) throws Exception {
                long count = 0;
                for (UserEvent event : elements) {
                    count++;
                }
                // Emit one result per user per session when the window fires
                out.collect(Tuple2.of(key, count));
            }
            // No clear() override is needed: Flink purges the buffered elements when the window
            // is cleaned up. Per-window state from context.windowState() would have to be cleared
            // in clear(), and it is not supported for merging windows such as session windows.
        });
    
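  • Why it works: When a window fires and is cleaned up, Flink purges the elements buffered for it, along with any per-window state obtained through context.windowState(). State obtained through getRuntimeContext() or context.globalState() is scoped to the key rather than the window and is not cleaned up this way. If your logic can be expressed with windows, the state is managed for you.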
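The bounded-lifetime effect of session windows can be illustrated with a simplified plain-Java model. SessionCountSketch.sessionCounts is a hypothetical helper that handles one user's timestamps, assumed to be sorted ascending; it ignores watermarks, lateness, and out-of-order events, all of which Flink's real windowing handles. The point is only that each session's counter is emitted and then discarded.

```java
import java.util.ArrayList;
import java.util.List;

class SessionCountSketch {
    /**
     * Splits one user's ascending event timestamps into sessions: a pause
     * longer than gapMillis ends the current session. Returns the event
     * count emitted for each session, in order.
     */
    static List<Long> sessionCounts(long[] sortedTimestamps, long gapMillis) {
        List<Long> emitted = new ArrayList<>();
        long count = 0;
        long lastTs = 0;
        for (long ts : sortedTimestamps) {
            if (count > 0 && ts - lastTs > gapMillis) {
                emitted.add(count); // the session closes: emit its result...
                count = 0;          // ...and discard its state
            }
            count++;
            lastTs = ts;
        }
        if (count > 0) {
            emitted.add(count);
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] aliceEvents = {0, 10, 20, 100, 105};
        System.out.println(sessionCounts(aliceEvents, 30)); // prints [3, 2]
    }
}
```

At no point does the model hold more than one running counter per user, which is the property that keeps windowed state from growing with the job's lifetime.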

4. Checkpointing Configuration (Less Direct):

While not a direct fix for unbounded growth, ensuring your checkpoints are configured correctly can mitigate the impact of large state.

  • Diagnosis: Frequent checkpoint failures, long checkpoint times.

  • Fix:

    • Increase state.backend.rocksdb.checkpoint.transfer.thread.num: If using RocksDB, this can speed up checkpoint uploads.
    • Use incremental checkpoints: Set state.backend.incremental to true (it is false by default, so it must be enabled explicitly). With RocksDB, each checkpoint then uploads only the files that changed since the previous one.
    • Increase execution.checkpointing.timeout: Give checkpoints more time.
    • Adjust execution.checkpointing.tolerable-failed-checkpoints: Allow more failures before the job stops.
  • Why it works: These settings don’t prevent state growth but make the checkpointing process more robust and less likely to fail due to transient network issues or brief state backend slowdowns, giving your TTL mechanisms more time to work.
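Put together, the settings above might look like the following configuration fragment. The values are illustrative assumptions, not recommendations, and the incremental-checkpoint key shown is the Flink 1.x name; key names can differ between releases, so check the configuration reference for your version.

```yaml
# flink-conf.yaml fragment (all values are assumptions chosen for illustration)
state.backend.incremental: true                            # upload only changed RocksDB files
state.backend.rocksdb.checkpoint.transfer.thread.num: 8    # more parallel upload threads
execution.checkpointing.timeout: 30 min                    # allow slow checkpoints to finish
execution.checkpointing.tolerable-failed-checkpoints: 3    # tolerate a few failures before failing the job
```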

The next problem you’ll likely encounter is dealing with how Flink handles late events after you’ve implemented state TTL, as expired state won’t be available for late arrivals.
