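Flink’s windowing is surprisingly powerful because, on a keyed stream, it groups events not only by time but also by key. Each key gets its own independent windows and aggregation state, so you can run stateful aggregations over subsets of the data that are defined dynamically by the keys present in the stream.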
Let’s see this in action. Imagine we have a stream of sensor readings, each with a sensor ID, a timestamp, and a temperature. We want to calculate the average temperature for each sensor over different time intervals.
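```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Assume 'env' is your StreamExecutionEnvironment
DataStream<SensorReading> readings = env
    .fromElements(
        new SensorReading("sensor-1", 1678886400000L, 25.0), // 2023-03-15 13:20:00 UTC
        new SensorReading("sensor-2", 1678886460000L, 22.0), // 2023-03-15 13:21:00 UTC
        new SensorReading("sensor-1", 1678886520000L, 26.0), // 2023-03-15 13:22:00 UTC
        new SensorReading("sensor-1", 1678886700000L, 27.0), // 2023-03-15 13:25:00 UTC
        new SensorReading("sensor-2", 1678886760000L, 23.0), // 2023-03-15 13:26:00 UTC
        new SensorReading("sensor-1", 1678887000000L, 28.0)  // 2023-03-15 13:30:00 UTC
    )
    // Event-time windows need per-record timestamps and watermarks; fromElements assigns neither
    .assignTimestampsAndWatermarks(
        WatermarkStrategy.<SensorReading>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((reading, recordTimestamp) -> reading.getTimestamp()));

// Key by sensor ID: each sensor gets its own set of windows
KeyedStream<SensorReading, String> keyedReadings = readings.keyBy(reading -> reading.getSensorId());

// Tumbling window of 5 minutes
DataStream<Tuple3<String, Long, Double>> tumblingAverages = keyedReadings
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .aggregate(new AvgTempAggregator());

// Sliding window of 10 minutes, sliding every 5 minutes
DataStream<Tuple3<String, Long, Double>> slidingAverages = keyedReadings
    .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(5)))
    .aggregate(new AvgTempAggregator());

// Session window with a 5-minute gap, computed from the readings' own timestamps like the other two
DataStream<Tuple3<String, Long, Double>> sessionAverages = keyedReadings
    .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
    .aggregate(new AvgTempAggregator());

// Print (or sink) the results, then submit the job
tumblingAverages.print("Tumbling Avg");
slidingAverages.print("Sliding Avg");
sessionAverages.print("Session Avg");
env.execute("Sensor temperature averages");

// Assumed to be defined elsewhere:
// SensorReading { String sensorId; long timestamp; double temperature; } with matching getters
// AvgTempAggregator implements AggregateFunction<SensorReading, Tuple3<String, Long, Double>, Tuple3<String, Long, Double>>
//   accumulator = (sensorId, count, sum); result = (sensorId, count, average)
```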
The core problem Flink windows solve is performing meaningful aggregations over unbounded, out-of-order event streams. Instead of producing a result for every single event, windows group events into finite, manageable chunks based on time or count and emit one result per chunk. This is crucial for applications like real-time analytics, fraud detection, or IoT data processing, where you need to understand patterns over periods rather than individual events.
Tumbling windows are the simplest. They divide the stream into non-overlapping, fixed-size chunks. With a 5-minute tumbling window, events are grouped into 5-minute blocks: [0, 5), [5, 10), [10, 15) minutes, and so on. The windows are half-open and, by default, aligned to the Unix epoch, so a reading at exactly 13:25:00 starts the next window rather than closing the previous one. Each event belongs to exactly one window. This is great for metrics like total sales per hour or average sensor readings per minute.
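To make the alignment concrete, here is a dependency-free Java sketch that computes which 5-minute window each of sensor-1’s readings falls into. It is not Flink’s own code: `TumblingWindowMath` and `windowStart` are hypothetical names, and the sketch assumes Flink’s default epoch-aligned windows with no offset.

```java
import java.time.Instant;

// Dependency-free sketch of tumbling-window assignment (no Flink classes used).
// Assumption: windows are aligned to the Unix epoch with no offset, as in Flink's default.
final class TumblingWindowMath {
    // Start of the half-open window [start, start + sizeMs) that contains the timestamp
    static long windowStart(long timestamp, long sizeMs) {
        return timestamp - Math.floorMod(timestamp, sizeMs);
    }

    public static void main(String[] args) {
        long size = 5 * 60_000L; // 5-minute tumbling windows
        // sensor-1 readings at 13:20, 13:22, 13:25 and 13:30 UTC
        long[] timestamps = {1678886400000L, 1678886520000L, 1678886700000L, 1678887000000L};
        for (long ts : timestamps) {
            long start = windowStart(ts, size);
            System.out.println(Instant.ofEpochMilli(ts) + " -> [" + Instant.ofEpochMilli(start)
                    + ", " + Instant.ofEpochMilli(start + size) + ")");
        }
    }
}
```

Running it maps the 13:20 and 13:22 readings to [13:20, 13:25), while the reading at exactly 13:25:00 opens the next window.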
Sliding windows are more flexible. They also have a fixed size, but they overlap. A 10-minute sliding window with a slide of 5 minutes means that a window ending at T covers events from T-10 to T, and the next window, ending at T+5, covers events from T-5 to T+5. Because a new window starts every 5 minutes and each one lasts 10, every event belongs to 10 / 5 = 2 windows. This lets you track a metric over a rolling period, such as the average temperature over the last 10 minutes, updated every 5 minutes, which captures trends at a finer granularity than a single 10-minute tumbling window.
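The overlap can be computed directly. This dependency-free Java sketch lists every sliding window that contains a given timestamp; `SlidingWindowMath.windowsFor` is a hypothetical helper, not a Flink API, and it assumes window starts are epoch-aligned multiples of the slide.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Dependency-free sketch: which sliding windows [start, start + size) contain a timestamp.
// Assumption: window starts are multiples of the slide, counted from the Unix epoch.
final class SlidingWindowMath {
    static List<long[]> windowsFor(long timestamp, long sizeMs, long slideMs) {
        List<long[]> windows = new ArrayList<>();
        // Latest window start at or before the timestamp
        long lastStart = timestamp - Math.floorMod(timestamp, slideMs);
        // Step back one slide at a time while the window still covers the timestamp
        for (long start = lastStart; start > timestamp - sizeMs; start -= slideMs) {
            windows.add(new long[] {start, start + sizeMs});
        }
        return windows;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // sensor-1 reading at 13:22:00 UTC, 10-minute windows sliding every 5 minutes
        for (long[] w : windowsFor(1678886520000L, 10 * minute, 5 * minute)) {
            System.out.println("[" + Instant.ofEpochMilli(w[0]) + ", " + Instant.ofEpochMilli(w[1]) + ")");
        }
    }
}
```

For the 13:22:00 reading it returns [13:20, 13:30) and [13:15, 13:25), so the same temperature contributes to two averages.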
Session windows are dynamic. They group events into periods of activity separated by periods of inactivity, tracked separately for each key. With a session gap of 5 minutes, the first event for a key opens a session, every further event arriving within 5 minutes of the previous one extends it, and once 5 minutes pass without a new event, the session closes. This is a natural fit for user activity analysis, where you want to group clicks into sessions defined by actual engagement rather than arbitrary time boundaries.
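The sessionization rule can be modeled in plain Java for a single key. This is a dependency-free sketch, not Flink code: each event opens a candidate window [t, t + gap), and candidates that overlap or touch are merged, which is our understanding of how Flink’s merging session assigners behave. `SessionMath` and `sessions` are hypothetical names.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Dependency-free sketch of gap-based sessions for one key.
// Assumption: each event opens [t, t + gap) and overlapping or touching windows merge.
final class SessionMath {
    static List<long[]> sessions(long[] timestamps, long gapMs) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted); // sessions follow event time, not arrival order
        List<long[]> result = new ArrayList<>();
        long[] current = null;
        for (long t : sorted) {
            if (current != null && t <= current[1]) {
                // Event falls inside (or exactly at the end of) the open session: extend it
                current[1] = Math.max(current[1], t + gapMs);
            } else {
                // First event, or the gap was exceeded: start a new session
                current = new long[] {t, t + gapMs};
                result.add(current);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long gap = 5 * 60_000L;
        // sensor-1 readings at 13:20, 13:22, 13:25 and 13:30 UTC
        long[] sensor1 = {1678886400000L, 1678886520000L, 1678886700000L, 1678887000000L};
        for (long[] s : sessions(sensor1, gap)) {
            System.out.println("[" + Instant.ofEpochMilli(s[0]) + ", " + Instant.ofEpochMilli(s[1]) + ")");
        }
    }
}
```

No two consecutive sensor-1 readings are more than 5 minutes apart, so under this model all four form one session, [13:20, 13:35); readings at 0, 2, and 15 minutes would instead split into two sessions.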
When using event-time windows, Flink tracks the progress of event time with watermarks. A watermark is a special record carrying a timestamp t that flows through the stream and declares that no more events with timestamps at or before t are expected; this is how Flink bounds how long it waits for late events. Once the watermark reaches the end of a window, Flink treats that window as complete and triggers its computation. For example, the tumbling window [13:20:00, 13:25:00) UTC ends at 1678886700000L, so a watermark of 1678886700000L (2023-03-15 13:25:00 UTC) fires it, whereas a watermark of 1678886500000L (13:21:40 UTC) does not, because that window has not yet ended in event time.
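The firing rule can be checked with plain arithmetic. This sketch assumes a bounded-out-of-orderness strategy with a 5-second bound; the `maxTimestamp - bound - 1` formula reflects our understanding of Flink’s `BoundedOutOfOrdernessWatermarks`, and `WatermarkMath` is a hypothetical helper, not a Flink class.

```java
// Dependency-free sketch of event-time triggering.
// Assumptions: watermark = max seen timestamp - out-of-orderness bound - 1, and a
// half-open window [start, end) fires once the watermark reaches end - 1.
final class WatermarkMath {
    static long watermark(long maxSeenTimestamp, long outOfOrdernessMs) {
        return maxSeenTimestamp - outOfOrdernessMs - 1;
    }

    static boolean windowFires(long windowEnd, long watermark) {
        // end - 1 is the largest timestamp that still belongs to the window
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long windowEnd = 1678886700000L; // end of the [13:20, 13:25) UTC tumbling window
        System.out.println(windowFires(windowEnd, 1678886500000L)); // false: 13:21:40 watermark
        System.out.println(windowFires(windowEnd, 1678886700000L)); // true: 13:25:00 watermark
        // The 13:25:00 reading alone only advances the watermark to 13:24:54.999
        System.out.println(windowFires(windowEnd, watermark(1678886700000L, 5_000L))); // false
        // The 13:30:00 reading advances it to 13:29:54.999, past the window end
        System.out.println(windowFires(windowEnd, watermark(1678887000000L, 5_000L))); // true
    }
}
```

The third case shows the cost of tolerating out-of-order data: with a 5-second bound, a window can only fire once an event at least 5 seconds past its end has been seen.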
The AggregateFunction is Flink’s efficient way to aggregate incrementally within a window. It keeps a small accumulator per key and window (for example, a count and a running sum of temperatures), updates it as each element arrives, and produces the final result when the window fires. Because only the accumulator is stored, Flink does not have to buffer every element of the window and recompute the aggregate from scratch at the end.
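Since the example leaves `AvgTempAggregator` defined elsewhere, here is a dependency-free sketch of the same incremental logic. The method names deliberately match Flink’s `AggregateFunction` (`createAccumulator`, `add`, `getResult`, `merge`), but `AvgTempModel` and its `Acc` class are hypothetical stand-ins; a real implementation would implement the Flink interface and use `Tuple3` for the accumulator and result.

```java
// Dependency-free sketch of incremental averaging with AggregateFunction-style methods.
// Assumption: the accumulator holds (sensorId, count, sum), mirroring the Tuple3 described above.
final class AvgTempModel {
    static final class Acc {
        final String sensorId;
        final long count;
        final double sum;

        Acc(String sensorId, long count, double sum) {
            this.sensorId = sensorId;
            this.count = count;
            this.sum = sum;
        }
    }

    // Empty accumulator created when a key's window receives its first element
    static Acc createAccumulator() {
        return new Acc(null, 0L, 0.0);
    }

    // Called once per reading; only this small object is kept in state
    static Acc add(String sensorId, double temperature, Acc acc) {
        return new Acc(sensorId, acc.count + 1, acc.sum + temperature);
    }

    // Called when the window fires
    static double getResult(Acc acc) {
        return acc.count == 0 ? Double.NaN : acc.sum / acc.count;
    }

    // Used when windows merge (e.g. sessions) to combine two partial accumulators
    static Acc merge(Acc a, Acc b) {
        String id = a.sensorId != null ? a.sensorId : b.sensorId;
        return new Acc(id, a.count + b.count, a.sum + b.sum);
    }

    public static void main(String[] args) {
        // sensor-1 readings in the [13:20, 13:25) UTC tumbling window: 25.0 and 26.0
        Acc acc = createAccumulator();
        acc = add("sensor-1", 25.0, acc);
        acc = add("sensor-1", 26.0, acc);
        System.out.println(acc.sensorId + " avg=" + getResult(acc) + " over " + acc.count + " readings");
    }
}
```

Running it prints `sensor-1 avg=25.5 over 2 readings`. Carrying the sensor ID inside the accumulator is what lets `getResult` emit it without a separate window function.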
What most people don’t realize is that Flink’s windowing is not limited to event time. You can also use processing-time windows, where the machine’s wall clock determines window boundaries. They are simpler to set up because no timestamps or watermarks are needed, but their results depend on when events happen to be processed, which varies with network delays, backpressure, machine load, and clock drift. That makes event time the preferred choice when you need accurate, reproducible results in distributed streaming.
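The reproducibility argument can be illustrated without Flink. In this sketch (hypothetical `TimeDomainDemo`), the same reading is delivered once after 1 second and once after a 4-minute backlog; the arrival times are invented for illustration, and arrival time stands in for the processing time at which a real operator would see the record.

```java
import java.time.Instant;

// Dependency-free sketch: 5-minute tumbling windows computed from two different clocks.
// The arrival times are hypothetical stand-ins for processing time.
final class TimeDomainDemo {
    static long bucket(long timeMs, long sizeMs) {
        return timeMs - Math.floorMod(timeMs, sizeMs); // epoch-aligned window start
    }

    // Event time: the window depends only on when the reading was taken
    static long eventTimeWindow(long eventTime, long arrivalTime, long sizeMs) {
        return bucket(eventTime, sizeMs);
    }

    // Processing time: the window depends on when the reading happens to be processed
    static long processingTimeWindow(long eventTime, long arrivalTime, long sizeMs) {
        return bucket(arrivalTime, sizeMs);
    }

    public static void main(String[] args) {
        long size = 5 * 60_000L;
        long eventTime = 1678886520000L;             // reading taken at 13:22:00 UTC
        long[] arrivals = {eventTime + 1_000L,       // delivered after 1 second
                           eventTime + 4 * 60_000L}; // delivered after a 4-minute backlog
        for (long arrival : arrivals) {
            System.out.println("event-time window starts "
                    + Instant.ofEpochMilli(eventTimeWindow(eventTime, arrival, size))
                    + ", processing-time window starts "
                    + Instant.ofEpochMilli(processingTimeWindow(eventTime, arrival, size)));
        }
    }
}
```

The event-time window is [13:20, 13:25) UTC in both runs, while the processing-time window moves from 13:20 to 13:25 when delivery is delayed, so the same input can yield different aggregates.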
The next concept to explore is how Flink handles late data that arrives after a window has already been triggered and processed.