The Flink Table API and SQL let you treat a stream as a dynamic table that changes as new records arrive, so the same familiar SQL syntax works for both stream and batch processing.
Let’s look at a simple Flink streaming job that reads from Kafka, filters records, and writes to another Kafka topic.
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
// StreamTableEnvironment for Scala lives in the bridge package
import org.apache.flink.table.api.bridge.scala._

object StreamProcessingExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // Define the Kafka source table
    val kafkaSourceDDL =
      """
        |CREATE TABLE KafkaSource (
        |  user_id BIGINT,
        |  event_time TIMESTAMP(3),
        |  event_type STRING
        |) WITH (
        |  'connector' = 'kafka',
        |  'topic' = 'user_events',
        |  'properties.bootstrap.servers' = 'localhost:9092',
        |  'format' = 'json',
        |  'scan.startup.mode' = 'earliest-offset'
        |)
        |""".stripMargin
    tableEnv.executeSql(kafkaSourceDDL)

    // Define the Kafka sink table
    val kafkaSinkDDL =
      """
        |CREATE TABLE KafkaSink (
        |  user_id BIGINT,
        |  event_type STRING
        |) WITH (
        |  'connector' = 'kafka',
        |  'topic' = 'filtered_events',
        |  'properties.bootstrap.servers' = 'localhost:9092',
        |  'format' = 'json'
        |)
        |""".stripMargin
    tableEnv.executeSql(kafkaSinkDDL)

    // Flink SQL statement that filters events and writes them to the sink.
    // Without INSERT INTO, the SELECT would only return rows to the client.
    val query =
      """
        |INSERT INTO KafkaSink
        |SELECT user_id, event_type
        |FROM KafkaSource
        |WHERE event_type = 'login'
        |""".stripMargin

    // Submit the statement and block while the job runs
    tableEnv.executeSql(query).await()
  }
}
In this example, we first set up a StreamExecutionEnvironment and a StreamTableEnvironment. We then define two tables, KafkaSource and KafkaSink, by passing CREATE TABLE statements to the table environment’s executeSql method, which parses the DDL and registers each table in the catalog. The KafkaSource is configured to read JSON messages from a Kafka topic named user_events, starting from the earliest offset. The KafkaSink is configured to write JSON messages to a Kafka topic named filtered_events.
The core logic is expressed in the SQL query. It selects user_id and event_type from KafkaSource where the event_type is 'login'. To land those rows in filtered_events, the SELECT has to be the body of an INSERT INTO KafkaSink statement; a bare SELECT only returns its results to the client. Passing the statement to tableEnv.executeSql submits it as a job: Flink translates the SQL into a streaming dataflow that handles deserialization, filtering, serialization, and writing to Kafka. The .await() call blocks until the job finishes or fails, which for an unbounded Kafka source means until the job is cancelled.
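The same filter can also be written with the Table API instead of a SQL string. The following is a minimal sketch, assuming KafkaSource and KafkaSink have already been registered in the table environment; the names TableApiEquivalent and submitLoginFilter are hypothetical and exist only for this illustration.

```scala
import org.apache.flink.table.api._

object TableApiEquivalent {
  // Assumes tables named KafkaSource and KafkaSink are already registered
  // in tableEnv, for example through CREATE TABLE statements.
  def submitLoginFilter(tableEnv: TableEnvironment): TableResult =
    tableEnv
      .from("KafkaSource")                  // look up the registered source table
      .filter($"event_type" === "login")    // same predicate as the WHERE clause
      .select($"user_id", $"event_type")    // same projection as the SELECT list
      .executeInsert("KafkaSink")           // submit a job that writes to the sink
}
```

Both forms go through the same planner, so choosing between them is mostly a matter of whether you prefer SQL text or typed method chaining.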
The power here is that Flink’s Table API and SQL abstract away the complexities of stream processing. You don’t manually manage Kafka consumers, producers, or the stateful operations for complex aggregations or windowing. Flink handles the execution plan generation, optimization, and runtime execution for you. For instance, if you wanted to count login events per user within a 5-minute tumbling window, you’d simply modify the SQL:
SELECT
  user_id,
  COUNT(*) AS login_count,
  TUMBLE_END(event_time, INTERVAL '5' MINUTE) AS window_end
FROM KafkaSource
WHERE event_type = 'login'
GROUP BY
  user_id,
  TUMBLE(event_time, INTERVAL '5' MINUTE)
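Flink’s optimizer will translate this into an efficient stateful streaming job. The GROUP BY clause with the TUMBLE function defines the windowing strategy. For this to work, event_time must be declared as an event-time attribute by adding a WATERMARK clause to the KafkaSource definition. Flink keeps one count per user per 5-minute window in state and emits each result once the watermark passes the end of the window.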
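To make the windowing semantics concrete, here is a plain-Scala sketch with no Flink dependency. It only illustrates how events fall into 5-minute tumbling windows aligned to the epoch and how a count per user and window comes out; it is not how Flink implements windows, and it ignores watermarks and late data. The Event case class and the function names are assumptions made for the example.

```scala
object TumblingWindowSketch {
  // Simplified stand-in for a row of KafkaSource (hypothetical type).
  final case class Event(userId: Long, eventTimeMillis: Long, eventType: String)

  val windowSizeMillis: Long = 5 * 60 * 1000L

  // Start of the tumbling window that contains ts (windows aligned to the epoch).
  def windowStart(ts: Long): Long = ts - Math.floorMod(ts, windowSizeMillis)

  // Counts login events per (userId, windowEnd), where windowEnd is the
  // exclusive upper bound of the window, like TUMBLE_END in the query.
  def countLogins(events: Seq[Event]): Map[(Long, Long), Long] =
    events
      .filter(_.eventType == "login")
      .groupBy(e => (e.userId, windowStart(e.eventTimeMillis) + windowSizeMillis))
      .map { case (key, grouped) => key -> grouped.size.toLong }
}
```

In a real job the grouping happens incrementally as records arrive, and the per-key counts live in Flink-managed state rather than in an in-memory Map.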
A common misconception is that Flink’s SQL is only for batch processing. In fact, the same SQL runs on both bounded and unbounded data. When you define a CREATE TABLE statement with a streaming connector such as Kafka, Flink treats that table as an unbounded stream by default, and queries on it run continuously. Whether a job executes in batch or streaming mode is decided by the runtime mode of the table environment (the execution.runtime-mode setting): batch mode requires every source to be bounded, such as a file, while streaming mode accepts both bounded and unbounded sources.
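As a rough illustration of the runtime mode, the sketch below runs the same query in both modes over a bounded source. It assumes the built-in datagen connector with a fixed number-of-rows so that no Kafka cluster is needed; the table name BoundedEvents and the helper newEnv are hypothetical.

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object RuntimeModeSketch {
  // datagen stops after 'number-of-rows' rows, which makes the source bounded.
  private val boundedSourceDDL =
    """
      |CREATE TABLE BoundedEvents (
      |  user_id BIGINT,
      |  event_type STRING
      |) WITH (
      |  'connector' = 'datagen',
      |  'number-of-rows' = '100'
      |)
      |""".stripMargin

  // Creates a table environment in the requested mode and registers the source.
  def newEnv(batch: Boolean): TableEnvironment = {
    val builder = EnvironmentSettings.newInstance()
    val settings = (if (batch) builder.inBatchMode() else builder.inStreamingMode()).build()
    val tableEnv = TableEnvironment.create(settings)
    tableEnv.executeSql(boundedSourceDDL)
    tableEnv
  }

  def main(args: Array[String]): Unit = {
    val query = "SELECT COUNT(*) AS events FROM BoundedEvents"
    newEnv(batch = true).executeSql(query).print()   // batch execution
    newEnv(batch = false).executeSql(query).print()  // streaming execution
  }
}
```

In batch mode the aggregate should come back as a single final row, whereas in streaming mode the printed result is a changelog in which the count is updated as rows arrive.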
When you define a CREATE TABLE statement with a connector, Flink doesn’t just map the columns; the 'format' option also tells it how records are serialized and deserialized. For JSON, Flink’s JSON format converts between the declared column types and the JSON text, including nested structures declared as ROW, ARRAY, or MAP columns. Format options control how tolerant the conversion is: for example, json.fail-on-missing-field decides whether a missing field is an error or becomes NULL, and json.ignore-parse-errors decides whether malformed records are skipped. The WITH clause in the DDL is where you configure all these aspects, including connector-specific properties like properties.bootstrap.servers for Kafka or path for file systems.
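A sketch of what such a definition might look like with a nested column and lenient JSON handling follows. The table name NestedEvents and the device column are made up for illustration; the json.* options are the JSON format's documented settings, but check the defaults for your Flink version.

```scala
object JsonFormatSketch {
  // Hypothetical table: `device` is an illustrative nested JSON object.
  val nestedJsonSourceDDL: String =
    """
      |CREATE TABLE NestedEvents (
      |  user_id BIGINT,
      |  event_type STRING,
      |  device ROW<os STRING, app_version STRING>
      |) WITH (
      |  'connector' = 'kafka',
      |  'topic' = 'user_events',
      |  'properties.bootstrap.servers' = 'localhost:9092',
      |  'scan.startup.mode' = 'earliest-offset',
      |  'format' = 'json',
      |  'json.fail-on-missing-field' = 'false',
      |  'json.ignore-parse-errors' = 'true'
      |)
      |""".stripMargin

  // Nested fields are addressed with dot notation in queries.
  val nestedFieldQuery: String =
    "SELECT user_id, device.os AS os FROM NestedEvents WHERE event_type = 'login'"
}
```

Skipping malformed records keeps the job running but silently drops data, so it is a trade-off rather than a default worth enabling everywhere.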
Flink optimizes SQL queries in several stages, building on Apache Calcite. It first parses the SQL into an Abstract Syntax Tree (AST), validates it against the catalog, and converts it into a logical plan of relational operators. This logical plan is then optimized using a series of rewrite rules (e.g., predicate pushdown, projection pruning) to produce an optimized logical plan. Finally, the optimized logical plan is translated into a physical execution plan, which is a DAG of Flink operators. This physical plan is what Flink actually executes.
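You can inspect these stages yourself with the table environment's explainSql method, which returns the plans as text without running the job. The sketch below assumes the built-in datagen connector in place of Kafka so it has no external dependencies; the table name Events and the object ExplainSketch are hypothetical.

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object ExplainSketch {
  // Returns the textual plans for the login filter query.
  def explainLoginFilter(): String = {
    val tableEnv = TableEnvironment.create(
      EnvironmentSettings.newInstance().inStreamingMode().build())

    // Stand-in source table; datagen is used only to avoid needing Kafka.
    tableEnv.executeSql(
      """
        |CREATE TABLE Events (
        |  user_id BIGINT,
        |  event_type STRING
        |) WITH (
        |  'connector' = 'datagen',
        |  'number-of-rows' = '10'
        |)
        |""".stripMargin)

    tableEnv.explainSql(
      "SELECT user_id, event_type FROM Events WHERE event_type = 'login'")
  }

  def main(args: Array[String]): Unit = println(explainLoginFilter())
}
```

The output is divided into sections for the unoptimized plan and the optimized plans; the exact section headings and operator names vary between Flink versions.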
When dealing with time in Flink SQL, especially for windowing operations, understanding event time versus processing time is crucial. Processing time is the wall-clock time of the machine that handles the record; you expose it by adding a computed column such as proc_time AS PROCTIME(). Event time is the timestamp carried by the record itself, and it is what you need for accurate results in distributed systems where events can be delayed or reordered. You declare it with a WATERMARK clause in your CREATE TABLE statement, which names a timestamp column and a watermark strategy, for example WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND to tolerate events that arrive up to five seconds out of order.
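Applied to the source table from this article, the definition might look like the following sketch. The five-second bound and the proc_time column name are assumptions chosen for illustration; pick a bound that matches how late your events actually arrive.

```scala
object EventTimeSourceSketch {
  val kafkaSourceWithWatermarkDDL: String =
    """
      |CREATE TABLE KafkaSource (
      |  user_id BIGINT,
      |  event_time TIMESTAMP(3),
      |  event_type STRING,
      |  -- processing-time attribute, computed when the record is read
      |  proc_time AS PROCTIME(),
      |  -- event-time attribute: watermarks trail the largest seen timestamp by 5 seconds
      |  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
      |) WITH (
      |  'connector' = 'kafka',
      |  'topic' = 'user_events',
      |  'properties.bootstrap.servers' = 'localhost:9092',
      |  'format' = 'json',
      |  'scan.startup.mode' = 'earliest-offset'
      |)
      |""".stripMargin
}
```

With this definition, event_time can be used in TUMBLE and other window functions, and a window's result is emitted once the watermark moves past the window's end.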
The next concept you’ll likely encounter is managing state for more complex aggregations or joins, and understanding how Flink handles fault tolerance and exactly-once processing guarantees.