The Flink SQL Gateway acts as a RESTful interface to Flink’s SQL capabilities, allowing you to submit and manage SQL queries without needing direct access to Flink’s internal APIs or CLI.
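Let’s see it in action. Imagine you have a Flink cluster running, with the SQL Gateway running alongside it and its REST endpoint listening on port 8080. The Gateway’s default REST port is 8083, so this assumes sql-gateway.endpoint.rest.port has been changed. You want to query a Kafka topic named my_kafka_topic that contains JSON data.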
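Before sending any SQL, it can help to confirm that the Gateway is reachable. The sketch below is a minimal check that uses only Python’s standard library. It assumes the Gateway address from the scenario (http://localhost:8080) and the v1 GET /v1/info endpoint, which reports the product name and version. The helper names gateway_info and describe are hypothetical and not part of any Flink client library.

```python
import json
import urllib.request


def gateway_info(base_url: str = "http://localhost:8080") -> dict:
    """GET /v1/info from the SQL Gateway and decode the JSON body (hypothetical helper)."""
    with urllib.request.urlopen(f"{base_url}/v1/info", timeout=5) as resp:
        return json.loads(resp.read().decode("utf-8"))


def describe(info: dict) -> str:
    """Format the productName and version fields of a /v1/info response."""
    return f"{info.get('productName', 'unknown')} {info.get('version', 'unknown')}"
```

With a Gateway running, print(describe(gateway_info())) shows the product name and Flink version. A connection error at this stage usually means the address or port is wrong.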
Every request to the Gateway runs inside a session, so first open one by sending a POST to http://localhost:8080/v1/sessions. The body is optional. You can use it to name the session and pass Flink configuration:
{
  "sessionName": "kafka-demo",
  "properties": {
    "execution.runtime-mode": "streaming"
  }
}
The response contains a sessionHandle, which identifies the session in every later call.
Next, define a Flink SQL table that maps to the Kafka topic with a CREATE TABLE statement. Each statement is sent as a JSON payload whose statement field holds the SQL text as a single JSON string. JSON has no string concatenation, so the whole statement goes in one string:
{
  "statement": "CREATE TABLE KafkaTopic (user_id BIGINT, item_id BIGINT, event_time TIMESTAMP(3), WATERMARK FOR event_time AS event_time) WITH ('connector' = 'kafka', 'topic' = 'my_kafka_topic', 'properties.bootstrap.servers' = 'kafka-broker-1:9092,kafka-broker-2:9092', 'format' = 'json', 'json.fail-on-missing-field' = 'false')"
}
POST this to http://localhost:8080/v1/sessions/<sessionHandle>/statements, substituting the handle you received. The response carries an operationHandle for the statement. There are no namespaces in the path. The session is what isolates one client’s SQL environment from another’s, and the new table is registered in that session’s catalog, so later statements in the same session can query it.
Once the table is defined, you can submit a query. Let’s say you want to count events per user:
{
  "statement": "SELECT user_id, COUNT(*) AS event_count FROM KafkaTopic GROUP BY user_id"
}
Send this payload to the same /v1/sessions/<sessionHandle>/statements endpoint. As with the CREATE TABLE call, the response contains an operationHandle. Use it to poll GET /v1/sessions/<sessionHandle>/operations/<operationHandle>/status, which reports states such as PENDING, RUNNING, FINISHED, ERROR, or CANCELED.
For a SELECT, FINISHED typically means the statement has been accepted and its results can be fetched, not that the query has completed. Over an unbounded Kafka topic, the underlying job keeps running and keeps producing updated counts. Results are read page by page from /v1/sessions/<sessionHandle>/operations/<operationHandle>/result/<token>, starting with token 0.
The core problem the SQL Gateway solves is decoupling SQL query submission and management from the Flink cluster’s internal RPC or web UI. It provides a standardized REST API that makes it easy for external applications, CI/CD pipelines, or simple scripts to work with Flink SQL. You don’t need to deal with Flink’s JobManager/TaskManager communication protocols or work out how to package and submit jobs yourself. The Gateway translates your SQL statements into Flink jobs and provides a unified way to monitor their lifecycle.
You control configuration per session rather than per request. Properties passed when opening the session (execution.runtime-mode above) apply to every statement in it, and the tables and catalogs you register live as long as the session does. Idle sessions are closed after sql-gateway.session.idle-timeout, so a long-lived client should periodically call POST /v1/sessions/<sessionHandle>/heartbeat. There is also no operation field that selects the kind of request. Every statement goes to the same endpoint, and the SQL itself (DDL, a query, or an INSERT) determines what the Gateway does.
The most surprising thing is how the SQL Gateway manages state for long-running queries. Each REST call is self-contained, but the Gateway itself is stateful. For every session, it keeps a mapping from the operation handles it has issued to the work behind them, including the Flink job a query started and the results buffered from that job so far. When you poll for status or results, the Gateway answers from this state, effectively acting as a proxy between your client and the running job. This is what lets you submit a query and collect its results asynchronously, which is crucial for interactive querying or for building applications on top of Flink SQL.
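Reading results follows the same asynchronous pattern. The sketch below, again standard-library Python with hypothetical function names, pages through /v1/sessions/<sessionHandle>/operations/<operationHandle>/result/<token>, starting at token 0. It makes these assumptions about each page:
- it carries a resultType of PAYLOAD, NOT_READY, or EOS;
- rows sit under results.data, each with a kind field (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE) and a fields list;
- a nextResultUri path gives the page to request next.

If a NOT_READY page omits that URI, the sketch retries the same page. Taking the fetch function as a parameter keeps the paging logic separate from the HTTP code, so it can be exercised without a running Gateway.

```python
import json
import time
import urllib.request
from typing import Callable, Dict, Iterator

BASE_URL = "http://localhost:8080"  # assumption: Gateway address from the walkthrough


def first_result_path(session: str, operation: str) -> str:
    return f"/v1/sessions/{session}/operations/{operation}/result/0"


def fetch_page(path: str) -> dict:
    """GET one result page; nextResultUri values are paths, so prefix the base URL."""
    with urllib.request.urlopen(BASE_URL + path, timeout=30) as response:
        return json.loads(response.read().decode("utf-8"))


def iter_rows(fetch: Callable[[str], dict], path: str,
              poll_interval: float = 0.5) -> Iterator[dict]:
    """Yield changelog rows page by page until the Gateway reports EOS."""
    while path:
        page = fetch(path)
        result_type = page.get("resultType")
        if result_type == "EOS":
            return
        if result_type == "PAYLOAD":
            # Each row looks like {"kind": "UPDATE_AFTER", "fields": [...]}.
            yield from page.get("results", {}).get("data", [])
        elif result_type == "NOT_READY":
            time.sleep(poll_interval)  # nothing buffered yet; back off before retrying
        # Follow the server-provided URI; retry the same page if NOT_READY omits it.
        path = page.get("nextResultUri") or (path if result_type == "NOT_READY" else None)


def apply_row(counts: Dict[int, int], row: dict) -> None:
    """Update per-user counts with one (user_id, event_count) changelog row."""
    user_id, count = row["fields"]
    if row["kind"] in ("INSERT", "UPDATE_AFTER"):
        counts[user_id] = count
    else:  # UPDATE_BEFORE or DELETE retracts the previous value
        counts.pop(user_id, None)
```

For the per-user count, an UPDATE_BEFORE/UPDATE_AFTER pair replaces a user’s previous count with the new one. Calling apply_row on each row from iter_rows(fetch_page, first_result_path(session, operation)) therefore keeps a dictionary of current totals. On an unbounded query the generator does not reach EOS by itself. Stop consuming it and cancel or close the operation through the Gateway when you are done.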
The next step is to explore how to manage operation handles for long-running streaming queries and retrieve results iteratively.