Flink’s temporal and interval joins aren’t just about matching events. They let you align data from two streams by event time (when each event actually happened) rather than by when it arrives.

Let’s see this in action. Imagine you’re tracking user clicks on a website and purchase events. You want to know which clicks led to a purchase within a specific time window.

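Here’s a simplified Flink setup. It assumes an existing StreamExecutionEnvironment named env, in-memory collections clickData and purchaseData, and simple POJOs ClickEvent, PurchaseEvent, UserActivity and ClickPurchaseMatch with the getters and constructors used below. It targets the Flink 1.x DataStream API. BoundedOutOfOrdernessTimestampExtractor is deprecated in newer releases in favor of WatermarkStrategy.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

// Define the stream of clicks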
DataStream<ClickEvent> clicks = env.fromCollection(clickData)
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<ClickEvent>(Time.seconds(5)) {
        @Override
        public long extractTimestamp(ClickEvent element) {
            return element.getTimestamp();
        }
    });

// Define the stream of purchases
DataStream<PurchaseEvent> purchases = env.fromCollection(purchaseData)
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<PurchaseEvent>(Time.seconds(5)) {
        @Override
        public long extractTimestamp(PurchaseEvent element) {
            return element.getTimestamp();
        }
    });

// Hand-rolled temporal join: match each purchase to the latest click (by event time)
// for the same user that did not happen after the purchase
DataStream<UserActivity> userActivity = clicks.keyBy(ClickEvent::getUserId)
    .connect(purchases.keyBy(PurchaseEvent::getUserId))
    .process(new CoProcessFunction<ClickEvent, PurchaseEvent, UserActivity>() {
        // Illustrative retention (an assumption): keep a user's last click for 1 hour of event time
        private final long clickRetentionMs = 60 * 60 * 1000L;
        private ValueState<ClickEvent> lastClickState;

        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor<ClickEvent> lastClickDescriptor =
                new ValueStateDescriptor<>("lastClick", ClickEvent.class);
            lastClickState = getRuntimeContext().getState(lastClickDescriptor);
        }

        @Override
        public void processElement1(ClickEvent click, CoProcessFunction<ClickEvent, PurchaseEvent, UserActivity>.Context ctx, Collector<UserActivity> out) throws Exception {
            // Keep the newest click by event time; an older, out-of-order click must not overwrite it
            ClickEvent current = lastClickState.value();
            if (current == null || click.getTimestamp() > current.getTimestamp()) {
                lastClickState.update(click);
                // Schedule cleanup for when the watermark passes click time + retention
                ctx.timerService().registerEventTimeTimer(click.getTimestamp() + clickRetentionMs);
            }
        }

        @Override
        public void processElement2(PurchaseEvent purchase, CoProcessFunction<ClickEvent, PurchaseEvent, UserActivity>.Context ctx, Collector<UserActivity> out) throws Exception {
            // State is scoped to the current key, so this is already the same user's click
            ClickEvent lastClick = lastClickState.value();
            // Only a click at or before the purchase's event time counts as a preceding click
            if (lastClick != null && lastClick.getTimestamp() <= purchase.getTimestamp()) {
                out.collect(new UserActivity(purchase.getUserId(), lastClick.getTimestamp(), purchase.getTimestamp()));
            }
        }

        @Override
        public void onTimer(long timestamp, CoProcessFunction<ClickEvent, PurchaseEvent, UserActivity>.OnTimerContext ctx, Collector<UserActivity> out) throws Exception {
            // Clear the stored click unless a newer one has replaced it (and registered a later timer)
            ClickEvent lastClick = lastClickState.value();
            if (lastClick != null && lastClick.getTimestamp() + clickRetentionMs <= timestamp) {
                lastClickState.clear();
            }
        }
    });

// Interval join: match each click with every purchase by the same user whose
// event time falls in [click time + 0 s, click time + 5 min] (both bounds inclusive)
DataStream<ClickPurchaseMatch> clickPurchaseInterval = clicks.keyBy(ClickEvent::getUserId)
    .intervalJoin(purchases.keyBy(PurchaseEvent::getUserId))
    .between(Time.seconds(0), Time.minutes(5)) // Purchases within 0 to 5 minutes after the click
    .process(new ProcessJoinFunction<ClickEvent, PurchaseEvent, ClickPurchaseMatch>() {
        @Override
        public void processElement(ClickEvent click, PurchaseEvent purchase, Context ctx, Collector<ClickPurchaseMatch> out) {
            // Called once per matching (click, purchase) pair
            out.collect(new ClickPurchaseMatch(click.getUserId(), click.getTimestamp(), purchase.getTimestamp()));
        }
    });

userActivity.print("User Activity");
clickPurchaseInterval.print("Click Purchase Interval");

// Nothing runs until the job is submitted
env.execute("click-purchase-joins");

The core problem these joins solve is correlating events that happen asynchronously and must be ordered by their logical time (event time), not their arrival time. A regular streaming join has no time bounds, so it must keep every event from both sides in state indefinitely. A tumbling-window join only pairs events that fall into the same fixed window: with 5-minute windows, a click at 12:04:59 and a purchase at 12:05:01 are never compared.

The temporal join here is hand-rolled with connect, a CoProcessFunction and ValueState. Flink’s Table API and SQL also offer built-in temporal table joins. It keeps per-key state for one stream (the latest click for each user). When an event from the other stream arrives (a purchase), it queries that state. This lets you join each purchase against a point-in-time view of the other stream, such as the most recent click before it. The ValueState acts as a small per-key store: because both streams are keyed by userId, each user gets an independent value holding the relevant historical event.
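To make the "point-in-time view" idea concrete, here is a plain-Java sketch (no Flink dependency) of the lookup a temporal join performs per key. It assumes each user’s click timestamps are kept in a TreeMap and uses floorEntry to find the latest click at or before a purchase time. ClickHistory, recordClick and latestClickAtOrBefore are hypothetical names for illustration, not Flink APIs. Unlike a single ValueState slot, this keeps the full history, so clicks arriving out of order do not lose earlier candidates.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Flink-free model of the per-key state behind a temporal join.
final class ClickHistory {
    // userId -> (click event time -> click id); TreeMap keeps each user's clicks sorted by event time
    private final Map<String, TreeMap<Long, String>> clicksByUser = new HashMap<>();

    void recordClick(String userId, long eventTime, String clickId) {
        // Arrival order does not matter: the TreeMap orders by event time.
        // A second click with the same timestamp replaces the first in this sketch.
        clicksByUser.computeIfAbsent(userId, k -> new TreeMap<>()).put(eventTime, clickId);
    }

    // Returns the id of the latest click for userId with event time <= purchaseTime, or null if none
    String latestClickAtOrBefore(String userId, long purchaseTime) {
        TreeMap<Long, String> clicks = clicksByUser.get(userId);
        if (clicks == null) {
            return null;
        }
        Map.Entry<Long, String> entry = clicks.floorEntry(purchaseTime);
        return entry == null ? null : entry.getValue();
    }

    public static void main(String[] args) {
        ClickHistory history = new ClickHistory();
        // Clicks arrive out of event-time order
        history.recordClick("alice", 3_000, "c3");
        history.recordClick("alice", 1_000, "c1");
        history.recordClick("alice", 9_000, "c9");
        history.recordClick("bob", 2_000, "b2");
        System.out.println(history.latestClickAtOrBefore("alice", 5_000)); // c3
        System.out.println(history.latestClickAtOrBefore("alice", 500));   // null
        System.out.println(history.latestClickAtOrBefore("bob", 10_000));  // b2
    }
}
```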

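The interval join (intervalJoin) instead defines a window of opportunity around each event. For each event in the left stream (clicks), it finds events in the right stream (purchases) with the same key whose timestamps fall within a range relative to the left event’s timestamp. The between(Time.seconds(0), Time.minutes(5)) clause defines that range: a purchase matches a click at time t if its event time lies in [t, t + 5 min]. Both bounds are inclusive by default, and lowerBoundExclusive() and upperBoundExclusive() change that. Under the hood, Flink buffers elements from both sides in keyed state and checks each new element against the other side’s buffer. It uses event-time timers to drop buffered elements once the watermark shows they can no longer match, so you don’t write any windowing or cleanup logic yourself. It is an inner join: a click with no purchase in range produces no output. The interval join works only with event time.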
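The matching rule itself is simple; what Flink adds is applying it incrementally over unbounded streams. The sketch below is a Flink-free illustration of the rule only. It applies the same inclusive bounds as between(Time.seconds(0), Time.minutes(5)) to two small in-memory lists. The Click, Purchase and Match records and the intervalJoin helper are hypothetical. The nested loop is fine for a demo but is not how Flink executes the join.

```java
import java.util.ArrayList;
import java.util.List;

final class IntervalJoinSketch {
    record Click(String userId, long ts) {}
    record Purchase(String userId, long ts) {}
    record Match(String userId, long clickTs, long purchaseTs) {}

    // Emits every (click, purchase) pair for the same user where
    // click.ts + lowerMs <= purchase.ts <= click.ts + upperMs (both bounds inclusive).
    static List<Match> intervalJoin(List<Click> clicks, List<Purchase> purchases, long lowerMs, long upperMs) {
        List<Match> out = new ArrayList<>();
        for (Click c : clicks) {
            for (Purchase p : purchases) {
                boolean sameKey = c.userId().equals(p.userId());
                boolean inRange = p.ts() >= c.ts() + lowerMs && p.ts() <= c.ts() + upperMs;
                if (sameKey && inRange) {
                    out.add(new Match(c.userId(), c.ts(), p.ts()));
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long min = 60_000L;
        List<Click> clicks = List.of(new Click("alice", 0), new Click("alice", 10 * min), new Click("bob", 0));
        List<Purchase> purchases = List.of(new Purchase("alice", 5 * min), new Purchase("bob", 6 * min));
        // alice's click at 0 matches her purchase at exactly 5 min (inclusive upper bound);
        // her click at 10 min does not (the purchase precedes it); bob's purchase is 1 min too late.
        intervalJoin(clicks, purchases, 0, 5 * min).forEach(System.out::println);
    }
}
```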

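Look at processElement2 in the temporal join example and its call to lastClickState.value(). Flink does not guarantee that this returns the click that occurred most recently before the purchase. ValueState simply returns whatever was last written for the current userId, and clicks are written in arrival order, not event-time order. To get "preceding click" semantics, the function has to compare timestamps itself. It should replace the stored click only with a newer one, and use it only if it is not later than the purchase. Even then, a single slot can miss matches. If a click from after the purchase arrives first, it displaces the earlier candidate. A purchase that arrives before its preceding click finds nothing. Handling those cases means keeping more history or buffering purchases until the watermark passes. That is the approach of Flink SQL’s event-time temporal join (FOR SYSTEM_TIME AS OF), which buffers rows in state and emits results as the watermark advances.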
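The difference between "last written" and "latest by event time" is easy to see in a small simulation. The plain-Java sketch below replays one user’s events in arrival order and compares two approaches. A naive slot always overwrites, like calling update(click) unconditionally. A guarded slot keeps only the newer click and accepts it only if it is not after the purchase. LastClickSlot and its methods are hypothetical; no Flink API is involved. The guarded version avoids a wrong match but can still miss one that a full click history would find.

```java
// Hypothetical single-slot models of the lastClick ValueState for one user.
final class LastClickSlot {
    private Long naive;   // overwritten on every click, like calling update(click) unconditionally
    private Long guarded; // replaced only by a click with a higher event timestamp

    void onClick(long clickTs) {
        naive = clickTs;
        if (guarded == null || clickTs > guarded) {
            guarded = clickTs;
        }
    }

    // Naive lookup: returns whatever was written last, even a click after the purchase.
    Long naiveMatch(long purchaseTs) {
        return naive;
    }

    // Guarded lookup: the stored click counts only if it is not after the purchase.
    Long guardedMatch(long purchaseTs) {
        return guarded != null && guarded <= purchaseTs ? guarded : null;
    }

    public static void main(String[] args) {
        LastClickSlot slot = new LastClickSlot();
        // Arrival order for one user: clicks at t=1000 and t=4000, then a click at
        // t=8000 that reaches the operator before a purchase made at t=5000.
        slot.onClick(1_000);
        slot.onClick(4_000);
        slot.onClick(8_000);
        System.out.println("naive:   " + slot.naiveMatch(5_000));   // 8000: a click that came after the purchase
        System.out.println("guarded: " + slot.guardedMatch(5_000)); // null: no wrong match, but the t=4000 click is lost
    }
}
```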

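The key idea behind both joins is how Flink combines state with time. It does not just buffer data. It keeps per-key history and uses event-time timers, driven by watermarks, to decide when a match or a cleanup is final. That way, events that arrive out of order or with delays can still be joined correctly within a bound. The BoundedOutOfOrdernessTimestampExtractor sets that bound: it extracts each event’s timestamp and emits watermarks that trail the highest timestamp seen so far by 5 seconds. An event whose timestamp is already behind the current watermark is considered late. By default the interval join drops late elements, while a hand-written CoProcessFunction still receives them and must decide what to do. The extractor never changes an event’s timestamp; it only determines the watermarks.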
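The watermark arithmetic can be simulated without Flink. WatermarkSim below is a hypothetical model of a bounded-out-of-orderness generator. It tracks the highest event timestamp seen, reports a watermark that trails it by a fixed delay, and flags events at or behind the current watermark as late. That follows the watermark contract that no more elements with timestamp <= W should arrive. The sketch uses the older extractor’s formula (max timestamp minus the delay). Flink’s newer WatermarkStrategy.forBoundedOutOfOrderness subtracts one extra millisecond, so an event exactly at the delay is still on time. Flink also emits watermarks periodically rather than after every event, and each operator applies its own lateness check. Treat the numbers as approximate.

```java
// Hypothetical, Flink-free model of bounded-out-of-orderness watermarks.
final class WatermarkSim {
    private final long maxOutOfOrdernessMs;
    private long maxTimestamp = Long.MIN_VALUE;

    WatermarkSim(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // Current watermark: highest timestamp seen minus the allowed delay.
    long currentWatermark() {
        return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - maxOutOfOrdernessMs;
    }

    // Returns true if the event is late relative to the watermark before it arrived,
    // then advances the highest timestamp seen.
    boolean onEvent(long ts) {
        boolean late = ts <= currentWatermark();
        maxTimestamp = Math.max(maxTimestamp, ts);
        return late;
    }

    public static void main(String[] args) {
        WatermarkSim wm = new WatermarkSim(5_000); // same 5-second bound as the example above
        long[] arrivals = {10_000, 12_000, 9_000, 20_000, 14_000, 16_000};
        for (long ts : arrivals) {
            boolean late = wm.onEvent(ts);
            System.out.printf("event ts=%d late=%b watermark=%d%n", ts, late, wm.currentWatermark());
        }
        // Only ts=14000 is flagged late: it arrives after the 20000 event moved the watermark to 15000.
    }
}
```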

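The next concept you’ll need to grapple with is state expiration and cleanup. The interval join removes buffered elements automatically once the watermark moves past the point where they could still match. Its state therefore stays bounded as long as watermarks keep advancing; an idle source that stalls the watermark stalls cleanup too. The hand-rolled ValueState in the temporal join has no such mechanism: one entry per user is kept forever unless you clear it. Left unchecked, that growth leads to memory pressure or performance degradation. Common strategies are an event-time timer whose onTimer callback calls clear(), or Flink’s state TTL (StateTtlConfig), which expires entries based on processing time rather than event time.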
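Here is a minimal, Flink-free model of timer-based cleanup, as a sketch under stated assumptions. Each stored click schedules a cleanup time (click time plus a retention period). When the watermark reaches that time, the entry is removed unless a newer click has replaced it. This mirrors an event-time timer plus an onTimer callback that calls clear() in a keyed process function. The CleanupSim class, its methods, the CleanupTimer record, and the one-hour retention are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical simulation of watermark-driven state cleanup for per-user "last click" state.
final class CleanupSim {
    record CleanupTimer(long fireAt, String userId) {}

    private final long retentionMs;
    private final Map<String, Long> lastClickTs = new HashMap<>(); // stands in for keyed ValueState
    private final PriorityQueue<CleanupTimer> timers =
        new PriorityQueue<>((a, b) -> Long.compare(a.fireAt(), b.fireAt()));

    CleanupSim(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    void onClick(String userId, long ts) {
        Long current = lastClickTs.get(userId);
        if (current == null || ts > current) {
            lastClickTs.put(userId, ts);
            timers.add(new CleanupTimer(ts + retentionMs, userId)); // like registerEventTimeTimer
        }
    }

    // Fires every timer whose time is <= the new watermark, as event-time timers do.
    void onWatermark(long watermark) {
        while (!timers.isEmpty() && timers.peek().fireAt() <= watermark) {
            CleanupTimer t = timers.poll();
            Long current = lastClickTs.get(t.userId());
            // Clear only if no newer click has extended this user's retention.
            if (current != null && current + retentionMs <= t.fireAt()) {
                lastClickTs.remove(t.userId());
            }
        }
    }

    int stateSize() {
        return lastClickTs.size();
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        CleanupSim sim = new CleanupSim(60 * minute); // hypothetical 1-hour retention
        sim.onClick("alice", 0);
        sim.onClick("bob", 0);
        sim.onClick("alice", 30 * minute);        // alice clicks again 30 min later
        sim.onWatermark(60 * minute);             // bob expires; alice's newer click survives
        System.out.println(sim.stateSize());      // 1
        sim.onWatermark(90 * minute);             // alice expires too
        System.out.println(sim.stateSize());      // 0
    }
}
```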
