Flink’s event-time processing is actually a lot like a detective reconstructing a crime scene, not a clock ticking in real time.
Let’s see this in action. Imagine we have a stream of sensor readings, each with a timestamp indicating when the event actually happened at the sensor.
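```java
// Sample Flink DataStream API code snippet (legacy watermark API, Flink 1.x)
import java.util.Arrays;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Assume SensorReading and AverageTempAggregator classes are defined elsewhere
public class SensorAverages {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Configure Flink to use event time. Required before Flink 1.12; deprecated
        // since then, because event time is now the default.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<SensorReading> readings = env.fromCollection(Arrays.asList(
            new SensorReading("sensor1", 1678886400000L, 25.0), // Event time: 2023-03-15 13:20:00 UTC
            new SensorReading("sensor2", 1678886460000L, 22.5), // Event time: 2023-03-15 13:21:00 UTC
            new SensorReading("sensor1", 1678886410000L, 25.5), // Event time: 2023-03-15 13:20:10 UTC (out of order)
            new SensorReading("sensor3", 1678886520000L, 30.0)  // Event time: 2023-03-15 13:22:00 UTC
        ));

        // Assign timestamps and watermarks: the watermark trails the highest timestamp seen by 5 s
        DataStream<SensorReading> readingsWithTimestamps = readings
            .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(5)) {
                @Override
                public long extractTimestamp(SensorReading element) {
                    return element.getTimestamp(); // event time in epoch milliseconds
                }
            });

        // Example: calculate the average temperature per sensor within a 1-minute window
        DataStream<Tuple2<String, Double>> averageTemps = readingsWithTimestamps
            .keyBy(SensorReading::getSensorId)
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            .aggregate(new AverageTempAggregator());

        averageTemps.print();
        env.execute("Sensor average temperature"); // nothing runs until the job is submitted
    }
}
```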
In this example, assignTimestampsAndWatermarks is crucial. It tells Flink two things:
- When did the event actually occur? (extractTimestamp)
- How far behind can we expect events to arrive? (the BoundedOutOfOrdernessTimestampExtractor with Time.seconds(5)). Flink derives the watermark from this bound.
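To make those two questions concrete, here is a plain-Java sketch (no Flink dependency; the class and method names are hypothetical) that replays the four sample timestamps in arrival order and measures how far each one lags the highest timestamp seen so far. Under a bounded-out-of-orderness extractor the watermark trails that maximum by the bound, so a lag of at least the bound means the reading would land at or behind the watermark.

```java
import java.time.Instant;

// Plain-Java check (not Flink code): for each reading in arrival order, compute
// how far its event time lags the highest event time seen so far.
class OutOfOrdernessCheck {

    // Returns lags[i] = (max timestamp among arrivals 0..i) - timestamps[i], in ms.
    static long[] lagsBehindMax(long[] timestamps) {
        long[] lags = new long[timestamps.length];
        long max = Long.MIN_VALUE;
        for (int i = 0; i < timestamps.length; i++) {
            max = Math.max(max, timestamps[i]);
            lags[i] = max - timestamps[i];
        }
        return lags;
    }

    public static void main(String[] args) {
        long boundMs = 5_000L; // corresponds to Time.seconds(5) in the snippet
        long[] arrivals = {1678886400000L, 1678886460000L, 1678886410000L, 1678886520000L};
        long[] lags = lagsBehindMax(arrivals);
        for (int i = 0; i < arrivals.length; i++) {
            // Decode epoch millis to UTC and flag readings at or behind the watermark.
            System.out.printf("%s lag=%ds%s%n",
                Instant.ofEpochMilli(arrivals[i]), lags[i] / 1000,
                lags[i] >= boundMs ? " (at or behind watermark)" : "");
        }
    }
}
```

Running it decodes the timestamps (1678886400000 is 2023-03-15T13:20:00Z) and shows the third reading lagging by 50 seconds, far more than the 5-second bound. In Flink it would still be counted, because its 1-minute window [13:20:00, 13:21:00) only closes once the watermark reaches 13:20:59.999.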
The problem Flink solves is handling out-of-order events and late arrivals in a distributed, fault-tolerant way. Processing time simply uses the system clock of the machine processing the data, so results depend on when records happen to arrive and are affected by clock drift and network latency. Ingestion time stamps each record when it enters Flink at the source, which is more stable but still reflects network and queueing delays rather than when the event happened. Event time, by contrast, uses the timestamp carried in each record, so it reflects the order in which events actually happened in the real world, regardless of when Flink sees them.
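The difference between the time domains can be sketched with a plain-Java model (not Flink code; all names and the arrival times are made up for illustration): the same four events are bucketed into 1-minute tumbling windows once by their event timestamps and once by two hypothetical sets of arrival times standing in for processing time.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration (not Flink code): bucket the same events into 1-minute
// tumbling windows by event time and by two hypothetical sets of arrival times.
class TimeDomains {
    static final long WINDOW_MS = 60_000L;

    // Start of the tumbling window that contains ts (zero offset, ts >= 0).
    static long windowStart(long ts) {
        return ts - (ts % WINDOW_MS);
    }

    // Counts events per window, using times[i] as the timestamp of event i.
    static Map<Long, Integer> countPerWindow(long[] times) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long t : times) {
            counts.merge(windowStart(t), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long t0 = 1678886400000L; // 2023-03-15T13:20:00Z, aligned to a minute
        // Event timestamps carried in the records (identical in every run).
        long[] eventTimes = {t0, t0 + 60_000, t0 + 10_000, t0 + 120_000};
        // Hypothetical arrival times for two runs with different network delays.
        long[] arrivalsRunA = {t0 + 1_000, t0 + 61_000, t0 + 62_000, t0 + 121_000};
        long[] arrivalsRunB = {t0 + 65_000, t0 + 61_000, t0 + 62_000, t0 + 121_000};

        System.out.println("event time:        " + countPerWindow(eventTimes));
        System.out.println("processing time A: " + countPerWindow(arrivalsRunA));
        System.out.println("processing time B: " + countPerWindow(arrivalsRunB));
    }
}
```

The event-time counts depend only on timestamps stored in the data, so every replay yields the same windows; the processing-time counts shift with the assumed delays (run A puts one event in the first minute, run B puts none there).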
To do this, Flink uses watermarks. A watermark is a special marker carrying a timestamp that is injected into the stream (here by the timestamp extractor). A watermark with timestamp t declares that no more events with a timestamp at or before t are expected. When a window operator (like our TumblingEventTimeWindows.of(Time.minutes(1))) receives a watermark that has reached the window’s last timestamp (its end minus one millisecond), it closes that window and emits the result. This is an assumption rather than a guarantee: the watermark is only as accurate as the out-of-orderness bound configured in the BoundedOutOfOrdernessTimestampExtractor, and events that arrive after their window has closed are treated as late.
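The firing rule can be modeled as a plain-Java sketch (a single key, no Flink dependency, hypothetical names): elements are buffered per 1-minute window, and a window fires once the watermark reaches its last timestamp, end minus 1 ms, which is the condition Flink’s EventTimeTrigger checks. An element whose window has already fired is rejected, matching Flink’s default of dropping late data when no allowed lateness is configured.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Single-key model of event-time tumbling windows (plain Java, not Flink's code).
class TumblingWindowModel {
    private final long sizeMs;
    // Open windows: window start -> buffered temperatures.
    private final TreeMap<Long, List<Double>> open = new TreeMap<>();
    // Fired windows: window start -> average temperature, in firing order.
    private final Map<Long, Double> firedAverages = new LinkedHashMap<>();
    private long watermark = Long.MIN_VALUE;

    TumblingWindowModel(long sizeMs) {
        this.sizeMs = sizeMs;
    }

    // Buffers the element; returns false if its window already fired (late data).
    boolean onElement(long timestamp, double temperature) {
        long start = timestamp - (timestamp % sizeMs); // assumes timestamp >= 0
        if (start + sizeMs - 1 <= watermark) {
            return false; // dropped, as Flink does by default without allowed lateness
        }
        open.computeIfAbsent(start, k -> new ArrayList<>()).add(temperature);
        return true;
    }

    // Advances the watermark and fires every window whose last timestamp it has reached.
    void onWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        while (!open.isEmpty() && open.firstKey() + sizeMs - 1 <= watermark) {
            Map.Entry<Long, List<Double>> window = open.pollFirstEntry();
            double avg = window.getValue().stream()
                    .mapToDouble(Double::doubleValue).average().orElse(Double.NaN);
            firedAverages.put(window.getKey(), avg);
        }
    }

    Map<Long, Double> firedAverages() {
        return firedAverages;
    }

    public static void main(String[] args) {
        long t0 = 1678886400000L;                  // 2023-03-15T13:20:00Z
        TumblingWindowModel w = new TumblingWindowModel(60_000L);
        w.onElement(t0, 25.0);
        w.onWatermark(t0 + 55_000);                // 13:20:55: window still open
        w.onElement(t0 + 10_000, 25.5);            // out of order, still accepted
        w.onWatermark(t0 + 115_000);               // 13:21:55: window [13:20, 13:21) fires
        System.out.println(w.firedAverages());
        System.out.println(w.onElement(t0 + 20_000, 26.0)); // window closed: rejected
    }
}
```

The watermark values fed in here are what a 5-second bound would produce after the 13:21:00 and 13:22:00 readings; they are chosen for illustration, since real periodic watermarks are emitted on a timer rather than after each element.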
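The BoundedOutOfOrdernessTimestampExtractor is a common strategy. It sets the watermark to max_event_time_seen - max_lateness, where max_event_time_seen is the highest timestamp observed so far (not simply the most recent event) and max_lateness is the configured out-of-orderness bound. So, if the highest timestamp Flink has seen is 10:00:05 and max_lateness is 5 seconds, Flink will emit a watermark of 10:00:00. This means Flink assumes that every event with a timestamp at or before 10:00:00 has already arrived. An event that shows up behind the watermark is late; if the window it belongs to has already fired, it is dropped by default, unless you configure allowedLateness or route it to a side output with sideOutputLateData and an OutputTag.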
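The arithmetic in this paragraph can be checked with a small plain-Java model (hypothetical class, no Flink dependency; the date 2023-03-15 is arbitrary). It tracks the highest event time seen, subtracts the bound, and applies the watermark contract that a watermark t rules out further events at or before t.

```java
import java.time.Duration;
import java.time.Instant;

// Plain-Java model (not Flink code) of the bounded-out-of-orderness rule:
// watermark = highest event time seen so far - maxLateness.
class BoundedWatermark {
    private final Duration maxLateness;
    private Instant maxSeen; // null until the first event is observed

    BoundedWatermark(Duration maxLateness) {
        this.maxLateness = maxLateness;
    }

    // Called for every event; only a higher timestamp moves the maximum.
    void observe(Instant eventTime) {
        if (maxSeen == null || eventTime.isAfter(maxSeen)) {
            maxSeen = eventTime;
        }
    }

    Instant watermark() {
        return maxSeen == null ? Instant.MIN : maxSeen.minus(maxLateness);
    }

    // A watermark t rules out further events at or before t.
    boolean isBehindWatermark(Instant eventTime) {
        return !eventTime.isAfter(watermark());
    }

    public static void main(String[] args) {
        BoundedWatermark gen = new BoundedWatermark(Duration.ofSeconds(5));
        gen.observe(Instant.parse("2023-03-15T10:00:05Z"));
        gen.observe(Instant.parse("2023-03-15T10:00:02Z")); // out of order: max unchanged
        System.out.println(gen.watermark()); // 2023-03-15T10:00:00Z
        System.out.println(gen.isBehindWatermark(Instant.parse("2023-03-15T09:59:58Z"))); // true
    }
}
```

One detail worth knowing: the newer built-in WatermarkStrategy.forBoundedOutOfOrderness emits one millisecond less (09:59:59.999 in this example), so an event stamped exactly 10:00:00 is not yet behind the watermark.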
The core of Flink’s event-time processing is that it keeps logical time separate from the wall clock. It doesn’t just process data as it arrives; it tracks each event’s timestamp, buffers records in per-window state, and advances its logical clock only when watermarks arrive. Because windows fire based on this logical clock rather than on arrival time, windowing (tumbling, sliding, session) produces consistent results even when data is delayed or arrives out of order.
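Here is a simplified plain-Java sketch of that logical clock (hypothetical names, not Flink’s internal timer service, and without its per-key timer deduplication): time moves only when a watermark arrives, stale watermarks are ignored, and registered event-time timers fire in timestamp order once the watermark passes them. Event-time windows rely on the same idea, registering a timer at each window’s last timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Simplified event-time clock (plain Java, not Flink's timer service):
// logical time advances only on watermarks, never from the wall clock.
class EventTimeClock {
    private long currentWatermark = Long.MIN_VALUE;
    private final PriorityQueue<Long> timers = new PriorityQueue<>(); // min-heap by timestamp

    long currentWatermark() {
        return currentWatermark;
    }

    void registerTimer(long timestamp) {
        timers.add(timestamp);
    }

    // Advances logical time and returns the timers that fired, in timestamp order.
    List<Long> advanceTo(long watermark) {
        List<Long> fired = new ArrayList<>();
        if (watermark <= currentWatermark) {
            return fired; // time never moves backwards
        }
        currentWatermark = watermark;
        while (!timers.isEmpty() && timers.peek() <= currentWatermark) {
            fired.add(timers.poll());
        }
        return fired;
    }

    public static void main(String[] args) {
        EventTimeClock clock = new EventTimeClock();
        clock.registerTimer(59_999L);  // last timestamp of window [0, 60000)
        clock.registerTimer(119_999L); // last timestamp of window [60000, 120000)
        System.out.println(clock.advanceTo(60_000L));  // fires 59999 only
        System.out.println(clock.advanceTo(30_000L));  // stale watermark: nothing fires
        System.out.println(clock.advanceTo(200_000L)); // fires 119999
    }
}
```

However long the program runs in wall-clock terms, nothing fires until a watermark says event time has moved, which is why replaying the same input with the same watermarks reproduces the same output.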
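What most people don’t realize is that the BoundedOutOfOrdernessTimestampExtractor is just one way to generate watermarks. In newer Flink versions it is deprecated in favor of the WatermarkStrategy API, where WatermarkStrategy.forBoundedOutOfOrderness(Duration) provides the built-in bounded-out-of-orderness behavior and custom strategies can be much more sophisticated. For instance, you could emit watermarks only when special marker events arrive (punctuated watermarks), mark partitions that stop sending data as idle with withIdleness so they don’t stall progress, or derive watermarks from an external time signal. You don’t need to combine watermarks across parallel subtasks yourself: a downstream operator already uses the minimum of the watermarks from all its active input channels, because using the maximum would make events from slower subtasks look late. The choice of watermark strategy directly impacts how Flink handles late data and the trade-off between latency and completeness.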
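The minimum-over-inputs rule, and why idleness matters, can be sketched as a plain-Java model (hypothetical class, not Flink’s implementation): an operator with several input channels takes the lowest watermark among channels not marked idle, and its event time never moves backwards.

```java
import java.util.Arrays;

// Simplified model (plain Java, not Flink's implementation) of how an operator
// with several parallel input channels derives its event time: the minimum
// watermark over all channels that are not marked idle.
class InputWatermarkCombiner {
    private final long[] channelWatermarks;
    private final boolean[] idle;
    private long current = Long.MIN_VALUE; // the operator's event time

    InputWatermarkCombiner(int channels) {
        channelWatermarks = new long[channels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
        idle = new boolean[channels];
    }

    // Records a channel's watermark and returns the operator's combined watermark.
    long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        return recompute();
    }

    // Marks a channel idle (or active again), e.g. after a source stops sending data.
    long setIdle(int channel, boolean isIdle) {
        idle[channel] = isIdle;
        return recompute();
    }

    private long recompute() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, channelWatermarks[i]);
            }
        }
        // Event time only moves forward; with every channel idle, it stays put.
        if (anyActive && min > current) {
            current = min;
        }
        return current;
    }

    public static void main(String[] args) {
        InputWatermarkCombiner op = new InputWatermarkCombiner(2);
        op.onWatermark(0, 100L);
        System.out.println(op.onWatermark(1, 50L)); // slower channel holds time at 50
        System.out.println(op.setIdle(1, true));    // idle channel ignored: 100
    }
}
```

If the operator took the maximum instead, its event time would jump to 100 while channel 1 could still deliver events stamped between 50 and 100, and those events would wrongly be treated as late.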
The next concept you’ll grapple with is how Flink handles data that is late, even after accounting for max_lateness.