The most surprising thing about event aggregation is that it’s often not about collecting events, but about discarding them until you have something meaningful.
Let’s say you’re tracking user activity across a web app, a mobile app, and a backend service. Each of these generates a stream of events: user_logged_in, page_viewed, item_added_to_cart, payment_processed. Individually, these are noisy. A page_viewed event from the mobile app might be followed by another within milliseconds as the user scrolls. You don’t want to process every single one.
Here’s what that looks like in practice. Imagine a simplified stream of incoming events:
```json
{"event_id": "evt_abc", "timestamp": 1678886400, "user_id": "user_123", "type": "page_viewed", "details": {"page": "/home"}}
{"event_id": "evt_def", "timestamp": 1678886401, "user_id": "user_123", "type": "page_viewed", "details": {"page": "/products"}}
{"event_id": "evt_ghi", "timestamp": 1678886405, "user_id": "user_123", "type": "item_added_to_cart", "details": {"item_id": "prod_xyz"}}
{"event_id": "evt_jkl", "timestamp": 1678886410, "user_id": "user_123", "type": "page_viewed", "details": {"page": "/cart"}}
{"event_id": "evt_mno", "timestamp": 1678886412, "user_id": "user_123", "type": "payment_processed", "details": {"order_id": "ord_789"}}
```
The aggregation pattern aims to transform this into something more actionable. Instead of processing each page_viewed, you might want to capture a "session" event that represents a contiguous block of user activity.
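Here is a minimal sketch of that transformation in plain Java, with no streaming framework. It groups events by user_id and splits each user's events wherever the gap between consecutive events exceeds 30 minutes. Each resulting block is then replaced with a single summary record. The `Sessionizer` class, its record types, and the 30-minute gap are illustrative assumptions. Timestamps are treated as Unix seconds, as in the sample, and the `details` payload is omitted for brevity.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch: collapse raw events into one summary record per session.
final class Sessionizer {
    record Event(String eventId, long timestamp, String userId, String type) {}

    record Session(String userId, long start, long end, int eventCount, int pageViews) {}

    static final long GAP_SECONDS = 30 * 60; // assumed inactivity gap that ends a session

    static List<Session> sessionize(List<Event> events) {
        // Identify related events: bucket them by user_id (sorted keys for stable output).
        Map<String, List<Event>> byUser = new TreeMap<>();
        for (Event e : events) {
            byUser.computeIfAbsent(e.userId(), k -> new ArrayList<>()).add(e);
        }
        List<Session> sessions = new ArrayList<>();
        for (List<Event> userEvents : byUser.values()) {
            userEvents.sort(Comparator.comparingLong(Event::timestamp));
            List<Event> current = new ArrayList<>();
            for (Event e : userEvents) {
                long last = current.isEmpty() ? e.timestamp() : current.get(current.size() - 1).timestamp();
                if (e.timestamp() - last > GAP_SECONDS) { // too long since the previous event
                    sessions.add(summarize(current));
                    current = new ArrayList<>();
                }
                current.add(e);
            }
            if (!current.isEmpty()) {
                sessions.add(summarize(current));
            }
        }
        return sessions;
    }

    // Build one coarse-grained record from a contiguous block of events.
    private static Session summarize(List<Event> block) {
        int pageViews = 0;
        for (Event e : block) {
            if (e.type().equals("page_viewed")) {
                pageViews++;
            }
        }
        Event first = block.get(0);
        Event last = block.get(block.size() - 1);
        return new Session(first.userId(), first.timestamp(), last.timestamp(), block.size(), pageViews);
    }
}
```

Applied to the five sample events, this yields one session for user_123 spanning 1678886400 to 1678886412. It contains 5 events, 3 of which are page views. A sixth event arriving more than 1800 seconds after the last one would start a second session.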
To do this, you need a system that can:
- Ingest events from multiple sources: This typically involves a message queue like Kafka, RabbitMQ, or AWS SQS, where each source publishes its events to a designated topic or queue.
- Identify related events: Events are related if they come from the same user, within a certain time window, and often for specific event types. A common identifier is the user_id.
- Define aggregation logic: This is the core. You decide what constitutes an aggregated event. For a user session, it might be: "If a user_logged_in event occurs, start a session. If no activity for 30 minutes, end the session and emit a session_ended event." Or, "If an item_added_to_cart event occurs and is followed by a payment_processed event within 5 minutes, emit a purchase_completed event."
- Buffer and process: Events are buffered temporarily, typically keyed by user_id or session ID, allowing the system to inspect incoming events in the context of recent activity.
- Emit aggregated events: Once the aggregation logic is met (e.g., a timeout, a specific event sequence), a new, coarser-grained event is produced.
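The ingest, identify, and buffer steps can be sketched in plain Java. The sketch assumes each source hands over its events already sorted by timestamp. That is a simplification: Kafka, for instance, only guarantees ordering within a partition, so real pipelines rely on watermarks instead. A k-way merge over a priority queue interleaves the sources by timestamp, and a map keyed by user_id acts as the per-user buffer. `SourceMerger` and its types are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical sketch: merge per-source event lists (each assumed sorted by timestamp)
// into one timestamp-ordered stream, then buffer that stream keyed by user_id.
final class SourceMerger {
    record Event(String source, long timestamp, String userId, String type) {}

    // Heap entry: one source list plus the index of its next unread event.
    private record Cursor(List<Event> list, int index) {
        Event head() { return list.get(index); }
    }

    static List<Event> mergeByTimestamp(List<List<Event>> sources) {
        PriorityQueue<Cursor> heap =
                new PriorityQueue<>(Comparator.comparingLong((Cursor c) -> c.head().timestamp()));
        for (List<Event> source : sources) {
            if (!source.isEmpty()) {
                heap.add(new Cursor(source, 0));
            }
        }
        List<Event> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            Cursor c = heap.poll();           // source whose next event is the oldest
            merged.add(c.head());
            if (c.index() + 1 < c.list().size()) {
                heap.add(new Cursor(c.list(), c.index() + 1));
            }
        }
        return merged;
    }

    // Per-user buffers, in first-seen order of user_id.
    static Map<String, List<Event>> bufferByUser(List<Event> stream) {
        Map<String, List<Event>> buffers = new LinkedHashMap<>();
        for (Event e : stream) {
            buffers.computeIfAbsent(e.userId(), k -> new ArrayList<>()).add(e);
        }
        return buffers;
    }
}
```

Merging before keying keeps each per-user buffer in timestamp order. Sequence rules such as "cart, then payment within 5 minutes" depend on that ordering.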
Consider a Kafka stream processing setup using Flink or Spark Streaming. You’d consume from topics like web_events, mobile_events, and backend_events, partition the stream by user_id (keyBy in Flink, groupBy in Spark), and apply a window function.
For sessionization, a common approach is a session window with a maximum gap of, say, 30 minutes.
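```java
// Example Flink sketch (DataStream API); Event, AggregatedEvent, SessionProcessFunction are app-defined
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

DataStream<Event> events = kafkaEvents                // DataStream<Event> read from the Kafka topics
    .assignTimestampsAndWatermarks(...)               // event-time timestamps and watermarks
    .filter(event -> event.getUserId() != null);      // ensure we have a user ID
KeyedStream<Event, String> keyedEvents = events.keyBy(Event::getUserId);
WindowedStream<Event, String, TimeWindow> sessionWindows = keyedEvents
    .window(EventTimeSessionWindows.withGap(Time.minutes(30))); // 30-minute inactivity gap
DataStream<AggregatedEvent> aggregatedEvents = sessionWindows
    .process(new SessionProcessFunction()); // custom ProcessWindowFunction that builds session events
```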
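The SessionProcessFunction would receive all events of a session window and derive the session’s start time, end time, and list of events. With event-time session windows, a window closes once the watermark reaches its end, that is, the timestamp of the session’s last event plus the 30-minute gap. At that point, the function emits a session_ended event containing a summary.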
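To make the closing rule concrete, here is a plain-Java sketch of how gap-based session windows can be built by merging per-event windows. Each event gets its own window [timestamp, timestamp + gap), overlapping windows are merged, and a window is emitted once the watermark reaches its last instant. This mirrors the idea behind merging session windows but is not Flink code. `MergingSessionWindows`, `add`, and `fire` are hypothetical names, and the sketch tracks a single key.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of gap-based session windows for one key, built by window merging.
final class MergingSessionWindows {
    // Window [start, end); all timestamps share one unit (seconds in the sample).
    record Window(long start, long end) {
        // In this sketch, touching windows count as overlapping.
        boolean intersects(Window other) { return start <= other.end && end >= other.start; }

        Window cover(Window other) {
            return new Window(Math.min(start, other.start), Math.max(end, other.end));
        }
    }

    private final long gap;
    private final List<Window> open = new ArrayList<>(); // pairwise non-overlapping windows

    MergingSessionWindows(long gap) { this.gap = gap; }

    // Give the event its own window [ts, ts + gap), then absorb every window it overlaps.
    void add(long timestamp) {
        Window merged = new Window(timestamp, timestamp + gap);
        List<Window> untouched = new ArrayList<>();
        for (Window w : open) {
            if (w.intersects(merged)) {
                merged = merged.cover(w);
            } else {
                untouched.add(w);
            }
        }
        untouched.add(merged);
        open.clear();
        open.addAll(untouched);
    }

    // Emit (and forget) every window whose last instant, end - 1, is at or before the watermark.
    List<Window> fire(long watermark) {
        List<Window> ready = new ArrayList<>();
        Iterator<Window> it = open.iterator();
        while (it.hasNext()) {
            Window w = it.next();
            if (w.end() - 1 <= watermark) {
                ready.add(w);
                it.remove();
            }
        }
        ready.sort(Comparator.comparingLong(Window::start));
        return ready;
    }
}
```

With the five sample events and a gap of 1800 seconds, everything merges into one window [1678886400, 1678888212), which is emitted once the watermark reaches 1678888211. A later event that overlaps two open windows merges them into one session. This is why a session window can only be finalized after the watermark has passed it.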
The critical insight is that aggregation often involves state. The system needs to remember, for each user_id, when their last event was, what the current session’s start time is, and perhaps a small buffer of recent events. This state is maintained across incoming events.
When you see an item_added_to_cart event for user_123, the system checks its internal state for user_123. It sees that a session started 5 minutes ago. It updates the session’s last activity timestamp and adds the item_added_to_cart event to its in-memory representation of the session. If a payment_processed event for user_123 arrives 2 minutes later, the system again checks its state. It sees the item_added_to_cart event is already recorded for this session. It then emits a purchase_completed event. If no event arrives for user_123 for 30 minutes, the session window closes, and a session_ended event is emitted, even if no purchase occurred.
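The walk-through above can be sketched as an incremental processor that keeps one small state object per user_id. This is plain Java, not a framework API. `StatefulAggregator`, `onEvent`, and `advanceTime` are hypothetical names, and timestamps are Unix seconds. As a simplification, any event (not only user_logged_in) starts a session. In Flink, this state would typically live in a KeyedProcessFunction's checkpointed keyed state, with `advanceTime` replaced by event-time timers.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: per-user session state driving purchase_completed and session_ended.
final class StatefulAggregator {
    record Event(long timestamp, String userId, String type) {}
    record Output(String type, String userId, long sessionStart, long timestamp) {}

    private static final long SESSION_GAP = 30 * 60;    // seconds of inactivity that end a session
    private static final long PURCHASE_WINDOW = 5 * 60; // max seconds from cart to payment

    // Everything remembered for one user_id between events.
    private static final class SessionState {
        final long start;  // when the current session began
        long lastActivity; // latest event timestamp seen for this user
        Long lastCartAt;   // latest unmatched item_added_to_cart, or null

        SessionState(long ts) {
            start = ts;
            lastActivity = ts;
        }
    }

    private final Map<String, SessionState> state = new HashMap<>();
    private final List<Output> emitted = new ArrayList<>();

    void onEvent(Event e) {
        SessionState s = state.get(e.userId());
        if (s != null && e.timestamp() - s.lastActivity > SESSION_GAP) {
            // The previous session went idle before this event arrived: close it first.
            emitted.add(new Output("session_ended", e.userId(), s.start, s.lastActivity));
            s = null;
        }
        if (s == null) {
            s = new SessionState(e.timestamp());
            state.put(e.userId(), s);
        }
        s.lastActivity = Math.max(s.lastActivity, e.timestamp());
        if (e.type().equals("item_added_to_cart")) {
            s.lastCartAt = e.timestamp();
        } else if (e.type().equals("payment_processed")
                && s.lastCartAt != null
                && e.timestamp() - s.lastCartAt <= PURCHASE_WINDOW) {
            emitted.add(new Output("purchase_completed", e.userId(), s.start, e.timestamp()));
            s.lastCartAt = null; // consume the cart event so it is matched only once
        }
    }

    // Called as time advances (e.g. on a watermark or timer): close sessions idle too long.
    void advanceTime(long now) {
        Iterator<Map.Entry<String, SessionState>> it = state.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, SessionState> entry = it.next();
            SessionState s = entry.getValue();
            if (now - s.lastActivity > SESSION_GAP) {
                emitted.add(new Output("session_ended", entry.getKey(), s.start, s.lastActivity));
                it.remove();
            }
        }
    }

    List<Output> emitted() { return List.copyOf(emitted); }
}
```

Feeding it the five sample events emits purchase_completed when payment_processed arrives 7 seconds after item_added_to_cart. Advancing time to more than 30 minutes past the last event then emits session_ended for the same session.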
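The most complex part is handling late-arriving events and ensuring exactly-once processing semantics, especially when state is involved. Most modern stream processing frameworks provide mechanisms for this. Flink, for example, offers watermarks, allowed lateness, and side outputs for late data, and periodically checkpoints operator state for exactly-once state consistency. Even so, understanding how they manage checkpoints and fault tolerance is crucial for building reliable aggregators.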
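Two of these concerns can be sketched without any framework. The first is a watermark that trails the largest timestamp seen by a fixed out-of-orderness bound; events behind it go to a separate late list, similar in spirit to a late-data side output. The second is de-duplication by event_id, so events replayed after a failure are not counted twice. The bound, `LateEventGate`, and its methods are assumptions. The unbounded `seenIds` set is a simplification; a real deployment would keep it in checkpointed state with expiry.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: bounded-out-of-orderness watermark plus event_id de-duplication.
final class LateEventGate {
    record Event(String eventId, long timestamp) {}

    private final long maxOutOfOrderness;               // how far behind the newest event we accept
    private long maxSeen = Long.MIN_VALUE;              // largest accepted timestamp so far
    private final Set<String> seenIds = new HashSet<>(); // unbounded here for simplicity
    private final List<Event> accepted = new ArrayList<>();
    private final List<Event> late = new ArrayList<>();  // set aside, like a late-data side output
    private int duplicates = 0;

    LateEventGate(long maxOutOfOrderness) { this.maxOutOfOrderness = maxOutOfOrderness; }

    // Watermark: events older than this are no longer expected.
    long watermark() {
        return maxSeen == Long.MIN_VALUE ? Long.MIN_VALUE : maxSeen - maxOutOfOrderness;
    }

    void offer(Event e) {
        if (!seenIds.add(e.eventId())) {
            duplicates++; // same event_id seen before, e.g. replayed after a restart
            return;
        }
        if (e.timestamp() < watermark()) {
            late.add(e);
            return;
        }
        accepted.add(e);
        maxSeen = Math.max(maxSeen, e.timestamp());
    }

    List<Event> accepted() { return List.copyOf(accepted); }
    List<Event> late() { return List.copyOf(late); }
    int duplicates() { return duplicates; }
}
```

With a bound of 5, the following sequence of events (id@timestamp) is offered in order: evt_abc@100, evt_def@97, evt_ghi@120, evt_jkl@110, and evt_abc@100 again. The first three events are accepted (97 is still within the bound). evt_jkl@110 arrives after the watermark has moved to 115, so it is routed to the late list, and the repeated evt_abc is dropped as a duplicate.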
The next pattern you’ll likely encounter when dealing with aggregated events is the Saga Pattern, which helps manage distributed transactions that are often initiated by these higher-level, aggregated events.