Flink’s temporal table joins let you enrich one stream with data from another, but the real magic is in how they handle time.
Imagine you’re processing a stream of e-commerce orders. Each order has a timestamp, but to understand its true value, you need to know the price of the product at the time the order was placed. Product prices change, so a simple join against the current product catalog won’t cut it. You need to join against a historical view of the product catalog.
Here’s a simplified Flink job that does just that. It uses two streams, orderStream and productCatalogStream, and exposes the product catalog stream to the Table API as a temporal table function.
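// Assumes Flink 1.13+ with the DataStream API and the Table API Java bridge on the classpath.
import java.time.Instant;
import java.util.Arrays;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

public class TemporalJoinExample {
    // Order schema: public fields plus a public no-arg constructor make this a Flink POJO
    public static class Order {
        public Long orderId; public String productId; public Integer amount;
        public Long orderTime; // epoch milliseconds
        public Order() {}
        public Order(Long orderId, String productId, Integer amount, Long orderTime) {
            this.orderId = orderId; this.productId = productId; this.amount = amount; this.orderTime = orderTime;
        }
    }
    // Product Catalog schema: each record is a new price version for productId
    public static class ProductCatalog {
        public String productId; public Integer price;
        public Long eventTime; // epoch milliseconds at which this price takes effect
        public ProductCatalog() {}
        public ProductCatalog(String productId, Integer price, Long eventTime) {
            this.productId = productId; this.price = price; this.eventTime = eventTime;
        }
    }
    // Helper: ISO-8601 string to epoch milliseconds
    private static long ts(String iso) { return Instant.parse(iso).toEpochMilli(); }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // Orders, with event-time timestamps and watermarks taken from orderTime
        DataStream<Order> orderStream = env.fromCollection(Arrays.asList(
                new Order(1L, "apple", 100, ts("2023-01-01T10:00:00Z")),
                new Order(2L, "banana", 200, ts("2023-01-01T10:05:00Z")),
                new Order(3L, "apple", 150, ts("2023-01-01T10:10:00Z"))))
            .assignTimestampsAndWatermarks(WatermarkStrategy.<Order>forMonotonousTimestamps()
                .withTimestampAssigner((order, recordTs) -> order.orderTime));
        // Product catalog history, with event-time timestamps taken from eventTime
        DataStream<ProductCatalog> productCatalogStream = env.fromCollection(Arrays.asList(
                new ProductCatalog("apple", 100, ts("2023-01-01T09:00:00Z")),   // Price at 9 AM
                new ProductCatalog("banana", 200, ts("2023-01-01T09:30:00Z")),  // Price at 9:30 AM
                new ProductCatalog("apple", 120, ts("2023-01-01T10:05:00Z"))))  // Price updated at 10:05 AM
            .assignTimestampsAndWatermarks(WatermarkStrategy.<ProductCatalog>forMonotonousTimestamps()
                .withTimestampAssigner((product, recordTs) -> product.eventTime));
        // Crucial: eventTime becomes a rowtime (event-time) attribute. productId is renamed
        // so it does not clash with the orders' productId column in the join.
        Table productCatalogTable = tableEnv.fromDataStream(productCatalogStream,
            $("productId").as("catalogProductId"), $("price"), $("eventTime").rowtime());
        // Temporal table function: versioned by eventTime, keyed by catalogProductId
        TemporalTableFunction productCatalog =
            productCatalogTable.createTemporalTableFunction($("eventTime"), $("catalogProductId"));
        tableEnv.createTemporarySystemFunction("getProductCatalog", productCatalog);
        Table orders = tableEnv.fromDataStream(orderStream,
            $("orderId"), $("productId"), $("amount"), $("orderTime").rowtime());
        // Call the function with each order's time (the query time) and match on the key
        Table enrichedOrders = orders
            .joinLateral(call("getProductCatalog", $("orderTime")),
                $("productId").isEqual($("catalogProductId")))
            .select($("orderId"), $("productId"), $("amount"), $("orderTime"),
                $("price").as("productPrice")); // The price from the catalog at the order time
        // Execute the query and print the enriched orders
        enrichedOrders.execute().print();
    }
}
In this example, productCatalogStream is converted into a Table, turned into a temporal table function with createTemporalTableFunction (versioned by eventTime, keyed by catalogProductId), and registered under the name getProductCatalog. The joinLateral clause then calls this function once for each order, passing the order’s orderTime as the query time. The function returns the catalog versions that were valid at that moment, and the join predicate keeps the one whose catalogProductId matches the order’s productId. Flink derives each version’s validity period from the eventTime of the ProductCatalog records: a version is valid from its own eventTime until the next record with the same key takes effect. With the sample data, order 1 (apple at 10:00) is priced at 100, order 2 (banana at 10:05) at 200, and order 3 (apple at 10:10) at 120, the price that took effect at 10:05.
The most surprising part of temporal table joins is how Flink manages the "history" of the temporal table. It doesn’t just store the latest version of each key. Instead, it keeps enough versions to reconstruct the table as it existed at the event time of each incoming order. When a new record arrives for a key in the temporal table, Flink doesn’t overwrite the old one: it effectively closes the validity period of the previous version and opens a new one for the new record. Because these periods are defined in event time, the eventTime column must be a rowtime attribute backed by watermarks. The join holds each order until the watermark passes its orderTime, and versions that can no longer match any pending order are then cleaned up from state.
The joinLateral syntax is key here. It invokes the temporal table function for each row of the main stream, with that row’s time attribute as the argument: call("getProductCatalog", $("orderTime")). The join predicate then only has to match the key, $("productId").isEqual($("catalogProductId")). There is no need for range conditions on validFrom/validUntil columns; the function exposes no such columns, because Flink tracks each version’s validity internally from the event times in the temporal table’s history.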
The final select statement pulls columns from both the original orderStream and the enriched data from the ProductCatalog (specifically, the price at the time of the order).
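To make the versioning idea concrete, here is a plain-Java model of the lookup semantics: each key keeps its full history of versions, and a lookup at a given query time returns the latest version whose start time is not after it. This is a conceptual sketch, not Flink’s actual operator; the class and method names (TemporalLookupModel, addVersion, priceAt) are hypothetical, and it reuses the sample catalog data from the job above.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Conceptual model only (hypothetical names): a per-key version history where each
// version is valid from its own event time until the next version of the same key.
public class TemporalLookupModel {
    // key -> (validFrom in epoch millis -> price)
    private final Map<String, TreeMap<Long, Integer>> history = new HashMap<>();

    // A new version does not overwrite the old one; it starts a new validity period,
    // which implicitly ends the previous version's period.
    public void addVersion(String key, long eventTime, int price) {
        history.computeIfAbsent(key, k -> new TreeMap<>()).put(eventTime, price);
    }

    // Price valid at queryTime: the latest version with validFrom <= queryTime,
    // or null if no version of this key existed yet.
    public Integer priceAt(String key, long queryTime) {
        TreeMap<Long, Integer> versions = history.get(key);
        if (versions == null) return null;
        Map.Entry<Long, Integer> entry = versions.floorEntry(queryTime);
        return entry == null ? null : entry.getValue();
    }

    public static long ts(String iso) { return Instant.parse(iso).toEpochMilli(); }

    public static void main(String[] args) {
        TemporalLookupModel catalog = new TemporalLookupModel();
        catalog.addVersion("apple", ts("2023-01-01T09:00:00Z"), 100);
        catalog.addVersion("banana", ts("2023-01-01T09:30:00Z"), 200);
        catalog.addVersion("apple", ts("2023-01-01T10:05:00Z"), 120);

        System.out.println(catalog.priceAt("apple", ts("2023-01-01T10:00:00Z")));  // 100
        System.out.println(catalog.priceAt("banana", ts("2023-01-01T10:05:00Z"))); // 200
        System.out.println(catalog.priceAt("apple", ts("2023-01-01T10:10:00Z")));  // 120
    }
}
```

The model keeps every version forever. Flink additionally uses watermarks to decide when an order’s result is final and to prune versions that no pending order can still match.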
After enriching your streams, you’ll likely want to perform aggregations or further transformations based on the enriched data, such as calculating the total order value considering the product price at the time of purchase.
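As a sketch of such a follow-up, the snippet below computes the total order value per product from enriched rows, assuming amount is a quantity so that value = amount × productPrice. It is plain Java over hypothetical names (OrderValueAggregation, EnrichedOrder, totalValueByProduct) rather than Flink code, and it uses records, so it needs Java 16 or later. The rows mirror what the temporal join produces for the sample data.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Plain-Java illustration (hypothetical names), assuming `amount` is a quantity.
public class OrderValueAggregation {
    // Shape of one enriched row: the columns selected after the temporal join
    public record EnrichedOrder(long orderId, String productId, int amount, int productPrice) {}

    // Total value per product: sum of amount * price in effect at order time,
    // collected into a TreeMap so keys come out sorted
    public static Map<String, Long> totalValueByProduct(List<EnrichedOrder> rows) {
        return rows.stream().collect(Collectors.groupingBy(
                EnrichedOrder::productId,
                TreeMap::new,
                Collectors.summingLong(r -> (long) r.amount() * r.productPrice())));
    }

    public static void main(String[] args) {
        List<EnrichedOrder> rows = List.of(
                new EnrichedOrder(1L, "apple", 100, 100),
                new EnrichedOrder(2L, "banana", 200, 200),
                new EnrichedOrder(3L, "apple", 150, 120));
        System.out.println(totalValueByProduct(rows)); // {apple=28000, banana=40000}
    }
}
```

In the Table API, the same aggregation would look roughly like `enrichedOrders.groupBy($("productId")).select($("productId"), $("amount").times($("productPrice")).sum().as("totalValue"))`; in streaming mode this is a continuously updating result rather than a single final value.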