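The most surprising thing about achieving exactly-once semantics from Flink to Kafka is how little Kafka producer configuration it takes. Apart from choosing the exactly-once delivery guarantee, naming the transactions, and keeping the transaction timeout within the broker’s limit, all the magic happens within Flink’s checkpointing, which drives Kafka’s transactional producer.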

Let’s see this in action. Imagine a Flink job reading from a Kafka topic (source-topic), doing some simple transformation (like converting to uppercase), and writing to another Kafka topic (sink-topic).

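```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.ProducerConfig;

public class FlinkKafkaExactlyOnce {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Enable checkpointing and set the interval
        env.enableCheckpointing(5000); // Checkpoint every 5 seconds
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000); // Minimum 2 seconds pause
        env.getCheckpointConfig().setCheckpointTimeout(60000); // 60 seconds timeout
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // Only one checkpoint at a time

        // Configure Kafka Source (read offsets are stored in Flink's checkpoints)
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("kafka-broker1:9092,kafka-broker2:9092")
                .setTopics("source-topic")
                .setGroupId("flink-source-group")
                // Start from the group's committed offsets, falling back to earliest
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // Configure Kafka Sink with transactional writes
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers("kafka-broker1:9092,kafka-broker2:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("sink-topic")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // This is the key!
                // Prefix for the transactional.id values Flink generates: stable across restarts, unique per job
                .setTransactionalIdPrefix("flink-kafka-exactly-once")
                // The connector defaults to 1 hour; brokers reject values above transaction.max.timeout.ms (15 min by default)
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "900000")
                .setProperty(ProducerConfig.LINGER_MS_CONFIG, "100") // Linger for 100ms to batch records
                .setProperty(ProducerConfig.ACKS_CONFIG, "all") // Wait for all in-sync replicas to acknowledge
                .build();

        // Build the dataflow
        env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
           .map(String::toUpperCase)
           .sinkTo(kafkaSink);

        env.execute("Flink Kafka Exactly-Once Example");
    }
}
```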

The problem Flink’s exactly-once semantics solves for Kafka is preventing data loss or duplication when the Flink job fails and restarts. Without it, if Flink processes a message, writes it to Kafka, but then crashes before committing its read offset, it might re-read and re-process that same message upon restart, leading to duplicates. Or, if it crashes after committing its offset but before its write to Kafka is fully acknowledged by Kafka, that write might be lost, leading to data loss.
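To make the duplicate case concrete, here is a toy model in plain Java, with no Flink or Kafka involved. NaiveReplayDemo is a hypothetical class. Its output list stands in for a non-transactional sink topic whose writes are visible immediately, and its read offset is committed only every commitEvery records. A crash after writing a record whose offset hasn’t been committed yet rewinds the reader, so that record is written twice.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (hypothetical, not the Flink or Kafka API): a non-transactional pipeline
// whose read offset is committed only every `commitEvery` records.
class NaiveReplayDemo {

    /** Uppercases `input`, crashing once right after the `crashAfter`-th write, then restarting. */
    static List<String> run(List<String> input, int commitEvery, int crashAfter) {
        List<String> output = new ArrayList<>(); // the sink topic: writes are visible at once
        int committedOffset = 0;                 // the consumer's committed read offset
        boolean crashed = false;

        int offset = committedOffset;
        while (offset < input.size()) {
            output.add(input.get(offset).toUpperCase());
            offset++;
            if (!crashed && offset == crashAfter) {
                crashed = true;
                offset = committedOffset; // restart: re-read from the last committed offset
                continue;
            }
            if (offset % commitEvery == 0) {
                committedOffset = offset; // periodic offset commit
            }
        }
        return output;
    }
}
```

With five input records, an offset commit every two records, and a crash right after the third write, the output is A, B, C, C, D, E: the third record was written, its offset was never committed, and it was replayed.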

Here’s how Flink achieves this:

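  1. Transactional Kafka Producer: A Kafka sink configured for exactly-once (DeliveryGuarantee.EXACTLY_ONCE on KafkaSink, or Semantic.EXACTLY_ONCE on the legacy FlinkKafkaProducer) writes through Kafka’s transactional producer. This means it can group multiple writes into a single atomic transaction that is either committed as a whole or aborted as a whole.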

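  2. Flink Checkpointing: Flink’s distributed snapshots (checkpoints) are the backbone, and the sink takes part in them as a two-phase commit. Between checkpoints, the sink writes every record into a Kafka transaction that is already open. When a checkpoint is triggered (every 5 seconds in our example), Flink:

    • Injects checkpoint barriers: Rather than pausing ingestion, each source emits a barrier into the stream. Records keep flowing, and every operator snapshots its state when the barrier reaches it.
    • Records source offsets: Each source subtask stores the Kafka offsets it has read up to the barrier as part of the checkpoint.
    • Pre-commits the transaction (phase one): When the barrier reaches the sink, the sink flushes the open transaction, stores its identity in the checkpointed state, and opens a new transaction for the records that follow the barrier.
    • Commits the transaction (phase two): Only after every operator has acknowledged the checkpoint, so that it is complete, does Flink notify the sink, which then commits the pre-committed transaction. Its records become visible to consumers reading with isolation.level=read_committed.
    • Optionally commits offsets back to Kafka: The source can also commit the checkpointed offsets to its consumer group when the checkpoint completes, but only for monitoring. Recovery relies on the offsets stored in the checkpoint, not on the offsets committed in Kafka.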
  3. Failure and Recovery: If Flink fails:

    • It restarts from the latest completed checkpoint and restores its state from it, including the source offsets and the sink’s pre-committed transactions.
    • Transactions that the checkpoint recorded as pre-committed are committed during recovery, in case the failure struck before their commit went through.
    • Transactions opened after that checkpoint were never part of a completed checkpoint, so Flink aborts them: it re-initializes producers with the same transactional IDs, which fences off the old producers and aborts their open transactions.
    • Flink then resumes reading from the Kafka source at the offsets stored in that checkpoint.
    • Records from aborted transactions are never visible to read_committed consumers, and replay starts exactly where the committed output ends, so no duplicates are produced and no data is lost.
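The checkpoint and recovery steps above can be condensed into a single-threaded toy model. Everything in it is hypothetical and simplified. Topic is one partition whose records are tagged with the transaction that wrote them. Each "checkpoint" pre-commits and commits synchronously. The crash aborts the open transaction and rewinds to the last checkpointed offset. It does not model re-committing a pre-committed transaction after a crash. It shows why replay plus aborted transactions yields exactly-once output for a read_committed reader, while a read_uncommitted reader still sees the replayed duplicate.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a two-phase-commit sink (hypothetical classes, not the Flink or Kafka API).
class TwoPhaseCommitDemo {

    enum TxnState { OPEN, COMMITTED, ABORTED }

    // One "partition": each record is tagged with the transaction that wrote it.
    static final class Topic {
        final List<String> values = new ArrayList<>();
        final List<String> txnOf = new ArrayList<>();
        final Map<String, TxnState> txns = new HashMap<>();

        void append(String txn, String value) {
            values.add(value);
            txnOf.add(txn);
        }

        // read_uncommitted: every record, whatever its transaction's fate
        List<String> readUncommitted() {
            return new ArrayList<>(values);
        }

        // read_committed: skip aborted records and stop at the first still-open transaction
        List<String> readCommitted() {
            List<String> out = new ArrayList<>();
            for (int i = 0; i < values.size(); i++) {
                TxnState state = txns.get(txnOf.get(i));
                if (state == TxnState.OPEN) break;
                if (state == TxnState.COMMITTED) out.add(values.get(i));
            }
            return out;
        }
    }

    /** Uppercases `input`, checkpointing every `interval` records and crashing once at `crashAt`. */
    static Topic run(List<String> input, int interval, int crashAt) {
        Topic topic = new Topic();
        int nextTxn = 0;
        String txn = "txn-" + nextTxn++;
        topic.txns.put(txn, TxnState.OPEN);
        int checkpointedOffset = 0; // source offset stored in the last completed checkpoint
        boolean crashed = false;

        int offset = 0;
        while (offset < input.size()) {
            if (!crashed && offset == crashAt) {
                crashed = true;
                topic.txns.put(txn, TxnState.ABORTED); // recovery aborts the uncommitted transaction
                offset = checkpointedOffset;           // and rewinds to the checkpointed offset
                txn = "txn-" + nextTxn++;
                topic.txns.put(txn, TxnState.OPEN);
                continue;
            }
            topic.append(txn, input.get(offset).toUpperCase());
            offset++;
            if (offset % interval == 0) {
                String preCommitted = txn;             // phase one: barrier reaches the sink
                txn = "txn-" + nextTxn++;              // new transaction for records after the barrier
                topic.txns.put(txn, TxnState.OPEN);
                checkpointedOffset = offset;           // checkpoint completes with this source offset
                topic.txns.put(preCommitted, TxnState.COMMITTED); // phase two: commit on completion
            }
        }
        topic.txns.put(txn, TxnState.COMMITTED); // end of input: a final checkpoint commits the rest
        return topic;
    }
}
```

Running it on five records, with a checkpoint every two records and a crash after the third, gives A, B, C, D, E under read_committed and A, B, C, C, D, E under read_uncommitted. The first C sits in the aborted transaction.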

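No extra method call is needed to turn transactions on. Choosing the exactly-once delivery guarantee (and, with KafkaSink, setting a transactional ID prefix) is what makes the sink create transactional producers. Flink derives each producer’s transactional.id itself and drives the begin, pre-commit, commit, and abort steps from its checkpoint coordination. The one producer setting you do have to get right is transaction.timeout.ms. Flink’s Kafka connector defaults it to one hour, but brokers reject values above their transaction.max.timeout.ms (15 minutes by default), so either lower the producer setting or raise the broker limit. The timeout must also exceed the longest time a transaction can wait for its commit, including the time to recover from a failure. Otherwise Kafka aborts a transaction that a completed checkpoint still expects to commit, and its records are lost.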
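A wrong transaction timeout fails either at startup or, worse, silently during recovery, so it can help to sanity-check the numbers. The helper below is hypothetical, not part of Flink. It uses a conservative rule of thumb (an assumption): a transaction may stay open for a full checkpoint interval of writes, then for up to the checkpoint timeout until the checkpoint completes, then for a recovery if the job fails before committing. It also checks that the producer’s transaction.timeout.ms does not exceed the broker’s transaction.max.timeout.ms.

```java
import java.util.ArrayList;
import java.util.List;

// Rule-of-thumb check (hypothetical helper, not part of Flink or Kafka).
class TransactionTimeoutCheck {

    static List<String> problems(long transactionTimeoutMs, long brokerMaxTimeoutMs,
                                 long checkpointIntervalMs, long checkpointTimeoutMs,
                                 long maxRecoveryMs) {
        List<String> problems = new ArrayList<>();
        // The broker refuses producers whose transaction timeout exceeds its maximum.
        if (transactionTimeoutMs > brokerMaxTimeoutMs) {
            problems.add("transaction.timeout.ms exceeds the broker's transaction.max.timeout.ms");
        }
        // Assumed worst case: one interval of writes, the slowest allowed checkpoint, then a recovery.
        long worstCaseOpenMs = checkpointIntervalMs + checkpointTimeoutMs + maxRecoveryMs;
        if (transactionTimeoutMs <= worstCaseOpenMs) {
            problems.add("transaction.timeout.ms may expire before Flink commits (risk of data loss)");
        }
        return problems;
    }
}
```

With the article’s settings (5 s interval, 60 s checkpoint timeout) and an assumed 5-minute worst-case recovery, a 15-minute timeout passes both checks. A 1-hour timeout (the connector’s default) fails against a broker left at its 15-minute maximum, and a 1-minute timeout fails the recovery check.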

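Setting ProducerConfig.ACKS_CONFIG to "all" makes committed data durable against broker failures (the idempotent producer that transactions build on requires it anyway). ProducerConfig.LINGER_MS_CONFIG, by contrast, is purely a throughput setting that lets the producer wait up to 100 ms to batch records. Neither provides the exactly-once guarantee; that comes from Flink’s coordination of the producer’s transactions with its own checkpoints. Two practical consequences follow. Downstream consumers must set isolation.level=read_committed, because with the default read_uncommitted they also see records from open and aborted transactions. And output only becomes visible when a checkpoint completes, so records can wait roughly one checkpoint interval (5 seconds here) plus the checkpoint’s own duration before consumers see them.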

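One aspect often overlooked is how Flink handles the transactional.id for the Kafka producer. With KafkaSink, each ID is built from the transactional ID prefix, the sink subtask’s index, and a checkpoint ID; the legacy FlinkKafkaProducer instead draws IDs from a per-subtask pool. Either way, which IDs exist depends on the sink’s parallelism. Parallelism can’t be changed mid-flight, though. Rescaling means stopping the job (ideally with a savepoint) and restarting from that snapshot with the new parallelism. On that restart, Flink commits any transactions recorded in the snapshot that are still pending, tries to abort lingering transactions left by the previous subtasks, and reprocesses data from the snapshot’s offsets, so exactly-once still holds. What breaks it is changing the transactional ID prefix (or, with the legacy producer, the operator’s identity) between runs. Flink then no longer looks for the old IDs, their open transactions are never aborted, and read_committed consumers stall behind them until transaction.timeout.ms expires. Keep the prefix stable across restarts and unique for each job writing to the same Kafka cluster.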
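The naming idea can be sketched with a hypothetical helper. It assumes IDs of the form prefix-subtask-checkpointId as an approximation of KafkaSink’s scheme, not its exact code. Roughly, Flink’s recovery keeps probing checkpoint IDs until it reaches one that was never used; the fixed lookahead below stands in for that loop (an assumption). The point is that the candidates depend on the old parallelism and on the prefix. After scaling from 4 subtasks to 2, IDs for subtasks 2 and 3 must still be probed. With a different prefix, none of the old IDs would be generated at all.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch (hypothetical helper): IDs assumed to look like "<prefix>-<subtask>-<checkpointId>".
class TransactionalIds {

    static String id(String prefix, int subtask, long checkpointId) {
        return prefix + "-" + subtask + "-" + checkpointId;
    }

    /** IDs a restarted job would probe (and abort) for every subtask of the previous run. */
    static List<String> lingeringCandidates(String prefix, int oldParallelism,
                                            long restoredCheckpointId, int lookahead) {
        List<String> ids = new ArrayList<>();
        for (int subtask = 0; subtask < oldParallelism; subtask++) {
            // Transactions for checkpoints after the restored one were never committed.
            for (long cp = restoredCheckpointId + 1; cp <= restoredCheckpointId + lookahead; cp++) {
                ids.add(id(prefix, subtask, cp));
            }
        }
        return ids;
    }
}
```

For example, restoring checkpoint 41 of a job that previously ran with parallelism 4 yields candidates such as orders-sink-3-42, even if the new job only has subtasks 0 and 1. Had the prefix been renamed, the restarted job would generate a different set of IDs and the old transactions would linger.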

The next concept you’ll likely grapple with is handling stateful operations within Flink and ensuring their consistency with Kafka exactly-once writes, especially when dealing with event time and watermarks.
