Flink’s Queryable State lets you read a running Flink job’s keyed state from outside the job, without stopping it, which is way cooler than you’re probably imagining.
Let’s see it in action. Imagine a simple Flink job that counts words from an input stream.
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class QueryableStateExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpointing is optional here: Queryable State reads live state and does not depend on it.
        env.enableCheckpointing(1000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getConfig().enableObjectReuse(); // Optional performance tweak, not needed for Queryable State

        // A bounded source like this finishes almost immediately. To have time to query,
        // swap in an unbounded source such as env.socketTextStream("localhost", 9999).
        DataStream<String> text = env.fromElements(
            "hello world",
            "flink is awesome",
            "queryable state is cool",
            "hello flink",
            "world is great"
        );

        // Describe the per-key state: a running (word, count) pair, merged by summing the counts.
        ReducingStateDescriptor<Tuple2<String, Integer>> countDescriptor =
            new ReducingStateDescriptor<>(
                "word-count-state",
                (a, b) -> Tuple2.of(a.f0, a.f1 + b.f1),
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

        // Key by the word and publish the reducing state under the name "word-count".
        // State is only queryable if it is exposed like this (or via StateDescriptor.setQueryable);
        // a plain keyBy().reduce() keeps its state internal.
        text.flatMap(new Tokenizer())
            .keyBy(t -> t.f0)
            .asQueryableState("word-count", countDescriptor);

        env.execute("Queryable State Example");
    }

    public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (!token.isEmpty()) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}
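To actually query this, you need to enable the feature on the cluster. Queryable State is off by default: the flink-queryable-state-runtime JAR has to be on the cluster classpath (copy it from opt/ into Flink’s lib/ directory), and your flink-conf.yaml needs:
# Enable the Queryable State proxy and state server on each TaskManager
queryable-state.enable: true
# Port (or port range) of the client-facing proxy; 9069 is the default
queryable-state.proxy.ports: 9069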
Once the job is running, a separate application can query the state with the QueryableStateClient from the flink-queryable-state-client-java module. For instance, to get the count for the word "flink":
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.queryablestate.client.QueryableStateClient;
import java.util.concurrent.CompletableFuture;

public class QueryStateClient {
    public static void main(String[] args) throws Exception {
        // Host of a TaskManager (the proxy runs there, not on the JobManager) and the proxy port
        String proxyHost = "localhost";
        int proxyPort = 9069;
        QueryableStateClient client = new QueryableStateClient(proxyHost, proxyPort);

        // The key of the state we want to query (e.g., the word "flink")
        String keyToQuery = "flink";

        // The name under which the job exposes the state, via asQueryableState(name, ...)
        // or StateDescriptor.setQueryable(name). State that was never given such a name
        // (for example, the internal state of a plain keyBy().reduce()) cannot be queried.
        String queryableStateName = "word-count"; // <<< must match the job

        // The job ID, as shown in the Flink UI or returned by Flink's REST API.
        // Pass the 32-character hex ID as the first program argument.
        JobID jobId = JobID.fromHexString(args[0]);

        // Must describe the same value type as the descriptor used in the job
        ReducingStateDescriptor<Tuple2<String, Integer>> descriptor =
            new ReducingStateDescriptor<>(
                "word-count-state",
                (a, b) -> Tuple2.of(a.f0, a.f1 + b.f1),
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

        try {
            // Query the state for the specific key; the key's type information is required too
            CompletableFuture<ReducingState<Tuple2<String, Integer>>> stateFuture =
                client.getKvState(
                    jobId,
                    queryableStateName,
                    keyToQuery,
                    BasicTypeInfo.STRING_TYPE_INFO,
                    descriptor);

            // Blocking call for simplicity; the future fails if the key or state name is unknown
            ReducingState<Tuple2<String, Integer>> state = stateFuture.get();
            System.out.println("State for key '" + keyToQuery + "': " + state.get().f1);
        } catch (Exception e) {
            System.err.println("Failed to query state: " + e.getMessage());
        } finally {
            client.shutdownAndWait();
        }
    }
}
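The client needs a job ID, which the code comments say you can get from the Flink UI or the REST API. Here is a minimal sketch of the REST route using only the JDK. It assumes the REST endpoint sits at its default of http://localhost:8081 and that the /jobs/overview response lists each job with a "jid" field. The regex is a shortcut to avoid a JSON dependency, and the sample payload and job ID in main are made up for illustration.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class JobIdLookup {
    // Matches "jid":"<32 hex chars>"; a real application would use a JSON parser instead.
    private static final Pattern JID =
        Pattern.compile("\"jid\"\\s*:\\s*\"([0-9a-fA-F]{32})\"");

    /** Pulls every job ID out of a /jobs/overview response body. */
    static List<String> extractJobIds(String json) {
        List<String> ids = new ArrayList<>();
        Matcher m = JID.matcher(json);
        while (m.find()) {
            ids.add(m.group(1));
        }
        return ids;
    }

    /** Fetches the overview from a REST base URL such as http://localhost:8081 (assumed default). */
    static String fetchOverview(String restBaseUrl) throws Exception {
        HttpRequest request =
            HttpRequest.newBuilder(URI.create(restBaseUrl + "/jobs/overview")).GET().build();
        return HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString())
            .body();
    }

    public static void main(String[] args) throws Exception {
        // With an argument, query a live cluster; without one, parse a made-up sample payload.
        String json = args.length > 0
            ? fetchOverview(args[0])
            : "{\"jobs\":[{\"jid\":\"0123456789abcdef0123456789abcdef\","
                + "\"name\":\"Queryable State Example\",\"state\":\"RUNNING\"}]}";
        extractJobIds(json).forEach(System.out::println);
    }
}
```

Run it with the REST base URL as the only argument, then hand the printed ID to the query client.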
The mental model for Queryable State is that Flink maintains state per key within its TaskManagers. When you enable Queryable State, each TaskManager runs two extra services: a client proxy that accepts external requests and a state server that serves the state held locally. The client sends a request specifying the job ID, the queryable state name, the key you’re interested in (plus its type information), and a state descriptor (e.g., for ValueState or ReducingState). The proxy asks the JobManager which TaskManager holds that key’s key group, forwards the request to that TaskManager’s state server, and returns the value.
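To make the routing step concrete, here is a small sketch of how a key ends up on one particular parallel subtask. Flink first maps the key to a key group and then maps the key group to a subtask index. The second step below follows the usual keyGroup * parallelism / maxParallelism rule. The first step is an assumption-laden stand-in: Flink applies a murmur hash to key.hashCode(), while this sketch uses a simpler bit mix, so the group numbers it prints will not match a real cluster.

```java
import java.util.List;

class KeyGroupRouting {
    /** Stand-in for Flink's key-group assignment (Flink uses a murmur hash here). */
    static int keyGroupFor(Object key, int maxParallelism) {
        int h = key.hashCode();
        h ^= (h >>> 16); // simple bit mix, NOT Flink's actual hash
        return Math.floorMod(h, maxParallelism);
    }

    /** Maps a key group to the index of the parallel subtask that owns it. */
    static int subtaskFor(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // number of key groups
        int parallelism = 4;      // number of parallel subtasks
        for (String word : List.of("hello", "world", "flink")) {
            int group = keyGroupFor(word, maxParallelism);
            int subtask = subtaskFor(group, parallelism, maxParallelism);
            System.out.println(word + " -> key group " + group + " -> subtask " + subtask);
        }
    }
}
```

The point is that the owner of a key is a pure function of the key and the job’s parallelism settings, which is why a lookup can be forwarded to exactly one TaskManager.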
The core problem Queryable State solves is the need to inspect the internal state of a Flink job without interrupting its execution. Access is read-only: clients can look up values but cannot modify them. This is invaluable for debugging and monitoring, and for external control loops that react to Flink’s current state. Instead of relying on side outputs or logging (which add latency and extra plumbing), you read the live state as it exists at the moment of the query, independent of checkpoints.
The exact levers you control are primarily in flink-conf.yaml: queryable-state.enable to turn it on, and queryable-state.proxy.ports to set the port the client connects to. Within your Flink job code, the crucial part is exposing the state under a name, either with asQueryableState(...) on a keyed stream or with setQueryable(...) on a StateDescriptor inside a stateful operator. The client application then needs that queryable state name, the job ID (shown in the Flink UI and available through the REST API), and a matching key type and StateDescriptor; it does not need an operator ID.
What most people don’t realize is that Queryable State is inherently asynchronous and can degrade performance if overused. Each query adds a network round trip and invokes Flink’s state access path on the TaskManager that owns the key. If you’re querying state very frequently from many clients, you can put significant load on the TaskManagers; the JobManager is only consulted to locate keys. Furthermore, reads are not synchronized with processing: the value you get is the live state at the time of the query, with no consistency guarantee relative to records still in flight, and it is only available while the job is running. Also check your Flink version before building on it, since the feature has been marked deprecated in recent releases (1.18 onward).
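One way to keep query load down is to put a small time-based cache in front of the client, so repeated lookups for the same key within a short window reuse one result. The sketch below is plain JDK code, not part of Flink: CachedStateLookup, its remoteLookup function, and the TTL value are all hypothetical names for illustration, and remoteLookup stands in for whatever call actually queries the job.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.LongSupplier;

class CachedStateLookup<K, V> {
    private static final class Entry<V> {
        final CompletableFuture<V> future;
        final long fetchedAtMillis;

        Entry(CompletableFuture<V> future, long fetchedAtMillis) {
            this.future = future;
            this.fetchedAtMillis = fetchedAtMillis;
        }
    }

    private final Function<K, CompletableFuture<V>> remoteLookup; // stand-in for the real query
    private final long ttlMillis;
    private final LongSupplier clock; // injectable so the expiry logic can be tested
    private final Map<K, Entry<V>> cache = new ConcurrentHashMap<>();

    CachedStateLookup(Function<K, CompletableFuture<V>> remoteLookup, long ttlMillis, LongSupplier clock) {
        this.remoteLookup = remoteLookup;
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    /** Returns a cached result if it is younger than the TTL and did not fail; otherwise queries again. */
    CompletableFuture<V> get(K key) {
        long now = clock.getAsLong();
        return cache.compute(key, (k, old) -> {
            boolean fresh = old != null
                && now - old.fetchedAtMillis < ttlMillis
                && !old.future.isCompletedExceptionally();
            return fresh ? old : new Entry<V>(remoteLookup.apply(k), now);
        }).future;
    }

    public static void main(String[] args) {
        AtomicInteger remoteCalls = new AtomicInteger();
        CachedStateLookup<String, Integer> lookup = new CachedStateLookup<>(
            word -> {
                remoteCalls.incrementAndGet();
                return CompletableFuture.completedFuture(word.length()); // fake "state"
            },
            500,
            System::currentTimeMillis);
        lookup.get("flink");
        lookup.get("flink");
        System.out.println("remote calls: " + remoteCalls.get());
    }
}
```

The trade-off is staleness: a cached value can lag the job by up to the TTL, so pick a window that matches how fresh your dashboard or control loop really needs to be.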
The next concept you’ll likely run into is how to handle different types of Flink state (like MapState or ListState) when querying, and how to manage state access across multiple TaskManagers efficiently.