Flink’s parallelism isn’t just a knob you turn to make things go faster; it’s a fundamental aspect of how Flink distributes work across your cluster, and getting it wrong means you’re leaving performance on the table or, worse, starving your jobs of resources.
Let’s see Flink’s parallelism in action. Imagine you have a simple streaming job that reads from Kafka, maps each record, and writes to another Kafka topic.
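import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// The statements below belong inside a method that may throw Exception, e.g. main.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Set the default parallelism for every operator in the job
env.setParallelism(4);

// Source: read string values from the input topic, starting at the earliest offsets
DataStream<String> source = env.fromSource(
    KafkaSource.<String>builder()
        .setBootstrapServers("kafka-broker-1:9092,kafka-broker-2:9092")
        .setTopics("input-topic")
        .setGroupId("my-group-id")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build(),
    WatermarkStrategy.noWatermarks(),
    "Kafka Source"
);

// Transformation: upper-case each record
DataStream<String> mapped = source.map(value -> value.toUpperCase());

// Sink: the record serializer must name a target topic;
// "output-topic" is a placeholder, substitute your own.
mapped.sinkTo(
    KafkaSink.<String>builder()
        .setBootstrapServers("kafka-broker-1:9092,kafka-broker-2:9092")
        .setRecordSerializer(
            KafkaRecordSerializationSchema.builder()
                .setTopic("output-topic")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .build()
);

env.execute("Simple Kafka to Kafka Job");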
In this example, env.setParallelism(4) tells Flink that by default, each operator in this job (the Kafka source, the map function, and the Kafka sink) should be executed by 4 parallel instances. Flink will then try to assign these 4 instances of each operator to available task slots on your TaskManagers.
The problem Flink’s parallelism solves is distributing computation. Without it, you’d have one massive process doing everything, which is a bottleneck for both throughput and fault tolerance. With parallelism, Flink breaks down each operator into multiple subtasks. These subtasks run concurrently on different TaskManagers (or even different TaskSlots on the same TaskManager), allowing Flink to process more data in parallel.
The exact levers you control are:
- Job-level parallelism: Set using StreamExecutionEnvironment.setParallelism(int parallelism). This is the default for all operators in the job.
- Operator-level parallelism: Set for specific operators using operator.setParallelism(int parallelism). This overrides the job-level parallelism for that particular operator.
- Cluster configuration (Task Slots): The number of available task slots on each TaskManager. This is configured in flink-conf.yaml with taskmanager.numberOfTaskSlots. A task slot is a fixed unit of resource on a TaskManager where a subtask can run. If you have 3 TaskManagers, each with 4 task slots, your cluster has a total of 12 task slots available.
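How the three levers combine can be sketched in plain Java, without any Flink dependency. This is only a model of the rules described above: the class ParallelismLevers, its methods, and the override of 8 on the map operator are hypothetical names and values chosen for illustration, not part of Flink's API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.OptionalInt;

/** Plain-Java model of the three levers; nothing here calls Flink. */
final class ParallelismLevers {

    /** An operator-level setting, when present, overrides the job-level default. */
    static int effectiveParallelism(OptionalInt operatorLevel, int jobLevel) {
        return operatorLevel.orElse(jobLevel);
    }

    /** Total slots offered by a cluster of identically configured TaskManagers. */
    static int totalSlots(int taskManagers, int slotsPerTaskManager) {
        return taskManagers * slotsPerTaskManager;
    }

    public static void main(String[] args) {
        int jobLevel = 4; // mirrors env.setParallelism(4)

        // Hypothetical overrides: only the map operator sets its own parallelism.
        Map<String, OptionalInt> operators = new LinkedHashMap<>();
        operators.put("Kafka Source", OptionalInt.empty());
        operators.put("Map", OptionalInt.of(8));
        operators.put("Kafka Sink", OptionalInt.empty());

        operators.forEach((name, override) ->
            System.out.println(name + " -> " + effectiveParallelism(override, jobLevel)));

        // 3 TaskManagers, each with taskmanager.numberOfTaskSlots set to 4
        System.out.println("Cluster slots: " + totalSlots(3, 4));
    }
}
```

In a real job the override would be written directly on the operator, for example by calling setParallelism(8) on the result of source.map(...).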
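When you set env.setParallelism(4), Flink creates 4 parallel subtasks of the Kafka source, 4 of the map operator, and 4 of the Kafka sink: 12 subtasks in total. That does not mean the job needs 12 task slots. By default, all operators belong to the same slot sharing group, so a single slot can hold one subtask of each operator, and the job needs only as many slots as its highest operator parallelism, which is 4 here. In addition, because the source, map, and sink are connected one-to-one with the same parallelism, Flink will typically chain them into a single task, so each slot runs one chained source-map-sink pipeline. If the cluster has fewer than 4 free task slots, the job cannot be scheduled at this parallelism with the default scheduler; it waits for slots and fails if none become available in time. If the cluster has more than 4 free slots, the extra ones remain idle for this job.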
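The slot arithmetic is easy to get wrong, so here is a minimal sketch in plain Java with no Flink dependency. It assumes Flink's default behaviour, where every operator sits in the same slot sharing group, so one slot can host one subtask of each operator. The class SlotDemand and its methods are hypothetical names used only for this illustration.

```java
import java.util.Arrays;

/** Plain-Java model of slot demand under default slot sharing; not Flink API. */
final class SlotDemand {

    /** Total number of parallel subtasks across all operators. */
    static int totalSubtasks(int... operatorParallelisms) {
        return Arrays.stream(operatorParallelisms).sum();
    }

    /**
     * Slots needed when all operators share one slot sharing group (assumed):
     * the operator with the highest parallelism determines the demand.
     */
    static int requiredSlots(int... operatorParallelisms) {
        return Arrays.stream(operatorParallelisms).max().orElse(0);
    }

    /** True when the cluster offers at least as many slots as the job needs. */
    static boolean fits(int availableSlots, int... operatorParallelisms) {
        return availableSlots >= requiredSlots(operatorParallelisms);
    }

    public static void main(String[] args) {
        int[] job = {4, 4, 4}; // source, map, sink, each at parallelism 4

        System.out.println("Subtasks:       " + totalSubtasks(job));
        System.out.println("Slots required: " + requiredSlots(job));
        System.out.println("Fits 12 slots:  " + fits(12, job));
        System.out.println("Fits 3 slots:   " + fits(3, job));
    }
}
```

If operators are placed in separate slot sharing groups, this model no longer holds and the demand becomes the sum of the per-group maxima.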
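The most common mistake people make is ignoring the relationship between source parallelism and the number of Kafka partitions. Each partition is read by exactly one source subtask at a time, while one subtask can read several partitions. If your Kafka topic has 8 partitions and you set Flink parallelism to 2, each source subtask reads 4 partitions; this works, but read concurrency is capped at 2, which can become a bottleneck at the source. Going the other way does not help: with 8 partitions and a source parallelism of 16, 8 source subtasks receive no partition and sit idle while still occupying resources. As a rule of thumb, keep source parallelism at or below the partition count, ideally a divisor of it so partitions are spread evenly, and add partitions if you need more read concurrency than that.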
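To see how partition count and source parallelism interact, the following plain-Java sketch distributes partitions over source subtasks with a simple partition % parallelism rule. This is a simplified model, not the connector's actual assignment logic, and PartitionAssignment with its methods is a hypothetical name. It only relies on the fact that a partition has a single reader and a subtask can read several partitions.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of spreading Kafka partitions over source subtasks. */
final class PartitionAssignment {

    /** Returns, per source subtask, the partitions it reads (partition % parallelism). */
    static List<List<Integer>> assign(int partitions, int parallelism) {
        List<List<Integer>> bySubtask = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            bySubtask.add(new ArrayList<>());
        }
        for (int partition = 0; partition < partitions; partition++) {
            bySubtask.get(partition % parallelism).add(partition);
        }
        return bySubtask;
    }

    /** Number of source subtasks that end up with no partition to read. */
    static long idleSubtasks(int partitions, int parallelism) {
        return assign(partitions, parallelism).stream().filter(List::isEmpty).count();
    }

    public static void main(String[] args) {
        int partitions = 8;
        for (int parallelism : new int[] {2, 8, 16}) {
            System.out.println("parallelism " + parallelism
                + ": assignment " + assign(partitions, parallelism)
                + ", idle subtasks " + idleSubtasks(partitions, parallelism));
        }
    }
}
```

Running it for 8 partitions shows the three regimes: several partitions per subtask at parallelism 2, one each at 8, and half the subtasks idle at 16.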
The next concept you’ll likely grapple with is how Flink handles backpressure when an operator cannot keep up with the data it’s receiving, and how task slot allocation impacts this.