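When a Flink GroupBy (or DataStream keyBy) operation fails or stalls, a common cause is data skew: one or more keys receive a disproportionately large share of the data, overwhelming the specific subtasks, and the task managers hosting them, that own those keys.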

Here’s how to diagnose and fix Flink data skew and hot key problems:

1. Identify the Skewed Key

The first step is to pinpoint which key is causing the problem. Flink’s Web UI is your best friend here.

  • Diagnosis: Open the Flink Web UI and select your running job to see its job graph. Look for operators whose parallel instances show a significant imbalance in records received or sent, or in busy and backpressured time. Often, a GroupBy or keyBy operator will have one or a few subtasks running much slower or processing far more data than the others. Click the problematic operator and open the "SubTasks" tab for a per-subtask breakdown of records and bytes received (the "TaskManagers" tab shows the same figures aggregated per task manager). If one subtask is consistently handling a much higher volume, that’s your indicator.
  • Fix: The UI tells you which subtask is overloaded, not which key. To find the key, you’ll need to instrument your application or use Flink’s metrics. For example, you can add a RichMapFunction before your keyBy that registers a custom metric. The snippet below registers a single counter for all records; to see per-key counts you would track counts per key (e.g., in a Map) and report only the heaviest keys, because registering one metric per key can create an unbounded number of metrics.
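    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.metrics.Counter;
    import org.apache.flink.metrics.MetricGroup;
    
    dataStream
        .map(new RichMapFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
            private transient Counter keyCounter;
    
            @Override
            public void open(Configuration parameters) {
                MetricGroup metrics = getRuntimeContext().getMetricGroup();
                keyCounter = metrics.addGroup("my_metrics").counter("key_counts");
            }
    
            @Override
            public Tuple2<String, Long> map(Tuple2<String, Long> value) {
                // Increments one counter for ALL records seen by this subtask.
                // This is a simplified example; finding hot keys requires tracking
                // counts per key (e.g., in a Map) and reporting only the top keys.
                keyCounter.inc();
                return value;
            }
        })
        .keyBy(value -> value.f0) // Key by the String field (replaces deprecated keyBy(0))
        .window(...)
        .sum(1);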
    
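    Then query these custom metrics through the Web UI (the operator’s "Metrics" tab), the REST API, or whichever metrics reporter you have configured.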
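The per-key tracking mentioned above could look something like the following minimal sketch. `HotKeyTracker` is a hypothetical plain-Java helper, not a Flink API: it counts records per key and returns the keys whose share of traffic meets a threshold. Inside a Flink job you might keep one instance per subtask in the map function and periodically log or export only the keys it returns.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: exact per-key counts plus a "hot key" report. */
class HotKeyTracker {
    private final Map<String, Long> counts = new HashMap<>();
    private long total = 0;

    /** Records one occurrence of the given key. */
    void record(String key) {
        counts.merge(key, 1L, Long::sum);
        total++;
    }

    /** Number of records seen so far for one key. */
    long count(String key) {
        return counts.getOrDefault(key, 0L);
    }

    /** Keys whose share of all records is at least minShare, hottest first. */
    List<String> hotKeys(double minShare) {
        List<String> result = new ArrayList<>();
        if (total == 0) {
            return result;
        }
        for (Map.Entry<String, Long> e : counts.entrySet()) {
            if ((double) e.getValue() / total >= minShare) {
                result.add(e.getKey());
            }
        }
        // Sort by descending count so the hottest key comes first
        result.sort((a, b) -> Long.compare(counts.get(b), counts.get(a)));
        return result;
    }
}
```

Because the tracker keeps one entry per distinct key, its memory grows with key cardinality; for high-cardinality streams you would cap the map or switch to an approximate heavy-hitter structure.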

2. Salting (Key Prefixing)

The most common and effective solution for a hot key is to append a random salt to the key, spreading that key’s data across more parallel subtasks.

  • Diagnosis: After identifying the skewed key (e.g., "user_123"), you’ll see that all data for "user_123" lands on the same KeyBy partition.
  • Fix: Modify your data stream to append a random integer in the range 0 to N-1 (where N is, for example, the parallelism of the downstream operator) to the key, then aggregate per salted key.
    import java.util.concurrent.ThreadLocalRandom;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    
    // Assume a salt range of 16 (e.g., the downstream parallelism)
    final int numSalts = 16;
    dataStream
        .map(new MapFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(Tuple2<String, Long> value) {
                String originalKey = value.f0;
                // Append a random salt in [0, numSalts).
                // For example, "user_123" may become "user_123#5".
                int salt = ThreadLocalRandom.current().nextInt(numSalts);
                return Tuple2.of(originalKey + "#" + salt, value.f1);
            }
        })
        .keyBy(value -> value.f0) // Key by the salted key
        .window(...)
        .sum(1);
    
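    You’ll need a second stage that strips the salt and merges the partial results per original key:
    .map(new MapFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
        @Override
        public Tuple2<String, Long> map(Tuple2<String, Long> value) {
            String saltedKey = value.f0;
            // Remove the salt; cut at the LAST '#' so keys that contain '#' survive
            String originalKey = saltedKey.substring(0, saltedKey.lastIndexOf('#'));
            return Tuple2.of(originalKey, value.f1);
        }
    })
    .keyBy(value -> value.f0) // Key by the original key
    .window(...)               // Same window size as the first stage
    .sum(1);                   // Merge the partial sums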
    
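  • Why it works: By adding a random suffix, you’re essentially creating new, distinct keys (e.g., "user_123#0", "user_123#1", …, "user_123#15"). Flink distributes these new keys across its parallel subtasks, breaking up the hot spot. The second stage removes the salt and merges the partial results per original key, so this only works for aggregations whose partial results can be combined (sum, count, min, max). With event-time windows, each first-stage result carries its window’s last timestamp, so a second stage with the same window size places it in the matching window.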
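To check that salting followed by unsalting gives the same totals as a direct aggregation, here is a plain-Java simulation of the two stages. No Flink is involved, and `SaltedSum` and its methods are hypothetical names for this sketch: stage 1 sums per salted key, stage 2 strips the salt and merges the partial sums.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

/** Hypothetical simulation of two-stage salted aggregation. */
class SaltedSum {
    /** Stage 1: give each record a random salt in [0, numSalts) and sum per salted key. */
    static Map<String, Long> saltedPartialSums(List<Map.Entry<String, Long>> records,
                                               int numSalts, Random random) {
        Map<String, Long> partials = new HashMap<>();
        for (Map.Entry<String, Long> r : records) {
            String saltedKey = r.getKey() + "#" + random.nextInt(numSalts);
            partials.merge(saltedKey, r.getValue(), Long::sum);
        }
        return partials;
    }

    /** Removes the salt, cutting at the last '#' so keys that contain '#' survive. */
    static String unsalt(String saltedKey) {
        return saltedKey.substring(0, saltedKey.lastIndexOf('#'));
    }

    /** Stage 2: merge the partial sums back per original key. */
    static Map<String, Long> mergePartials(Map<String, Long> partials) {
        Map<String, Long> totals = new HashMap<>();
        for (Map.Entry<String, Long> p : partials.entrySet()) {
            totals.merge(unsalt(p.getKey()), p.getValue(), Long::sum);
        }
        return totals;
    }
}
```

In the Flink pipeline, stage 1 corresponds to the salting map plus keyBy/window/sum on the salted key, and stage 2 to the unsalting map plus keyBy/window/sum on the original key. In stage 2 the hot key receives at most numSalts partial results per window instead of every raw record.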

3. Increase Parallelism

Sometimes, the data volume is simply too high for the current parallelism.

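  • Diagnosis: If all subtasks are overloaded (not just one or two), or salting has spread a hot key but the total load is still too high, your parallelism might be too low. Raising parallelism alone does not split a single hot key: all records for one key still go to one subtask.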
  • Fix: Increase the parallelism setting for your job or specific operators.
    // Setting parallelism for the entire job
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(32); // Increase from, say, 16
    
    // Setting parallelism for a specific operator (here, the window aggregation)
    dataStream
        .keyBy(value -> value.f0)
        .window(...)
        .sum(1)
        .setParallelism(32);
    
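  • Why it works: More parallel subtasks (which may require more task slots or task managers) divide the key space into smaller shares, so each subtask handles fewer keys. This helps when the load is spread over many keys; it cannot split a single hot key, and an operator’s parallelism cannot exceed its maximum parallelism (the number of key groups).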
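The limitation of raising parallelism shows up in a simplified model of keyBy routing. Flink assigns each key to a key group (a hash of the key modulo the maximum parallelism) and each key group to a subtask (keyGroup * parallelism / maxParallelism). This sketch assumes a plain floorMod of hashCode() in place of the murmur hash Flink applies, and a maximum parallelism of 128; `KeyRoutingModel` is a hypothetical name.

```java
import java.util.List;

/** Hypothetical, simplified model of keyBy routing: key -> key group -> subtask. */
class KeyRoutingModel {
    /** Assumption: Flink murmur-hashes hashCode() first; floorMod keeps the idea. */
    static int keyGroup(String key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Contiguous ranges of key groups are assigned to each subtask. */
    static int subtaskFor(String key, int parallelism, int maxParallelism) {
        return keyGroup(key, maxParallelism) * parallelism / maxParallelism;
    }

    /** Total records per subtask for parallel lists of keys and record counts. */
    static long[] loadPerSubtask(List<String> keys, List<Long> recordCounts,
                                 int parallelism, int maxParallelism) {
        long[] load = new long[parallelism];
        for (int i = 0; i < keys.size(); i++) {
            load[subtaskFor(keys.get(i), parallelism, maxParallelism)] += recordCounts.get(i);
        }
        return load;
    }

    /** Load of the busiest subtask. */
    static long maxLoad(long[] load) {
        long max = 0;
        for (long l : load) {
            max = Math.max(max, l);
        }
        return max;
    }
}
```

Whatever the parallelism, the subtask that owns "user_123" receives all of its records, so the busiest subtask never carries less than the hot key’s volume; only the cold keys’ load is spread thinner.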

4. Data Filtering / Pre-aggregation

If a small subset of data is inherently much larger or more frequent, consider filtering it out early or aggregating it separately.

  • Diagnosis: You observe that a specific type of event or a particular user’s activity dominates the data stream.
  • Fix: Implement logic to identify and potentially filter or aggregate "hot" data points upstream.
    // Example: pre-aggregate high-frequency events before the main keyBy.
    // isHotEvent, HotEventAggregator, HotAggregate and convertAggregatedToEvent
    // stand for your own application-specific logic.
    DataStream<HotAggregate> hotAggregates = inputStream
        .filter(event -> isHotEvent(event)) // Keep only hot events
        .keyBy(Event::getUserId)            // Note: one hot user still maps to one subtask here
        .window(TumblingEventTimeWindows.of(Time.minutes(1)))
        .aggregate(new HotEventAggregator()); // Emits one aggregate per user per minute
    
    DataStream<Event> regularEvents = inputStream
        .filter(event -> !isHotEvent(event)); // Keep only regular events
    
    // Convert the (much smaller) aggregated hot stream back to Event, union, then keyBy
    regularEvents
        .union(hotAggregates.map(aggregatedHot -> convertAggregatedToEvent(aggregatedHot)))
        .keyBy(Event::getUserId)
        .window(...)
        .sum(...);
    
  • Why it works: By processing the high-volume data separately or reducing its volume through aggregation early on, you prevent it from overwhelming the main processing pipeline.
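Another way to reduce hot data early is local (map-side) pre-aggregation: a combiner that sums values per key inside each upstream subtask before the shuffle, so a hot key sends a few partial sums instead of every record. This is a different technique from the filter-and-window split above. `LocalPreAggregator` is a hypothetical plain-Java sketch with a count-based flush; in a real job it would sit in a flatMap or process function before keyBy, and its buffer would have to be included in checkpoints (for example by implementing CheckpointedFunction) so buffered data isn’t lost on failure.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical combiner: sums per key locally and emits partial sums every flushEvery records. */
class LocalPreAggregator {
    private final int flushEvery;
    private final Map<String, Long> buffer = new HashMap<>();
    private int buffered = 0;

    LocalPreAggregator(int flushEvery) {
        if (flushEvery <= 0) {
            throw new IllegalArgumentException("flushEvery must be > 0");
        }
        this.flushEvery = flushEvery;
    }

    /** Adds one record; returns the partial sums to emit, or an empty map if not flushing yet. */
    Map<String, Long> add(String key, long value) {
        buffer.merge(key, value, Long::sum);
        buffered++;
        if (buffered >= flushEvery) {
            return flush();
        }
        return Map.of();
    }

    /** Emits and clears everything buffered so far (also call this at end of input). */
    Map<String, Long> flush() {
        Map<String, Long> out = new HashMap<>(buffer);
        buffer.clear();
        buffered = 0;
        return out;
    }
}
```

With a flush size of 100, 1,000 records of one hot key leave the subtask as 10 partial sums; the trade-off is added latency until each flush. For GroupBy queries in Flink SQL, the performance tuning documentation describes a built-in variant of this idea called local-global aggregation.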

5. Flink’s Adaptive Scheduler (Experimental)

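Flink ships two adaptive schedulers: the Adaptive Scheduler for streaming jobs (Flink 1.13+), which rescales the whole job to the available task slots, and the Adaptive Batch Scheduler for batch jobs (Flink 1.15+), which picks each operator’s parallelism from the volume of data it consumes.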

  • Diagnosis: Overall load fluctuates over time, or (for batch jobs) you can’t choose good per-operator parallelism in advance.
  • Fix: Enable the adaptive scheduler. This is a configuration change, not a code change.
    # flink-conf.yaml: streaming jobs
    jobmanager.scheduler: adaptive
    
    # flink-conf.yaml: batch jobs
    jobmanager.scheduler: AdaptiveBatch
    
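    For the Adaptive Batch Scheduler you can also bound the chosen parallelism with jobmanager.adaptive-batch-scheduler.min-parallelism and jobmanager.adaptive-batch-scheduler.max-parallelism (these options were renamed in later releases, so check the configuration reference for your version).
  • Why it works: These schedulers adapt parallelism to the available resources or to the data volume, which relieves general overload. In general they do not split a single hot key in a keyed aggregation, since all records for that key still go to one subtask, so combine them with salting or pre-aggregation for true hot keys. Review the limitations documented for your Flink version before relying on them in production.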

6. Custom Partitioner (Advanced)

For very specific, known skew patterns, you can write a custom Partitioner.

  • Diagnosis: You have a deep understanding of your data distribution and a specific, recurring skew pattern that salting doesn’t perfectly address.
  • Fix: Implement a custom Partitioner.
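    import org.apache.flink.api.common.functions.Partitioner;
    
    public class MyCustomPartitioner implements Partitioner<String> {
        @Override
        public int partition(String key, int numPartitions) {
            // Custom logic to map keys to partitions
            if (key.startsWith("highly_skewed_prefix")) {
                // Send to the first half of the partitions (at least one partition)
                int firstHalf = Math.max(1, numPartitions / 2);
                // floorMod avoids negative results (Math.abs(Integer.MIN_VALUE) is negative)
                return Math.floorMod(key.hashCode(), firstHalf);
            } else {
                return Math.floorMod(key.hashCode(), numPartitions); // Default distribution
            }
        }
    }
    
    dataStream
        // partitionCustom routes records by key but does NOT create a KeyedStream
        .partitionCustom(new MyCustomPartitioner(), value -> value.f0)
        .map(...); // Follow with non-keyed processing; keyed windows require keyBy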
    
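  • Why it works: This gives you fine-grained control over how records are routed to parallel subtasks, so you can send known hot keys to dedicated or less loaded partitions, or spread them across more partitions than default hash partitioning would. Note that partitionCustom returns a plain DataStream rather than a KeyedStream, so keyed windows and keyed state are not available directly after it; it suits non-keyed steps such as a local pre-aggregation placed before a regular keyBy.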
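If the hot keys are known in advance, a variant of the partitioner above can reserve dedicated partitions for them and hash every other key onto the rest. The sketch below uses the same partition(key, numPartitions) shape as Flink’s Partitioner but is plain Java so it runs standalone; `LookupPartitioner` and the example key list are hypothetical. In a job, you would put this logic inside a class implementing Partitioner<String>.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical lookup-table partitioner: one dedicated partition per known hot key. */
class LookupPartitioner {
    private final Map<String, Integer> hotKeySlots = new HashMap<>();

    LookupPartitioner(List<String> hotKeys) {
        for (String key : hotKeys) {
            // Slots are assigned 0, 1, 2, ... in list order; duplicates are ignored
            hotKeySlots.putIfAbsent(key, hotKeySlots.size());
        }
    }

    int partition(String key, int numPartitions) {
        int reserved = hotKeySlots.size();
        if (reserved >= numPartitions) {
            // Too few partitions to dedicate any: fall back to plain hashing
            return Math.floorMod(key.hashCode(), numPartitions);
        }
        Integer slot = hotKeySlots.get(key);
        if (slot != null) {
            return slot; // Dedicated partition in [0, reserved)
        }
        // Regular keys hash onto the remaining partitions [reserved, numPartitions)
        return reserved + Math.floorMod(key.hashCode(), numPartitions - reserved);
    }
}
```

Each listed hot key gets a partition of its own and regular keys never land on those partitions; if there are no more partitions than hot keys, the sketch falls back to plain hashing.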

The next error you’ll likely encounter after fixing data skew is related to state size management or backpressure on downstream operators if the processing bottleneck was shifted rather than eliminated.
