Flink’s Complex Event Processing (CEP) library allows you to detect patterns in event streams, not just individual events.

Let’s say you’re tracking user activity on a website and want to identify users who perform a PageView followed by an AddToCart event within a 5-minute window.

Here’s a simplified Flink CEP job that does this:

// Define the event types
public static class PageViewEvent {
    public String userId;
    public long timestamp;
    // ... other fields
}

public static class AddToCartEvent {
    public String userId;
    public long timestamp;
    // ... other fields
}

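// Define the pattern
// begin() takes a single type parameter, and where() expects a condition object
// rather than a bare lambda. SimpleCondition.of(...) is available in recent Flink
// releases; on older ones, use an anonymous SimpleCondition subclass instead.
Pattern<Object, ?> pattern = Pattern.<Object>begin("start")
    .where(SimpleCondition.of(evt -> evt instanceof PageViewEvent))
    .next("middle")
    .where(SimpleCondition.of(evt -> evt instanceof AddToCartEvent))
    .within(Time.minutes(5));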

// Create a DataStream of events
// (assumed to be populated, with timestamps and watermarks assigned for event-time matching)
DataStream<Object> events = ...;

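// Apply the CEP pattern; Alert is an application-defined output type (not shown)
DataStream<Alert> alerts = CEP.pattern(events, pattern)
    // The lambda parameter is named "match" because it may not shadow the local variable "pattern"
    .select((Map<String, List<Object>> match) -> {
        // Each list holds the events matched by the stage with that name
        PageViewEvent pageView = (PageViewEvent) match.get("start").get(0);
        AddToCartEvent addToCart = (AddToCartEvent) match.get("middle").get(0);
        // Process the matched pattern, e.g., log it or send an alert
        System.out.println("User " + pageView.userId + " viewed and then added to cart within 5 mins.");
        return new Alert(pageView.userId, "Viewed and Added to Cart");
    });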

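This code defines two event types, PageViewEvent and AddToCartEvent. The Pattern object specifies that we’re looking for a PageViewEvent (named "start") immediately followed by an AddToCartEvent (named "middle"), with the whole sequence completing within 5 minutes. The select function then defines what to do with each match. Note that, as written, the pattern runs over the entire stream, so a PageViewEvent from one user followed by an AddToCartEvent from another would also match. To detect the sequence per user, key the stream by userId (with keyBy) before passing it to CEP.pattern.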

The core problem CEP solves is the difficulty of inferring context and state from a continuous, high-volume stream of individual events. Without CEP, you’d be writing complex state management and time-based logic yourself, which is error-prone and hard to scale. CEP provides a declarative way to express these complex temporal relationships.

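Internally, Flink CEP compiles each pattern into a state machine, specifically a nondeterministic finite automaton (NFA). As events arrive, each one is checked against the conditions on the transitions out of the current states; when an event satisfies a condition, the corresponding partial match advances to the next state. Because several partial matches can be in flight at once, Flink tracks all of them, and on a keyed stream it does so separately for each key. If a partial match reaches the final state within the defined time constraint, a pattern match is emitted; if the time limit expires first, the partial match is discarded. All of this is kept in Flink’s fault-tolerant state backends, so matching stays correct even if nodes fail.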
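To make the state-machine idea concrete, here’s a deliberately tiny, plain-Java model of the PageView then AddToCart pattern with strict contiguity and a 5-minute limit. Treat it as a sketch for intuition only, not Flink’s implementation: the class name TinyMatcher and the string event types are hypothetical, it tracks a single partial match where Flink tracks many, and nothing here is checkpointed.

```java
// Toy model only: one partial match at a time, strict contiguity, no fault tolerance.
final class TinyMatcher {
    private static final long WINDOW_MS = 5 * 60 * 1000L;

    private enum State { WAITING_FOR_PAGE_VIEW, WAITING_FOR_ADD_TO_CART }

    private State state = State.WAITING_FOR_PAGE_VIEW;
    private long startTimestampMs;

    /** Feeds one event into the machine; returns true when this event completes a match. */
    boolean onEvent(String type, long timestampMs) {
        if (state == State.WAITING_FOR_ADD_TO_CART) {
            // Toy choice: the second event must arrive less than 5 minutes after the first.
            boolean inWindow = timestampMs - startTimestampMs < WINDOW_MS;
            // Strict contiguity: whatever arrives next ends the current partial match.
            state = State.WAITING_FOR_PAGE_VIEW;
            if (inWindow && type.equals("AddToCart")) {
                return true;
            }
        }
        if (type.equals("PageView")) {
            // A PageView always starts a fresh partial match.
            state = State.WAITING_FOR_ADD_TO_CART;
            startTimestampMs = timestampMs;
        }
        return false;
    }
}
```

Feeding it a PageView at time 0 and an AddToCart one minute later completes a match. Putting any other event between the two, or delaying the AddToCart past the limit, drops the partial match.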

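The within clause is crucial for defining temporal boundaries. You can specify Time.seconds(30), Time.minutes(1), or Time.hours(1). The Time class has no method for combining values, so for a span like one day and three hours, express the total in a single unit, such as Time.hours(27). If your Flink version offers the within(Duration) overload, you can pass a java.time.Duration instead, for example Duration.ofDays(1).plusHours(3). Either way, the value defines the maximum time allowed between the first and last event in a matched sequence.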

When defining patterns, you can go beyond strictly consecutive sequences. The followedBy() operator relaxes contiguity: other, non-matching events may occur between the two matched events, but the order still matters. For instance, begin("start").where(...).followedBy("middle").where(...) would match a PageViewEvent, then an unrelated event, then an AddToCartEvent, as long as everything falls within the time window. It would not match an AddToCartEvent that comes before the PageViewEvent. With next(), by contrast, the unrelated event in between would prevent the match.
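The difference between strict and relaxed contiguity is easy to see in a toy model. The sketch below is plain Java, not Flink code: the class ContiguityDemo and its matches method are hypothetical, event types are plain strings, the time window is ignored, and it only reports whether at least one match exists.

```java
import java.util.List;

// Toy illustration of next() (strict) versus followedBy() (relaxed) contiguity.
final class ContiguityDemo {
    /**
     * Returns true if some "PageView" is followed by an "AddToCart".
     * strict = true  : the AddToCart must be the very next event (like next()).
     * strict = false : other events may sit in between (like followedBy()).
     */
    static boolean matches(List<String> types, boolean strict) {
        for (int i = 0; i < types.size(); i++) {
            if (!types.get(i).equals("PageView")) {
                continue;
            }
            if (strict) {
                if (i + 1 < types.size() && types.get(i + 1).equals("AddToCart")) {
                    return true;
                }
            } else if (types.subList(i + 1, types.size()).contains("AddToCart")) {
                return true;
            }
        }
        return false;
    }
}
```

For the sequence PageView, Search, AddToCart, the relaxed check finds a match and the strict check does not. For AddToCart, PageView, neither does, because in both modes the order is part of the pattern.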

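The where() condition is where you define the logic for matching individual events to a specific stage of the pattern. This can be a simple type check (as shown above with instanceof) or a richer predicate on field values. For example, to require a specific product ID in the AddToCartEvent, assuming it has a productId field, you could use .where(SimpleCondition.of(evt -> evt instanceof AddToCartEvent && "XYZ".equals(((AddToCartEvent) evt).productId))). The SimpleCondition.of wrapper, available in recent Flink releases, is needed because where() takes a condition object rather than a bare lambda. Keeping the instanceof check in the same predicate avoids a ClassCastException when other event types flow through the stream.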

The select function is where you extract the matched events and define your output. The Map<String, List<Object>> passed to select contains lists of events keyed by the names you gave each stage of your pattern (e.g., "start", "middle"). Each value is a list because a stage with a quantifier, such as oneOrMore(), can match several events. select is called once per match and returns exactly one result. If you need to emit zero or several results per match, use flatSelect or process on the PatternStream instead, both of which hand you a Collector.

One behavior that often surprises people is how Flink CEP handles late-arriving data. In event time, the CEP operator buffers incoming events and processes them in timestamp order once the watermark has passed them. An event that arrives with a timestamp already behind the watermark is considered late and, by default, is dropped rather than matched. You can tolerate more disorder by configuring the WatermarkStrategy with a larger out-of-orderness bound, at the cost of higher latency, and you can capture events that are still late by calling sideOutputLateData(...) on the PatternStream and reading them from the resulting side output. This keeps valid patterns from being silently lost when network latency or out-of-order delivery is common.
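If watermarks are new to you, the following plain-Java sketch shows the basic rule for deciding whether an event is late. It is a toy model under stated assumptions, not Flink’s operator: the class LateEventDemo and its methods are hypothetical, events are reduced to bare timestamps, and treating a timestamp equal to the watermark as late is a choice made for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model: route each event to "accepted" or "late" based on the current watermark.
final class LateEventDemo {
    private long watermarkMs = Long.MIN_VALUE;

    final List<Long> accepted = new ArrayList<>();
    final List<Long> late = new ArrayList<>(); // stands in for a side output

    /** Watermarks only move forward; an older one is ignored. */
    void onWatermark(long newWatermarkMs) {
        watermarkMs = Math.max(watermarkMs, newWatermarkMs);
    }

    /** An event at or behind the watermark is late; anything newer is accepted. */
    void onEvent(long timestampMs) {
        if (timestampMs <= watermarkMs) {
            late.add(timestampMs);
        } else {
            accepted.add(timestampMs);
        }
    }
}
```

An event stamped 3000 that shows up after the watermark has advanced to 5000 lands in the late list, while one stamped 6000 is still accepted. A larger out-of-orderness bound simply makes the watermark trail further behind, so fewer events end up late.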

The next step after detecting patterns is often to trigger actions based on those patterns, which might involve integrating with other Flink operators or external systems.
