Flink Stateful Functions are a way to build stateful microservices. They let you manage state for individual entities (like users, devices, or sessions) directly within your Flink application, rather than relying on external databases for every little piece of data. Because the state lives next to the code that uses it, reads and writes avoid a network round trip to a database. This can make your applications faster and simpler to operate.
Here’s a quick look at how it works. Imagine you have a service that tracks user sessions. Each user is an "entity," and their session data (like login time, last activity) is their "state."
```java
import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.annotations.Persisted;
import org.apache.flink.statefun.sdk.state.PersistedValue;

// A stateful function written against the embedded Java SDK.
public class SessionTrackerFunction implements StatefulFunction {

  // Define the function type: namespace "my_company", name "session_tracker".
  public static final FunctionType TYPE = new FunctionType("my_company", "session_tracker");

  // Define the state that this function will manage. Flink keeps one value per
  // entity ID (here, per user) and includes it in its checkpoints.
  @Persisted
  private final PersistedValue<Long> lastSeen = PersistedValue.of("lastSeen", Long.class);

  @Override
  public void invoke(Context context, Object input) {
    // UserLoginProto.LoginRequest is an application-defined Protobuf message,
    // not part of the Stateful Functions SDK.
    if (input instanceof UserLoginProto.LoginRequest) {
      UserLoginProto.LoginRequest request = (UserLoginProto.LoginRequest) input;
      long currentTime = System.currentTimeMillis();

      // Read the current state for this user; null if it was never set.
      Long previous = lastSeen.get();
      System.out.println("User " + request.getUserId() + " logged in. Last seen: "
          + (previous != null ? previous : "never"));

      // Write the new state; it is scoped to the address being invoked (context.self()).
      lastSeen.set(currentTime);

      // Optionally, send a reply or trigger other functions:
      // context.reply(response);
    }
    // Handle other message types...
  }
}
```
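In this example, SessionTrackerFunction is a stateful function. It declares a piece of state called lastSeen that stores a Long timestamp; because the field is annotated with @Persisted, Flink keeps a separate value for every entity ID and restores it after failures. When a LoginRequest message arrives, the function reads the current lastSeen value, prints it, and then updates lastSeen with the current time. The Context object passed to invoke is your gateway for messaging: it tells you which address is being invoked (context.self()) and who sent the message (context.caller()), and it lets you reply or send messages to other functions.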
The core problem Stateful Functions solve is the impedance mismatch between stateless compute (like traditional stateless microservices or FaaS platforms) and the reality of many business domains, which are inherently stateful. Think about a shopping cart: you need to know what’s in the cart for each user. Or a device that needs to maintain its current status. External databases can work, but they add a network hop to every state access and another system to keep consistent with your application. Stateful Functions bring this state management inside your Flink pipeline, co-located with the computation that needs it.
This co-location is key. When a message arrives for a specific entity (e.g., a LoginRequest for user_123), Flink routes it to the parallel instance that owns user_123’s state and invokes SessionTrackerFunction with that state in scope. Crucially, Flink guarantees that the state for user_123 is available before your function code runs and that your updates are kept after it finishes; the state of other users is never visible to that invocation. This is all managed by Flink’s fault-tolerant state backends (such as RocksDB).
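The load, invoke, save cycle described above can be modeled in a few lines of plain Java. This is a simplified sketch, not the Flink API: MiniRuntime and SessionTrackerLogic are hypothetical names, and the HashMap stands in for Flink’s state backend. Each invocation sees only the state stored under its own address (function type plus entity ID).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical, simplified model of per-entity state scoping; NOT the Flink StateFun API.
final class MiniRuntime {
  // Stands in for Flink's state backend: one value per address (function type + entity ID).
  private final Map<String, Long> stateBackend = new HashMap<>();

  private static String address(String functionType, String id) {
    return functionType + "/" + id;
  }

  // Load the addressed entity's state, run the user logic, then store the result.
  // The logic receives (current state, message) and returns the new state (null clears it).
  void invoke(String functionType, String id, long message,
              BiFunction<Long, Long, Long> logic) {
    String key = address(functionType, id);
    Long current = stateBackend.get(key);          // load: null if never written
    Long updated = logic.apply(current, message);   // user code sees only this entity's state
    if (updated == null) {
      stateBackend.remove(key);                     // clear
    } else {
      stateBackend.put(key, updated);               // save
    }
  }

  Long stateOf(String functionType, String id) {
    return stateBackend.get(address(functionType, id));
  }
}

// Mirrors SessionTrackerFunction's logic: report the previous lastSeen, store the new login time.
final class SessionTrackerLogic {
  static Long onLogin(Long lastSeen, Long loginTime) {
    System.out.println("Last seen: " + (lastSeen != null ? lastSeen : "never"));
    return loginTime;
  }
}
```

Invoking it for user_123 at time 1000, user_456 at 2000, and user_123 again at 3000 prints "Last seen: never" twice and then "Last seen: 1000", because the two users never share a state slot.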
The "stateful" part means Flink manages the lifecycle of this state for you. If your job fails and restarts, the state is restored from the latest checkpoint. If you rescale the job (for example, by restarting it from a savepoint with a different parallelism), Flink redistributes the state across the new parallel instances. You don’t have to write explicit "savepoint" or "restore" logic for your entity states; Flink handles it.
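Rescaling works because Flink moves key groups, not individual keys: every key is hashed into one of maxParallelism key groups, and each parallel instance owns a contiguous range of them. The sketch below models that arithmetic under simplifying assumptions: KeyGroups is a hypothetical class, plain Math.floorMod on hashCode() stands in for Flink’s murmur-hash step, and 128 is just an example maxParallelism.

```java
// Simplified model of Flink's key-group assignment (the mechanism behind rescaling keyed state).
// Assumption: Flink applies a murmur hash to key.hashCode(); plain floorMod stands in for it here.
final class KeyGroups {
  // A key's group depends only on the key and maxParallelism, so it is stable across rescaling.
  static int keyGroupFor(String key, int maxParallelism) {
    return Math.floorMod(key.hashCode(), maxParallelism);
  }

  // Key groups are split into contiguous ranges, one range per parallel instance.
  static int instanceFor(int keyGroup, int maxParallelism, int parallelism) {
    return keyGroup * parallelism / maxParallelism;
  }
}
```

With maxParallelism 128, doubling the parallelism from 2 to 4 splits each instance’s range in two, so the key groups of instance i move to instances 2i and 2i + 1. Whole key groups are read back from the checkpoint or savepoint, which is why no per-entity restore code is needed.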
The "functions" part refers to the fact that you define individual functions, each responsible for a specific type of entity and the operations it can perform. These functions are then composed into a larger Flink application. With the embedded Java SDK used above, you bind each FunctionType to a StatefulFunctionProvider inside a StatefulFunctionModule; with the remote Java SDK, you register multiple StatefulFunctionSpecs in a StatefulFunctions object. Either way, one application can handle many different types of entities and messages.
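The registration idea, mapping each function type to a factory and dispatching incoming messages by type, can be sketched with a small registry. This mirrors the role a StatefulFunctionProvider plays, but every name in the block (MiniFunction, FunctionRegistry, bind, dispatch) is hypothetical and the real SDK signatures differ.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for a stateful function; NOT the StateFun StatefulFunction interface.
interface MiniFunction {
  String invoke(String id, String message);
}

// Hypothetical registry: function type -> provider (factory), plus lazily created instances.
final class FunctionRegistry {
  private final Map<String, Supplier<MiniFunction>> providers = new HashMap<>();
  private final Map<String, MiniFunction> instances = new HashMap<>();

  // Bind a function type such as "my_company/session_tracker" to the factory that creates it.
  FunctionRegistry bind(String functionType, Supplier<MiniFunction> provider) {
    if (providers.putIfAbsent(functionType, provider) != null) {
      throw new IllegalArgumentException("Duplicate function type: " + functionType);
    }
    return this;
  }

  // Dispatch by function type. In this model one instance per type is created on first use
  // and reused for every entity ID; per-entity data would live in scoped state, not in fields.
  String dispatch(String functionType, String id, String message) {
    Supplier<MiniFunction> provider = providers.get(functionType);
    if (provider == null) {
      throw new IllegalArgumentException("Unknown function type: " + functionType);
    }
    return instances.computeIfAbsent(functionType, t -> provider.get()).invoke(id, message);
  }
}
```

Keeping construction behind a provider lets the runtime decide when and how often instances are created, which is why per-entity data belongs in managed state rather than in ordinary fields.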
The system is designed around a few core concepts:
- Function Type: A unique identifier for a specific kind of stateful function, made of a namespace and a name (e.g., my_company/session_tracker).
- State: The data associated with an individual entity. The embedded Java SDK provides PersistedValue, PersistedTable, and PersistedAppendingBuffer; the remote SDKs expose named values declared with a ValueSpec.
- Context: Identifies the current address and the caller, lets a function send messages (always asynchronously) to other functions or to egresses, and lets it reply to the caller. In the remote SDKs it also exposes the function’s state storage; the embedded SDK injects state through @Persisted fields instead.
- Messages: Serializable objects that carry data between functions. Protobuf is a common choice, and some SDK versions wrap payloads in com.google.protobuf.Any for flexible message typing.
- Stateful Function Provider: A factory that tells Flink how to create instances of your StatefulFunction.
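The real magic happens when you consider how Flink manages state consistency and fault tolerance. When a function updates its state (for example, lastSeen.set(currentTime)), the write goes to the local state backend (on the heap, or in RocksDB on local disk); it is not immediately made durable. Durability comes from Flink’s checkpointing mechanism: Flink periodically injects checkpoint barriers into the data streams, and when an operator has received them it snapshots its state to durable checkpoint storage (such as HDFS or S3), asynchronously where the backend supports it, without stopping the whole job. On failure, Flink restores the latest completed checkpoint and rewinds the sources to the matching positions, so each state update takes effect exactly once. This means your state is consistent and recoverable even in the face of machine failures or planned restarts.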
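To make the recovery guarantee concrete, here is a conceptual model of checkpoint and restore for per-entity state. It deliberately ignores barriers, asynchronous uploads, and source offsets, and CheckpointedState with its methods is a hypothetical class, not Flink API. It only shows the observable contract: after a failure, state reflects the last completed checkpoint.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Conceptual model of checkpoint/restore semantics for per-entity state.
// It does NOT implement Flink's barrier-based algorithm; it only shows what recovery guarantees.
final class CheckpointedState {
  private final Map<String, Long> live = new HashMap<>();
  private Map<String, Long> lastCheckpoint = Collections.emptyMap();

  void set(String entityId, long value) { live.put(entityId, value); }

  Long get(String entityId) { return live.get(entityId); }

  // Take an immutable, consistent copy; processing continues on `live` afterwards.
  void checkpoint() { lastCheckpoint = Map.copyOf(live); }

  // After a failure, discard live state and resume from the last completed checkpoint.
  void recover() {
    live.clear();
    live.putAll(lastCheckpoint);
  }
}
```

In a real job the updates discarded here are not lost: recovery also rewinds the sources (for example, Kafka offsets) to the checkpoint, so those messages are processed again and their effects are reapplied exactly once.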
One thing most people don’t realize is that Flink Stateful Functions are fundamentally a way to build distributed actors with first-class state management, but within the Flink ecosystem. The entity ID you provide when sending a message to a function (context.send(targetFunctionType, entityId, message)) combines with the function type to form an address, which plays the role of the actor address. Flink routes each message to the parallel instance that owns that address’s state, and messages for a given address are processed one at a time, in the order they arrive at that instance, never concurrently. This per-entity serialization is crucial for many stateful applications, because a function can read, modify, and write its state without locks.
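The actor-style routing can be modeled with one FIFO mailbox per parallel instance. This is a single-threaded simplification: Router and Envelope are hypothetical names, and hashing the address string modulo the instance count stands in for Flink’s keyed partitioning. It shows the two properties that matter: one address always maps to one instance, and that instance handles its messages one after another.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Simplified model of address-based routing; Router and Envelope are hypothetical, not StateFun API.
final class Router {
  // A message addressed to one entity; function type + entity ID form the address.
  static final class Envelope {
    final String address;
    final String payload;

    Envelope(String address, String payload) {
      this.address = address;
      this.payload = payload;
    }
  }

  private final List<Queue<Envelope>> mailboxes = new ArrayList<>();

  Router(int instances) {
    for (int i = 0; i < instances; i++) {
      mailboxes.add(new ArrayDeque<>());
    }
  }

  // Every message for the same address lands on the same instance.
  int instanceFor(String functionType, String id) {
    return Math.floorMod((functionType + "/" + id).hashCode(), mailboxes.size());
  }

  // Sending only enqueues: the sender never blocks waiting for the target to run.
  void send(String functionType, String id, String payload) {
    mailboxes.get(instanceFor(functionType, id))
        .add(new Envelope(functionType + "/" + id, payload));
  }

  // An instance drains its mailbox one message at a time (FIFO), so invocations for a
  // given address never overlap and see messages in arrival order.
  List<String> drain(int instance) {
    List<String> processed = new ArrayList<>();
    Queue<Envelope> mailbox = mailboxes.get(instance);
    Envelope next;
    while ((next = mailbox.poll()) != null) {
      processed.add(next.address + " <- " + next.payload);
    }
    return processed;
  }
}
```

Because the mailbox is the only path to an entity, the function body for user_123 never races with another invocation for user_123, which is what makes lock-free read-modify-write on its state safe.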
The next step after mastering basic stateful functions is often integrating them with Flink’s powerful stream processing capabilities, like using Kafka as an ingress/egress for your stateful functions.