Flink’s watermark mechanism is designed to handle out-of-order events, but its true power lies in how it interacts with allowed lateness. Together, they let Flink emit results promptly and still fold late-arriving data into those results afterwards.
Let’s see this in action. Imagine a simple Flink job that counts events per minute. Suppose the job finalized minute 5’s count as soon as events from minute 6 appeared. Then any event for minute 5 that arrived after that point would leave the count for minute 5 forever incomplete.
Here’s a snippet of how you’d set this up in Flink using Java:
```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

DataStream<MyEvent> events = ... // Your stream of events, each with a timestamp

// Assign timestamps and generate watermarks.
// This assumes your events have a getTimestamp() method returning a long (epoch milliseconds).
DataStream<MyEvent> timestamps = events
    .assignTimestampsAndWatermarks(
        WatermarkStrategy
            // Max out-of-orderness is 5 seconds
            .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            // Use the event's own timestamp as its event time
            .withTimestampAssigner((event, recordTimestamp) -> event.getTimestamp()));

// Key by a field (e.g., a user ID or a category) and window by event time.
// Here, we're using a 1-minute tumbling window.
// (Time is the Flink 1.x API; newer releases take java.time.Duration instead.)
DataStream<Long> counts = timestamps
    .keyBy(MyEvent::getKey)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    // THIS IS WHERE ALLOWED LATENESS COMES IN
    .allowedLateness(Time.minutes(1)) // Keep window state until the watermark passes window end + 1 minute
    .aggregate(new CountAggregator()); // Emits one Long count per key, window and firing

counts.print();
```
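In this example, the bounded-out-of-orderness watermark strategy is crucial. It tells Flink that events may arrive up to 5 seconds out of order. Flink derives the watermark from the maximum event timestamp observed so far minus this 5-second bound. The allowed lateness plays no part in this calculation. The watermark signals the progress of event time: it asserts that no more events with earlier timestamps are expected. When the watermark passes the end of a window, the default event-time trigger fires and Flink emits that window’s result. Without allowed lateness, Flink also discards the window’s state at that point, and any event for that window that arrives afterwards is dropped.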
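The following plain-Java sketch makes the watermark arithmetic concrete. It models Flink’s bounded-out-of-orderness generator but is not that class. It tracks the largest timestamp seen and reports that value minus the bound minus one millisecond, which is the formula Flink’s built-in `BoundedOutOfOrdernessWatermarks` uses. The class name `BoundedWatermarkModel` is hypothetical. Flink emits watermarks periodically, whereas this model simply computes the watermark on demand.

```java
/** Hypothetical model of a bounded-out-of-orderness watermark (same arithmetic, not Flink's class). */
public class BoundedWatermarkModel {
    private final long boundMillis;
    private long maxTimestamp;

    public BoundedWatermarkModel(long boundMillis) {
        this.boundMillis = boundMillis;
        // Start low enough that the initial watermark is Long.MIN_VALUE without overflowing.
        this.maxTimestamp = Long.MIN_VALUE + boundMillis + 1;
    }

    /** Called for every event: only the largest timestamp seen so far matters. */
    public void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** Watermark = largest timestamp seen - bound - 1 ms. It never moves backwards. */
    public long currentWatermark() {
        return maxTimestamp - boundMillis - 1;
    }

    public static void main(String[] args) {
        BoundedWatermarkModel wm = new BoundedWatermarkModel(5_000);
        for (long ts : new long[] {61_000, 58_000, 66_000, 63_000}) {
            wm.onEvent(ts);
            System.out.println("event " + ts + " -> watermark " + wm.currentWatermark());
        }
    }
}
```

Running `main` prints watermarks of 55999, 55999, 60999 and 60999. The out-of-order event at 58,000 ms never pulls the watermark back. The `[0, 60000)` window can only fire once an event stamped 65,000 ms or later has been seen.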
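However, the allowedLateness(Time.minutes(1)) clause changes this behavior. Flink still emits the window’s result when the watermark passes the window’s end. It then keeps the window’s state until the watermark passes the window’s end plus one minute. That minute is measured in event time by the watermark, not on the wall clock.

Any event that arrives during this period is still processed by the window’s aggregate function. With the default event-time trigger, each such late event makes the window fire again with an updated count.

For our 1-minute tumbling windows, consider an event stamped 12:05:30. It is still counted in the 12:05–12:06 window as long as it arrives before the watermark reaches (just under) 12:07. With the 5-second bound, that happens once an event stamped 12:07:05 or later has been seen.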
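The decision Flink makes for each incoming event can be written as a small pure-Java function. This is a sketch under stated assumptions, not Flink code. `LatenessCheck`, `classify` and `Outcome` are hypothetical names, windows are tumbling with no offset, and times are plain milliseconds from an arbitrary zero.

The rules mirror Flink’s default behavior:

- An event is dropped once the watermark reaches the window’s last millisecond plus the allowed lateness.
- Otherwise, if the watermark has already reached the window’s last millisecond, the event causes a late firing.

```java
/** Hypothetical helper: how a tumbling event-time window with allowed lateness treats one event. */
public class LatenessCheck {
    public enum Outcome { ON_TIME, LATE_UPDATE, DROPPED }

    /** Start of the tumbling window that contains ts (assumes no window offset). */
    public static long windowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    public static Outcome classify(long ts, long watermark, long size, long allowedLateness) {
        long maxTs = windowStart(ts, size) + size - 1; // last millisecond inside the window
        if (maxTs + allowedLateness <= watermark) {
            return Outcome.DROPPED;     // lateness expired: window state already purged
        }
        if (maxTs <= watermark) {
            return Outcome.LATE_UPDATE; // window already fired; it fires again including this event
        }
        return Outcome.ON_TIME;         // window has not fired yet
    }

    public static void main(String[] args) {
        long minute = 60_000;
        long ts = 5 * minute + 30_000; // an event stamped 5:30
        for (long wm : new long[] {5 * minute + 50_000, 6 * minute + 30_000, 7 * minute}) {
            System.out.println("watermark " + wm + " -> " + classify(ts, wm, minute, minute));
        }
    }
}
```

Running `main` prints `ON_TIME`, `LATE_UPDATE` and then `DROPPED` for watermarks of 5:50, 6:30 and 7:00. Once the watermark reaches 6:59.999, the window’s state is gone and the 5:30 event can no longer be counted.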
The CountAggregator would look something like this:
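```java
import org.apache.flink.api.common.functions.AggregateFunction;

// Counts the events in a window: IN = MyEvent, ACC = running count, OUT = final count.
public static class CountAggregator implements AggregateFunction<MyEvent, Long, Long> {
    @Override
    public Long createAccumulator() {
        return 0L; // every window starts with a count of zero
    }

    @Override
    public Long add(MyEvent value, Long accumulator) {
        return accumulator + 1; // one more event in this window
    }

    @Override
    public Long getResult(Long accumulator) {
        return accumulator; // emitted on every firing, including late firings
    }

    @Override
    public Long merge(Long a, Long b) {
        return a + b; // only needed for merging windows (e.g. session windows)
    }
}
```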
The mental model here is that Flink tracks event-time progress via watermarks, and each window passes two milestones:

- When the watermark passes the window’s end, the window fires and emits its first result.
- When the watermark passes the window’s end plus its allowed lateness, Flink purges the window’s state, and any further events for that window are dropped.

Between the two milestones, each late event is added to the window’s state. With the default event-time trigger, it immediately produces an updated emission for that window. Crucially, Flink doesn’t discard a window just because the watermark has passed its end. It keeps the window’s state alive for the duration specified by allowedLateness.
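You can replay both milestones with a small, self-contained simulation. This is a simplified model of a single key, not Flink’s window operator. The class and method names are hypothetical, and the watermark advances after every event rather than periodically. The parameters (1-minute windows, 1 minute of allowed lateness, 5-second bound) are assumed to match the job above.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical single-key simulation of tumbling windows with a watermark and allowed lateness. */
public class LatenessSimulation {
    static final long WINDOW_SIZE = 60_000;  // 1-minute tumbling windows
    static final long LATENESS = 60_000;     // allowed lateness of 1 minute
    static final long OUT_OF_ORDER = 5_000;  // 5-second out-of-orderness bound

    /** Replays event timestamps in arrival order and returns a log of what happens. */
    public static List<String> run(long... timestamps) {
        List<String> log = new ArrayList<>();
        TreeMap<Long, Long> state = new TreeMap<>(); // window start -> running count
        Set<Long> fired = new HashSet<>();           // windows that have emitted at least once
        long maxSeen = Long.MIN_VALUE + OUT_OF_ORDER + 1;

        for (long ts : timestamps) {
            long watermark = maxSeen - OUT_OF_ORDER - 1;
            long start = ts - Math.floorMod(ts, WINDOW_SIZE);
            long maxTs = start + WINDOW_SIZE - 1;

            // Second milestone already passed: state is gone, so the event is dropped.
            if (maxTs + LATENESS <= watermark) {
                log.add("dropped event at " + ts);
                continue;
            }
            long count = state.merge(start, 1L, Long::sum);
            // First milestone already passed: the late event triggers an updated emission.
            if (maxTs <= watermark) {
                log.add("late fire " + range(start) + " count=" + count);
                fired.add(start);
            }

            // Advance the watermark, then fire and purge windows it has passed.
            maxSeen = Math.max(maxSeen, ts);
            watermark = maxSeen - OUT_OF_ORDER - 1;
            Iterator<Map.Entry<Long, Long>> it = state.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, Long> w = it.next();
                long wMax = w.getKey() + WINDOW_SIZE - 1;
                if (wMax <= watermark && fired.add(w.getKey())) {
                    log.add("fire " + range(w.getKey()) + " count=" + w.getValue());
                }
                if (wMax + LATENESS <= watermark) {
                    it.remove(); // allowed lateness expired: purge the window's state
                    fired.remove(w.getKey());
                }
            }
        }
        return log;
    }

    static String range(long start) {
        return "[" + start + ", " + (start + WINDOW_SIZE) + ")";
    }

    public static void main(String[] args) {
        run(10_000, 50_000, 66_000, 30_000, 130_000, 40_000, 200_000, 20_000)
            .forEach(System.out::println);
    }
}
```

For the arrival order in `main`, the simulation first emits a count of 2 for `[0, 60000)`. The event at 30,000 ms then produces a late update of 3 for the same window. The events at 40,000 ms and 20,000 ms are dropped, because by the time they arrive the watermark has passed 119,999 ms, the end of that window’s allowed lateness.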
The surprising part is that even after a window has fired, Flink can still emit updated results for it if late data arrives and allowedLateness is configured. Because the window’s state is still around, a late event is simply added to it. The window then emits a new, complete result that supersedes the earlier one. Downstream consumers therefore see several results for the same key and window, and must treat each one as a replacement rather than an increment. If you also need to capture data that arrives after the allowed lateness has expired, look into sideOutputLateData. It routes such events to a side output instead of silently dropping them.
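The following sketch shows why repeated emissions matter downstream. Summing every emission double-counts a window that fired twice. Keeping only the latest result per key and window gives the right total. `WindowResult` is a hypothetical record: CountAggregator emits only a Long, so the sketch assumes the key and window start were attached upstream (for example, by a window function). Both helper names are likewise illustrative.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class UpsertConsumer {
    // Hypothetical result shape; assumes key and window start were attached upstream.
    public record WindowResult(String key, long windowStart, long count) {}

    /** Appending every emission double-counts windows that fired more than once. */
    public static long appendTotal(List<WindowResult> results) {
        return results.stream().mapToLong(WindowResult::count).sum();
    }

    /** Upserting keeps only the latest emission per (key, window), so late firings replace earlier ones. */
    public static long upsertTotal(List<WindowResult> results) {
        Map<String, Long> latest = new LinkedHashMap<>();
        for (WindowResult r : results) {
            latest.put(r.key() + "@" + r.windowStart(), r.count());
        }
        return latest.values().stream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        List<WindowResult> emitted = List.of(
            new WindowResult("user-1", 0, 2),       // on-time firing of [0, 60000)
            new WindowResult("user-1", 60_000, 1),  // on-time firing of [60000, 120000)
            new WindowResult("user-1", 0, 3));      // late firing of [0, 60000)
        System.out.println("append=" + appendTotal(emitted) + " upsert=" + upsertTotal(emitted));
    }
}
```

Running `main` prints `append=6 upsert=4`. The late firing’s count of 3 replaces the earlier 2 for the same window, instead of being added to it.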
The next concept you’ll likely grapple with is how to handle data for windows that are already gone for good. These are windows whose end the watermark has passed by more than the allowed lateness, but which still receive late events.