Slot sharing groups are Flink’s way of letting subtasks of different operators in the same job share a single TaskManager slot. This can improve resource utilization and reduce the number of slots a job needs.
Let’s see this in action. Imagine a simple Flink job with two main operators: a flatMap (the "mapper") that emits (word, 1) pairs and a reduce (the "reducer") that sums them per word.
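import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SlotSharingExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // flatMap emits a (word, 1) pair per input word
        DataStream<Tuple2<String, Integer>> stream = env.fromElements("a", "b", "c", "a", "b", "a")
            .keyBy(s -> s)
            .flatMap(new RichFlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                    // Simulate some processing
                    Thread.sleep(100);
                    out.collect(new Tuple2<>(value, 1));
                }
            });

        // reduce keeps a running count per word and prints every update
        stream.keyBy(t -> t.f0)
            .reduce(new RichReduceFunction<Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                    // Simulate some processing
                    Thread.sleep(100);
                    value1.f1 += value2.f1;
                    return value1;
                }
            })
            .print();

        env.execute("Slot Sharing Example");
    }
}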
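By default, every operator belongs to the slot sharing group named "default", so Flink is already free to run a flatMap subtask and a reduce subtask in the same slot. What separates them is the keyBy: each flatMap subtask sends every record to the reduce subtask that owns its key, so records whose target subtask runs on another TaskManager still travel over the network.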
Now let’s make the grouping explicit by assigning both operators to a named slot sharing group of our own.
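import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SlotSharingGroupExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define a slot sharing group
        String mySharingGroup = "my-shared-slots";

        // The source gets no explicit group, so it stays in the "default" group
        DataStream<Tuple2<String, Integer>> stream = env.fromElements("a", "b", "c", "a", "b", "a")
            .keyBy(s -> s)
            .flatMap(new RichFlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                    // Simulate some processing
                    Thread.sleep(100);
                    out.collect(new Tuple2<>(value, 1));
                }
            })
            .slotSharingGroup(mySharingGroup); // Assign flatMap to the sharing group

        stream.keyBy(t -> t.f0)
            .reduce(new RichReduceFunction<Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                    // Simulate some processing
                    Thread.sleep(100);
                    value1.f1 += value2.f1;
                    return value1;
                }
            })
            .slotSharingGroup(mySharingGroup) // Same group (reduce would also inherit it from flatMap)
            .print();                         // The print sink inherits the group from reduce

        env.execute("Slot Sharing Example");
    }
}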
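When you run this job, Flink’s scheduler places the subtasks of all operators in my-shared-slots into slots reserved for that group. A slot can hold at most one subtask of each operator in the group, so one flatMap subtask and one reduce subtask can run side by side in a single slot, sharing that TaskManager’s JVM and the slot’s resources. Slot sharing is not operator chaining, though: flatMap and reduce remain separate tasks connected by a keyBy shuffle, so the records between them are still serialized. If the receiving subtask runs on the same TaskManager, the data goes through an in-process local channel instead of the network stack; only chained operators, which run inside a single task, hand records to each other without a serialization round trip.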
The primary problem slot sharing groups solve is poor utilization of TaskManager slots, especially for jobs with many lightweight operators or high parallelism. Without slot sharing, every subtask would occupy a dedicated slot even if it needs only a fraction of that slot’s resources, which leaves many slots partially used. With slot sharing, a group needs only as many slots as its most parallel operator, so lightweight subtasks (a source, a simple map) no longer reserve as much as resource-intensive ones, and Flink can pack more work into fewer slots. Co-location can also help latency-sensitive applications, since subtasks on the same TaskManager can exchange data without a network hop.
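To make the slot arithmetic concrete, here is a toy Java model, not Flink code. The Op and SlotDemand classes and the parallelism values (source 1, flatMap 4, reduce 2) are hypothetical, and the print sink is ignored. It compares slot demand when every operator effectively has a group of its own (so each subtask needs its own slot) with demand under slot sharing, where each group needs as many slots as its most parallel operator and slots are never shared across groups.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical description of an operator: name, parallelism, slot sharing group.
final class Op {
    final String name;
    final int parallelism;
    final String group;

    Op(String name, int parallelism, String group) {
        this.name = name;
        this.parallelism = parallelism;
        this.group = group;
    }
}

final class SlotDemand {
    // No sharing (every operator in a group of its own): one slot per subtask.
    static int withoutSharing(List<Op> ops) {
        int total = 0;
        for (Op op : ops) {
            total += op.parallelism;
        }
        return total;
    }

    // Slot sharing: each group needs as many slots as its most parallel operator,
    // and slots are never shared across groups, so the per-group maxima add up.
    static int withSharing(List<Op> ops) {
        Map<String, Integer> maxPerGroup = new HashMap<>();
        for (Op op : ops) {
            maxPerGroup.merge(op.group, op.parallelism, Integer::max);
        }
        int total = 0;
        for (int slots : maxPerGroup.values()) {
            total += slots;
        }
        return total;
    }

    public static void main(String[] args) {
        List<Op> allDefault = List.of(
                new Op("source", 1, "default"),
                new Op("flatMap", 4, "default"),
                new Op("reduce", 2, "default"));
        List<Op> split = List.of(
                new Op("source", 1, "default"),
                new Op("flatMap", 4, "my-shared-slots"),
                new Op("reduce", 2, "my-shared-slots"));
        System.out.println(withoutSharing(allDefault)); // 7
        System.out.println(withSharing(allDefault));    // 4
        System.out.println(withSharing(split));         // 5
    }
}
```

Under these assumptions, the variant that keeps the source in "default" and moves flatMap and reduce into my-shared-slots needs one slot more than leaving everything in the default group, because the two groups cannot share slots with each other.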
Internally (in simplified terms), Flink’s scheduler keeps track of which slots belong to which slot sharing group. When it schedules a subtask, it looks for a slot already assigned to that subtask’s group that does not yet host a subtask of the same operator; if there is none, it assigns a new slot to the group, provided resources permit. The slotSharingGroup method on the DataStream API is the direct lever you control. You can also put an operator back into the default group explicitly with slotSharingGroup("default"), and set the parallelism of each operator with setParallelism.
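The lookup can be sketched as a greedy toy model: reuse a slot of the same group that does not yet host a subtask of this operator, otherwise take a fresh slot if one is free. This is an illustration under simplifying assumptions, not Flink’s scheduler. The ToySlotScheduler class, its place and slotsUsed methods, and the free-slot counter are hypothetical, and a slot is modeled only as the set of operator names whose subtask it hosts.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model, NOT Flink's scheduler: a slot is the set of operators it hosts.
final class ToySlotScheduler {
    // slot sharing group name -> slots already assigned to that group
    private final Map<String, List<Set<String>>> slotsByGroup = new HashMap<>();
    private int freeSlots;

    ToySlotScheduler(int freeSlots) {
        this.freeSlots = freeSlots;
    }

    // Places one subtask of `operator`; returns the index of the slot within its group.
    int place(String group, String operator) {
        List<Set<String>> slots = slotsByGroup.computeIfAbsent(group, g -> new ArrayList<>());
        // Reuse a slot of the same group that does not host this operator yet:
        // Set.add returns false (and changes nothing) if the operator is already there.
        for (int i = 0; i < slots.size(); i++) {
            if (slots.get(i).add(operator)) {
                return i;
            }
        }
        // Otherwise assign a new slot to the group, if resources permit.
        if (freeSlots == 0) {
            throw new IllegalStateException("no free slot for group " + group);
        }
        freeSlots--;
        Set<String> slot = new HashSet<>();
        slot.add(operator);
        slots.add(slot);
        return slots.size() - 1;
    }

    int slotsUsed(String group) {
        return slotsByGroup.getOrDefault(group, List.of()).size();
    }

    public static void main(String[] args) {
        ToySlotScheduler scheduler = new ToySlotScheduler(8); // hypothetical cluster
        scheduler.place("default", "source");
        for (int i = 0; i < 2; i++) {
            scheduler.place("my-shared-slots", "flatMap");
            scheduler.place("my-shared-slots", "reduce");
        }
        System.out.println(scheduler.slotsUsed("default"));         // 1
        System.out.println(scheduler.slotsUsed("my-shared-slots")); // 2
    }
}
```

Because slots are looked up per group, the source never lands in a slot of my-shared-slots even when one has room.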
The surprising part is how parallelism works within a group. If a flatMap operator with parallelism 4 and a reduce operator with parallelism 2 are in the same slot sharing group, Flink does not need 4 + 2 = 6 slots: the group needs only as many slots as its most parallel operator, here 4. Each slot holds at most one subtask of each operator, so two slots host a flatMap subtask and a reduce subtask, and the other two host a flatMap subtask alone. The co-located subtasks are still distinct tasks running concurrently on their own threads; what they share is the slot, and with it the TaskManager’s JVM and resources, not a single logical execution unit.
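As a worked count, assuming the group contains only these two operators with parallelisms $p_{\text{flatMap}} = 4$ and $p_{\text{reduce}} = 2$, and that a slot holds at most one subtask of each operator:

$$
\text{slots} = \max(p_{\text{flatMap}},\, p_{\text{reduce}}) = \max(4, 2) = 4 \neq p_{\text{flatMap}} + p_{\text{reduce}} = 6
$$

$$
\underbrace{\min(4, 2)}_{\text{flatMap + reduce}} + \underbrace{4 - \min(4, 2)}_{\text{flatMap only}} = 2 + 2 = 4
$$

Which flatMap subtask ends up next to which reduce subtask is up to the scheduler; the counts follow from the one-subtask-per-operator-per-slot rule alone.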
By default, all operators belong to the slot sharing group named "default". An operator without an explicit group inherits the group of its inputs when all of its inputs are in the same group, and falls back to "default" otherwise. So if you don’t explicitly assign operators to a custom group, they can all share slots with each other, which can lead to unexpected sharing if you intended for certain operators to be isolated.
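Flink’s documentation states that an operator without an explicit slot sharing group inherits its inputs’ group when all inputs share one, and otherwise ends up in "default". A toy model of that rule is sketched below; it is not Flink’s implementation, GroupResolver and resolve are hypothetical names, and an operator’s inputs are represented only by their already-resolved group names. Applied to the second example, it shows that the unassigned source stays in "default", while reduce would land in my-shared-slots even without its own slotSharingGroup call.

```java
import java.util.List;

// Toy model of the documented inheritance rule, NOT Flink source code.
final class GroupResolver {
    static final String DEFAULT_GROUP = "default";

    // explicitGroup: the name passed to slotSharingGroup(...), or null if never called.
    // inputGroups: the already-resolved groups of this operator's inputs.
    static String resolve(String explicitGroup, List<String> inputGroups) {
        if (explicitGroup != null) {
            return explicitGroup;
        }
        // Inherit only if every input is in one and the same group.
        if (inputGroups.stream().distinct().count() == 1) {
            return inputGroups.get(0);
        }
        // No inputs (e.g. a source) or inputs in different groups.
        return DEFAULT_GROUP;
    }

    public static void main(String[] args) {
        String source = resolve(null, List.of());                     // no inputs
        String flatMap = resolve("my-shared-slots", List.of(source)); // explicit
        String reduce = resolve(null, List.of(flatMap));              // inherited
        // Hypothetical two-input operator whose inputs disagree.
        String join = resolve(null, List.of(source, reduce));
        System.out.println(String.join(" ", source, flatMap, reduce, join));
        // prints: default my-shared-slots my-shared-slots default
    }
}
```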
The next concept you’ll likely encounter is how to influence the order of task placement within a shared slot, especially when dealing with complex job graphs and resource constraints.