Apache Flink jobs can process data at incredible speeds, but achieving peak throughput often requires a deep dive into its configuration and execution.

Here’s a Flink job running a simple map operation on a DataStream of integers, doubling each one:

import org.apache.flink.streaming.api.scala._

object ThroughputExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Configure parallelism for the entire job
    env.setParallelism(4)

    val data = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    val doubled = data.map(_ * 2)

    // Print each result so the job has a sink and produces visible output
    doubled.print()

    env.execute("Throughput Optimization Example")
  }
}

This StreamExecutionEnvironment is configured with a default parallelism of 4, so Flink runs each operator that supports it (here the map and any sink) as 4 parallel subtasks. The exception in this example is the source: env.fromElements is a non-parallel source and always runs with parallelism 1. Flink redistributes its output round-robin across the 4 map subtasks, which then process their shares in parallel.

The core problem Flink solves is stateful stream processing with low latency and high throughput. Unlike batch systems that process data in discrete chunks, Flink processes unbounded streams of data continuously. It achieves this by maintaining operator state (like aggregations or window counts) and processing events one at a time (records are grouped into network buffers only for transport between tasks), with checkpoints keeping state consistent even in the face of failures. The levers you control are primarily parallelism, memory management, network buffers, and serialization.

One of the most impactful optimizations involves understanding Flink’s checkpointing mechanism. Checkpointing is Flink’s fault-tolerance tool, periodically saving the state of your application. While essential for recovery, frequent or large checkpoints can become a bottleneck. By default, Flink takes snapshots asynchronously and allows only one checkpoint in flight at a time. If the interval is too short (e.g., 1000ms) and the state is large, each snapshot may take longer than the interval: the next checkpoint is then triggered as soon as the previous one finishes, so the job spends most of its time checkpointing, and slow checkpoints may hit the timeout and fail. Raising execution.checkpointing.interval to a larger value, like 5000ms, setting execution.checkpointing.min-pause so that some processing time is guaranteed between checkpoints, and setting execution.checkpointing.tolerable-failed-checkpoints appropriately (e.g., 1) can prevent this contention.
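These checkpoint settings can also be applied in code. A minimal sketch, assuming the Scala DataStream API used in the example above; the interval, minimum pause, and failure tolerance are illustrative values rather than recommendations, and the tiny pipeline is only there to make the program complete:

```scala
import org.apache.flink.streaming.api.scala._

object CheckpointTuningSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Trigger a checkpoint every 5 seconds (illustrative value)
    env.enableCheckpointing(5000L)

    val checkpointConfig = env.getCheckpointConfig
    // Guarantee at least 2 seconds of normal processing between checkpoints
    checkpointConfig.setMinPauseBetweenCheckpoints(2000L)
    // Keep at most one checkpoint in flight (this is also Flink's default)
    checkpointConfig.setMaxConcurrentCheckpoints(1)
    // Tolerate one consecutive checkpoint failure; a second one fails the job
    checkpointConfig.setTolerableCheckpointFailureNumber(1)

    env.fromElements(1, 2, 3).map(_ * 2).print()
    env.execute("Checkpoint Tuning Sketch")
  }
}
```

Settings made in code take precedence over the cluster-wide values in flink-conf.yaml, which is convenient when only one job needs a different checkpoint cadence.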

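What actually broke: Your Flink job is not processing events as fast as expected, leading to increasing backpressure and potentially late events or job failures. In the Flink UI this typically shows up as upstream tasks reporting a high backPressuredTimeMsPerSecond while the bottleneck task itself shows a high busyTimeMsPerSecond and little idle time. Low busy time and high idle time across the whole job point instead to a source or external system that cannot deliver data fast enough.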

Common Causes and Fixes:

  1. Insufficient Parallelism:

    • Diagnosis: Check Flink UI for operators with high input/output buffer usage or significant backpressure. Look at TaskManager CPU utilization – if it’s consistently low across all TaskManagers, you might not be utilizing available resources.
    • Fix: Increase the parallelism of the job or specific operators. For the entire job, use env.setParallelism(N), where N is no larger than the number of task slots available to the job; with the default slot sharing, a job needs as many slots as its highest operator parallelism. For specific operators, use .setParallelism(N) on the DataStream API transformation. For example, if your cluster has 16 task slots and the job is bottlenecked on a flatMap operation running at parallelism 8, calling .setParallelism(16) on that transformation might be beneficial.
    • Why it works: More parallel instances of an operator can process more input partitions concurrently, distributing the workload and increasing overall processing capacity.
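A minimal sketch of raising the parallelism of a single operator while leaving the job default alone. The input data, the operator name, and the assumption that at least 8 task slots are free are all illustrative:

```scala
import org.apache.flink.streaming.api.scala._

object OperatorParallelismSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Default parallelism for operators that do not set their own
    env.setParallelism(4)

    val lines = env.fromElements("a b c", "d e", "f")

    // Hypothetical bottleneck: give only this flatMap more parallel subtasks.
    // Assumes the cluster has at least 8 free task slots.
    val words = lines
      .flatMap(_.split(" "))
      .setParallelism(8)
      .name("split-words")

    words.print()
    env.execute("Operator Parallelism Sketch")
  }
}
```

Setting parallelism per operator keeps the rest of the pipeline at its cheaper default and makes the bottleneck easy to spot by name in the Flink UI.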
  2. Network Buffer Saturation:

    • Diagnosis: In the Flink UI, observe the network metrics inPoolUsage and outPoolUsage for each task. If these are consistently high (e.g., > 90%), the network buffers are full, preventing data from being sent or received. Also check backPressuredTimeMsPerSecond on the upstream tasks.
    • Fix: Give the network stack more memory and, if needed, larger buffers. In Flink 1.10 and later, network memory is sized in flink-conf.yaml with taskmanager.memory.network.fraction, taskmanager.memory.network.min, and taskmanager.memory.network.max, and the size of each buffer with taskmanager.memory.segment-size (default 32kb; try a larger value such as 64kb or 128kb). The older taskmanager.network.numberOfBuffers option (e.g., 16384) is deprecated.
    • Why it works: Larger or more numerous buffers allow for more data to be held temporarily during transit between operators, reducing the likelihood of upstream operators being blocked by downstream consumers.
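In Flink 1.10 and later, network buffer capacity is normally sized through the taskmanager.memory.network.* options rather than by a fixed buffer count. A sketch of what that might look like in flink-conf.yaml; the numbers are illustrative and should be sized against your own TaskManager memory:

```yaml
# flink-conf.yaml (Flink 1.10+); values are illustrative, not recommendations
taskmanager.memory.network.fraction: 0.15   # share of total Flink memory reserved for network buffers
taskmanager.memory.network.min: 128mb       # lower bound for the derived network memory
taskmanager.memory.network.max: 2gb         # upper bound for the derived network memory
taskmanager.memory.segment-size: 64kb       # size of each buffer (default 32kb)
```

More buffered data in flight also means more data to drain during aligned checkpoints, so it is worth rechecking checkpoint alignment time after raising these values.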
  3. Inefficient Serialization:

    • Diagnosis: If your data objects are large or complex, serialization/deserialization can become a significant CPU overhead. Monitor CPU usage on TaskManagers; if it’s high but not directly attributable to specific operators, serialization might be the culprit.
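    • Fix: Prefer Flink’s built-in serializers. Make sure your classes meet Flink’s POJO rules (public class, public no-argument constructor, fields that are public or have getters and setters) or, in Scala, use case classes, so that Flink does not fall back to Kryo, which is considerably slower. Setting pipeline.generic-types: false in flink-conf.yaml (or calling env.getConfig.disableGenericTypes()) makes the job fail fast wherever Kryo would be used. For types that must go through Kryo, register them up front; for data with evolving schemas, consider Avro; @TypeInfo annotations let you supply custom type information. For primitive types and common Java types, Flink’s internal serializers are usually very fast.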
    • Why it works: Efficient serializers minimize the CPU time and network bandwidth required to convert data objects into byte streams for network transfer and state storage.
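One way to catch accidental Kryo fallback early is to disable generic types, so that the job fails at startup instead of quietly running with a slow serializer. A minimal sketch, assuming the Scala API; SensorReading is a hypothetical event type invented for this illustration:

```scala
import org.apache.flink.streaming.api.scala._

// Hypothetical event type. Scala case classes are handled by Flink's
// built-in case class serializer rather than by Kryo.
case class SensorReading(sensorId: String, timestamp: Long, value: Double)

object SerializationSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Fail fast instead of silently falling back to Kryo for any type
    // that Flink has to treat as a generic type
    env.getConfig.disableGenericTypes()

    val readings = env.fromElements(
      SensorReading("s1", 1L, 20.5),
      SensorReading("s2", 2L, 21.0)
    )

    readings
      .map(r => r.copy(value = r.value * 2))
      .print()

    env.execute("Serialization Sketch")
  }
}
```

If a type in the pipeline cannot be analyzed by Flink, this setup raises an exception naming it, which is usually a faster route to the culprit than profiling CPU time.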
  4. Checkpointing Latency/Frequency:

    • Diagnosis: Monitor the "checkpoint duration" and "checkpoint alignment time" metrics in the Flink UI. If these are consistently high or increasing, or if you see "Checkpoints failed" warnings, checkpointing is a bottleneck.
    • Fix: Increase the checkpoint interval (execution.checkpointing.interval: 30000 for 30 seconds). If using the RocksDB state backend, consider tuning RocksDB-specific settings or increasing taskmanager.memory.managed.fraction. For large state, consider enabling incremental checkpoints (state.backend.incremental: true) if your state backend supports them (e.g., RocksDB).
    • Why it works: A longer interval reduces the frequency of checkpointing overhead. Incremental checkpoints only upload changed state, reducing I/O and network traffic during snapshots.
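Put together, the checkpoint-related settings might look like this in flink-conf.yaml. This is a sketch with illustrative values; the key names are those used in Flink 1.13 through 1.18, and newer releases may rename some of them:

```yaml
# flink-conf.yaml; values are illustrative, not recommendations
execution.checkpointing.interval: 30s     # one checkpoint every 30 seconds
execution.checkpointing.min-pause: 10s    # guaranteed processing time between checkpoints
state.backend: rocksdb
state.backend.incremental: true           # upload only state changed since the last checkpoint
taskmanager.memory.managed.fraction: 0.5  # more managed memory for RocksDB (default 0.4)
```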
  5. State Backend Performance:

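    • Diagnosis: State access can limit throughput in two ways. The RocksDB state backend relies on local disk I/O, so slow or network-attached disks hurt it directly. Heap-based backends (FsStateBackend and MemoryStateBackend, or HashMapStateBackend in Flink 1.13+) keep working state in the JVM heap, so large state shows up as heap pressure and long GC pauses. Monitor disk I/O and latency for RocksDB, and heap usage and GC for heap-based backends.
    • Fix: For large state that does not fit in memory, the RocksDB state backend (RocksDBStateBackend, named EmbeddedRocksDBStateBackend in Flink 1.13+) is generally recommended. Ensure it has enough managed memory, keep its local directories on fast local SSDs, and tune RocksDB options if needed. For smaller state where throughput and latency matter most, a heap-based backend (FsStateBackend, or HashMapStateBackend in Flink 1.13+) is usually faster because it avoids serialization on every state access. MemoryStateBackend, which also keeps checkpoints in JobManager memory, is only suitable for small state and is not recommended for production.
    • Why it works: RocksDB keeps state off-heap and on local disk, so state size is not bounded by the JVM heap and large state does not add GC pressure. Heap-based backends trade that scalability for faster access to each state entry.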
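In Flink 1.13 and later the backends are called HashMapStateBackend (heap-based) and EmbeddedRocksDBStateBackend. A minimal sketch of choosing between them in code, assuming the flink-statebackend-rocksdb dependency is on the classpath; the command-line switch and the checkpoint path are hypothetical placeholders:

```scala
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend
import org.apache.flink.streaming.api.scala._

object StateBackendSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Hypothetical switch: pass "rocksdb" as the first argument for large state
    val useRocksDb = args.headOption.contains("rocksdb")

    if (useRocksDb) {
      // State lives off-heap and on local disk; `true` enables incremental checkpoints
      env.setStateBackend(new EmbeddedRocksDBStateBackend(true))
    } else {
      // State lives as objects on the JVM heap
      env.setStateBackend(new HashMapStateBackend())
    }

    env.enableCheckpointing(30000L)
    // Placeholder path: where completed checkpoints are written
    env.getCheckpointConfig.setCheckpointStorage("file:///tmp/flink-checkpoints")

    // Small keyed pipeline so that the job actually uses keyed state
    env.fromElements(1, 2, 3, 4).keyBy(_ % 2).reduce(_ + _).print()
    env.execute("State Backend Sketch")
  }
}
```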
  6. Garbage Collection (GC) Pauses:

    • Diagnosis: Monitor TaskManager GC times and frequencies. Long or frequent GC pauses can cause operators to stall, leading to backpressure and increased latency. Use JVM GC logging and monitoring tools.
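    • Fix: Tune JVM GC settings. G1 (-XX:+UseG1GC) is a good starting point; adjust G1-specific flags such as -XX:MaxGCPauseMillis if pauses remain long. Size the heap through Flink’s memory options (e.g., taskmanager.memory.process.size or taskmanager.memory.task.heap.size) rather than raw -Xmx and -Xms, since Flink derives those JVM flags itself. Use env.java.opts (or env.java.opts.taskmanager for TaskManagers only) in flink-conf.yaml to pass the GC flags.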
    • Why it works: Modern GCs are designed to minimize pause times, allowing the Flink operators to run more consistently without being interrupted for extended periods.
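A sketch of how GC flags and memory sizing might be expressed in flink-conf.yaml. The memory size and pause target are illustrative assumptions, not tuned values:

```yaml
# flink-conf.yaml; values are illustrative, not recommendations
taskmanager.memory.process.size: 4g   # Flink derives the JVM heap size from this
env.java.opts.taskmanager: -XX:+UseG1GC -XX:MaxGCPauseMillis=200
```

Leaving -Xmx and -Xms to Flink avoids a mismatch between the JVM heap and the memory budget Flink computes for managed and network memory.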
  7. Bottlenecked Source/Sink:

    • Diagnosis: Examine the Flink UI for backpressure originating from your source operator or terminating at your sink operator. Check the metrics of the external systems your source/sink interacts with (e.g., Kafka consumer lag, database write latency).
    • Fix: Increase the parallelism of your source and sink operators so they can keep up with the intermediate operators. For Kafka, make sure the topic has enough partitions: each partition is read by only one source subtask, so source subtasks beyond the partition count sit idle. For custom sinks, optimize their write operations or add buffering.
    • Why it works: The overall throughput of a pipeline is limited by its slowest component. Ensuring sources and sinks can keep up with the processing rate is crucial.
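A minimal sketch of matching source parallelism to the number of Kafka partitions, assuming the flink-connector-kafka dependency and its KafkaSource builder. The broker address, topic, group id, and partition count are placeholders invented for illustration:

```scala
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala._

object KafkaSourceParallelismSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Assumption: the topic has 8 partitions
    val topicPartitions = 8

    val source = KafkaSource.builder[String]()
      .setBootstrapServers("localhost:9092")   // placeholder address
      .setTopics("events")                      // placeholder topic
      .setGroupId("throughput-example")         // placeholder group id
      .setStartingOffsets(OffsetsInitializer.latest())
      .setValueOnlyDeserializer(new SimpleStringSchema())
      .build()

    val events = env
      .fromSource(source, WatermarkStrategy.noWatermarks[String](), "kafka-events")
      // One source subtask per partition; extra subtasks would sit idle
      .setParallelism(topicPartitions)

    events.map(_.toUpperCase).print()
    env.execute("Kafka Source Parallelism Sketch")
  }
}
```

If the downstream operators need more parallelism than the topic has partitions, Flink rebalances records between the source and the next operator, at the cost of a network shuffle.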

Once throughput is fixed, the next problem you are likely to hit is memory exhaustion under sustained high load, which surfaces as OutOfMemoryError exceptions or excessive GC if memory is not properly managed.
