Flink’s async I/O is surprisingly good at letting you punch holes in your stream processing to hit external systems without grinding everything to a halt.
Let’s watch Flink fetch user profiles from a PostgreSQL database for incoming clickstream events.
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.AsyncDataStream;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
public class AsyncDatabaseEnrichment {
    // Represents an incoming clickstream event
    public static class ClickEvent {
        public long userId;
        public String page;
        public ClickEvent(long userId, String page) {
            this.userId = userId;
            this.page = page;
        }
        @Override
        public String toString() {
            return "ClickEvent{" +
                    "userId=" + userId +
                    ", page='" + page + '\'' +
                    '}';
        }
    }
    // Represents enriched event with user profile
    public static class EnrichedClickEvent {
        public long userId;
        public String page;
        public String userName;
        public String email;
        public EnrichedClickEvent(long userId, String page, String userName, String email) {
            this.userId = userId;
            this.page = page;
            this.userName = userName;
            this.email = email;
        }
        @Override
        public String toString() {
            return "EnrichedClickEvent{" +
                    "userId=" + userId +
                    ", page='" + page + '\'' +
                    ", userName='" + userName + '\'' +
                    ", email='" + email + '\'' +
                    '}';
        }
    }
    // One row of the users table, as returned by the profile lookup
    public static class UserProfile {
        public long id;
        public String name;
        public String email;
        public UserProfile(long id, String name, String email) {
            this.id = id;
            this.name = name;
            this.email = email;
        }
    }
    // Sample profiles for local experiments. The fetcher below queries PostgreSQL over JDBC
    // and does not read this map.
    private static final java.util.Map<Long, UserProfile> mockDatabase = new java.util.HashMap<>();
    static {
        mockDatabase.put(101L, new UserProfile(101L, "Alice Smith", "alice.smith@example.com"));
        mockDatabase.put(102L, new UserProfile(102L, "Bob Johnson", "bob.j@example.com"));
        mockDatabase.put(103L, new UserProfile(103L, "Charlie Brown", "charlie.b@example.com"));
    }
    // Async function to fetch user profile from the database
    public static class UserProfileFetcher implements AsyncFunction<ClickEvent, EnrichedClickEvent> {
        // In a real application, this would be a connection pool
        private Connection getConnection() throws Exception {
            // Using DriverManager for simplicity, use HikariCP or similar in production
            Class.forName("org.postgresql.Driver"); // Load the JDBC driver
            return DriverManager.getConnection(
                    "jdbc:postgresql://localhost:5432/mydatabase", // Replace with your DB URL
                    "myuser", // Replace with your DB username
                    "mypassword"); // Replace with your DB password
        }
        @Override
        public void asyncInvoke(ClickEvent event,
                org.apache.flink.streaming.api.functions.async.ResultFuture<EnrichedClickEvent> resultFuture) throws Exception {
            // JDBC is blocking, so the query runs on another thread. supplyAsync() without an
            // executor argument uses the common ForkJoinPool; prefer a dedicated executor in production.
            CompletableFuture.supplyAsync(() -> {
                UserProfile profile = null;
                try (Connection conn = getConnection();
                     PreparedStatement pstmt = conn.prepareStatement("SELECT id, name, email FROM users WHERE id = ?")) {
                    pstmt.setLong(1, event.userId);
                    try (ResultSet rs = pstmt.executeQuery()) {
                        if (rs.next()) {
                            profile = new UserProfile(rs.getLong("id"), rs.getString("name"), rs.getString("email"));
                        }
                    }
                } catch (Exception e) {
                    // Log the error. Nothing is retried automatically at this point.
                    System.err.println("Error fetching profile for user " + event.userId + ": " + e.getMessage());
                    // In a real scenario, you might want to return a default profile
                    // or emit an error event. For simplicity, the event is dropped.
                    return null;
                }
                if (profile != null) {
                    return new EnrichedClickEvent(event.userId, event.page, profile.name, profile.email);
                } else {
                    // If profile not found, emit an enriched event with default values.
                    // Or, you might choose to drop the event or emit an error.
                    return new EnrichedClickEvent(event.userId, event.page, "Unknown", "unknown@example.com");
                }
            }).thenAccept(enrichedEvent -> {
                // Always complete the ResultFuture; otherwise the element stays in flight until the timeout.
                if (enrichedEvent != null) {
                    resultFuture.complete(Collections.singleton(enrichedEvent));
                } else {
                    // Completing with an empty collection drops the event.
                    resultFuture.complete(Collections.emptyList());
                }
            });
        }
    }
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1. Source: Simulate incoming click events
        DataStream<ClickEvent> clickStream = env.addSource(new SourceFunction<ClickEvent>() {
            private volatile boolean running = true;
            @Override
            public void run(SourceContext<ClickEvent> ctx) throws Exception {
                int counter = 0;
                while (running && counter < 10) {
                    // Simulate events with different user IDs
                    long userId = (counter % 3) + 101; // Cycle through user IDs 101, 102, 103
                    ctx.collect(new ClickEvent(userId, "/page-" + counter));
                    counter++;
                    Thread.sleep(100); // Simulate event arrival rate
                }
            }
            @Override
            public void cancel() {
                running = false;
            }
        });
        // 2. Async I/O: Enrich click events with user profile data
        DataStream<EnrichedClickEvent> enrichedStream = AsyncDataStream.orderedWait(
                clickStream,
                new UserProfileFetcher(),
                10000, // Timeout in milliseconds
                java.util.concurrent.TimeUnit.MILLISECONDS,
                100 // Capacity: maximum number of requests in flight
        );
        // 3. Sink: Print the enriched events
        enrichedStream.print();
        env.execute("Async Database Enrichment Example");
    }
}
This code sets up a Flink job that takes ClickEvents, looks up each userId in a PostgreSQL database through a custom AsyncFunction, and emits EnrichedClickEvents containing user profile details. The connection settings are placeholders, and the in-memory mockDatabase map is sample data that the fetcher does not query. AsyncDataStream.orderedWait is the key call: it lets Flink keep many database requests in flight at once instead of blocking the main stream processing thread on each response. As responses come back, Flink emits the results downstream in the same order as the corresponding inputs.
The core problem this solves is the performance bottleneck when your stream processing logic needs to consult external, stateful systems (like databases, REST APIs, or caches). A naive approach would be to perform blocking I/O within a regular MapFunction or ProcessFunction. This would mean that for every event, the Flink operator would wait for the external system to respond. If the external system is slow, or if many events require lookups, the entire operator, and potentially downstream operators, would be starved of data, leading to low throughput and high latency.
Async I/O allows Flink to decouple the processing of an event from the time it takes to get a response from an external system. When an AsyncFunction is invoked for an event, it initiates an asynchronous operation (like a non-blocking database query or an HTTP request). Instead of waiting, it returns immediately, allowing Flink to process the next event in the stream. The result is fed back into the stream later, when a callback completes the ResultFuture that Flink passed to the function. Throughput improves substantially because the operator thread no longer sits idle waiting for I/O. This only holds if the call really is asynchronous: a blocking client such as JDBC has to run on a separate thread pool, which is what the example does with CompletableFuture.supplyAsync.
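To make the difference concrete, here is a minimal sketch in plain Java, without Flink, that runs the same lookups first one after another and then concurrently. The slowLookup method and its 50 ms delay are hypothetical stand-ins for an external call; the timings it prints will vary by machine.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class BlockingVsAsyncLookup {
    // Hypothetical stand-in for an external call that takes about 50 ms.
    static String slowLookup(long userId) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "user-" + userId;
    }

    // Blocking style: each lookup waits for the previous one to finish.
    static List<String> lookupSequentially(List<Long> ids) {
        List<String> out = new ArrayList<>();
        for (long id : ids) {
            out.add(slowLookup(id));
        }
        return out;
    }

    // Async style: start every lookup first, then gather the results in input order.
    static List<String> lookupConcurrently(List<Long> ids, ExecutorService pool) {
        List<CompletableFuture<String>> pending = new ArrayList<>();
        for (long id : ids) {
            pending.add(CompletableFuture.supplyAsync(() -> slowLookup(id), pool));
        }
        List<String> out = new ArrayList<>();
        for (CompletableFuture<String> future : pending) {
            out.add(future.join());
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> ids = List.of(101L, 102L, 103L, 101L, 102L, 103L, 101L, 102L);
        ExecutorService pool = Executors.newFixedThreadPool(8);
        try {
            long t0 = System.nanoTime();
            List<String> sequential = lookupSequentially(ids);
            long t1 = System.nanoTime();
            List<String> concurrent = lookupConcurrently(ids, pool);
            long t2 = System.nanoTime();
            System.out.println("sequential ms: " + (t1 - t0) / 1_000_000);
            System.out.println("concurrent ms: " + (t2 - t1) / 1_000_000);
            System.out.println("same results: " + sequential.equals(concurrent));
        } finally {
            pool.shutdown();
        }
    }
}
```

With eight lookups and eight threads, the concurrent run should take roughly the time of one lookup rather than eight, which is the effect async I/O aims for inside a Flink operator.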
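The AsyncFunction interface centers on one method, asyncInvoke, which takes the input element and a ResultFuture (not a Collector). Inside asyncInvoke, you initiate your asynchronous I/O call. A common pattern is to represent the call as a CompletableFuture and attach a callback with thenAccept or whenComplete. When the call succeeds, the callback passes the results to ResultFuture.complete as a collection; when it fails, it calls ResultFuture.completeExceptionally. Every invocation must eventually complete its ResultFuture, if only with an empty collection; otherwise the element stays in flight until the timeout fires.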
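The callback pattern can be sketched without a Flink dependency. In the block below, ResultSink is a hypothetical stand-in that mirrors the complete and completeExceptionally methods of Flink's ResultFuture, and fetchNameAsync is an invented client call; neither is part of Flink.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

class ResultFuturePattern {
    // Stand-in for Flink's ResultFuture so the sketch runs without Flink on the classpath.
    interface ResultSink<T> {
        void complete(Collection<T> results);
        void completeExceptionally(Throwable error);
    }

    // Hypothetical non-blocking client call; a real one would come from an async driver.
    static CompletableFuture<String> fetchNameAsync(long userId) {
        if (userId < 0) {
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalArgumentException("bad id " + userId));
            return failed;
        }
        return CompletableFuture.supplyAsync(() -> "user-" + userId);
    }

    // Shape of asyncInvoke: start the call, return at once, complete the sink in a callback.
    static void asyncInvoke(long userId, ResultSink<String> sink) {
        fetchNameAsync(userId).whenComplete((name, error) -> {
            if (error != null) {
                sink.completeExceptionally(error);
            } else {
                sink.complete(Collections.singleton(name));
            }
        });
    }
}
```

Using whenComplete rather than thenAccept means the failure path is handled in the same callback, so the sink is completed on both outcomes.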
AsyncDataStream.orderedWait and AsyncDataStream.unorderedWait are the entry points for applying async I/O. orderedWait guarantees that output elements are emitted in the same order as the input elements, which matters when downstream logic depends on arrival order. It achieves this by buffering completed results until every earlier element has been emitted. unorderedWait emits results as soon as they become available, which usually means lower latency and less overhead when order isn’t critical. Both methods take the input stream, the AsyncFunction, a timeout for each asynchronous operation, the time unit for the timeout, and a capacity. The capacity limits how many asynchronous operations can be in flight concurrently; once it is reached, the operator applies backpressure until a request completes.
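The buffering behind ordered emission can be illustrated with a small model. This is a simplified sketch of the idea, not Flink's internal implementation; the OrderedEmitter class and its method names are invented for the illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class OrderedEmitter {
    private final Map<Integer, String> finished = new HashMap<>(); // results waiting for their turn
    private final List<String> emitted = new ArrayList<>();
    private int nextToEmit = 0; // input position that must be emitted next

    // Called when the lookup for input position `index` completes.
    void onComplete(int index, String result) {
        finished.put(index, result);
        // Drain every result whose predecessors have all been emitted.
        while (finished.containsKey(nextToEmit)) {
            emitted.add(finished.remove(nextToEmit));
            nextToEmit++;
        }
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        OrderedEmitter ordered = new OrderedEmitter();
        List<String> unordered = new ArrayList<>();
        // Completion order 2, 0, 1: the third request happens to finish first.
        int[] completionOrder = {2, 0, 1};
        for (int index : completionOrder) {
            String result = "result-" + index;
            unordered.add(result);             // unordered mode: emit on completion
            ordered.onComplete(index, result); // ordered mode: hold until predecessors are out
        }
        System.out.println("unordered: " + unordered);         // [result-2, result-0, result-1]
        System.out.println("ordered:   " + ordered.emitted()); // [result-0, result-1, result-2]
    }
}
```

The model also shows the cost of ordering: result-2 is ready first but has to wait for the two slower requests ahead of it.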
The UserProfileFetcher in the example shows a typical implementation. It opens a database connection for every event, which keeps the example short; a real application would use a connection pool created once, outside asyncInvoke, to avoid repeated connection overhead. It then prepares and executes a SQL query and uses the resulting UserProfile to construct an EnrichedClickEvent, which is handed back to Flink through the ResultFuture. CompletableFuture.supplyAsync runs the blocking query on another thread, and thenAccept ensures the result reaches Flink only after the query completes.
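One way to keep expensive resources out of the per-event path is a small client object that is created once and closed once. The sketch below uses plain Java; ProfileLookupClient is a hypothetical class, and the Map stands in for a pooled DataSource. In a Flink job, an object like this would typically be created in the open method of a RichAsyncFunction and released in its close method.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ProfileLookupClient implements AutoCloseable {
    // Dedicated threads for blocking lookups, so the caller's thread is never parked on I/O.
    private final ExecutorService executor;
    // Hypothetical stand-in for a pooled DataSource (for example a HikariCP pool).
    private final Map<Long, String> backingStore;

    ProfileLookupClient(Map<Long, String> backingStore, int threads) {
        this.backingStore = backingStore;
        this.executor = Executors.newFixedThreadPool(threads);
    }

    // Returns immediately; the lookup itself runs on the dedicated executor.
    CompletableFuture<String> lookupAsync(long userId) {
        return CompletableFuture.supplyAsync(
                () -> backingStore.getOrDefault(userId, "Unknown"), executor);
    }

    // Release the threads once, when the owning function shuts down.
    @Override
    public void close() {
        executor.shutdown();
    }

    public static void main(String[] args) {
        try (ProfileLookupClient client = new ProfileLookupClient(Map.of(101L, "Alice Smith"), 4)) {
            System.out.println(client.lookupAsync(101L).join()); // Alice Smith
            System.out.println(client.lookupAsync(999L).join()); // Unknown
        }
    }
}
```

Sizing the thread pool close to the async operator's capacity is a reasonable starting point, since each in-flight blocking request occupies one thread.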
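A subtle but crucial aspect of async I/O in Flink is how it handles errors and timeouts. The timeout parameter in orderedWait or unorderedWait specifies how long Flink will wait for an individual asynchronous operation to complete. By default, a timed-out operation raises an exception, which fails the job and triggers a restart. To handle it differently, override the timeout method of AsyncFunction and complete the ResultFuture there, for example with a fallback value or with an empty collection that drops the event. Retries are not automatic either; recent Flink releases offer orderedWaitWithRetry and unorderedWaitWithRetry, which take a retry strategy. In the example, a missing profile produces a default "Unknown" profile, while a failed query is logged and produces no enriched event. Falling back to a default keeps the stream moving, but it’s essential to have a robust error handling strategy that fits your application’s requirements, perhaps by sending problematic events to a dead-letter queue.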
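The fallback idea can be tried out with java.util.concurrent alone. The sketch below is not Flink's timeout hook; it only shows the same policy (use a default when the lookup is late or fails) on a CompletableFuture. The withFallback helper is a hypothetical name, and completeOnTimeout requires Java 9 or later.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class TimeoutFallback {
    // Yields `fallback` if the lookup fails or produces nothing within `timeoutMs`.
    static CompletableFuture<String> withFallback(
            CompletableFuture<String> lookup, long timeoutMs, String fallback) {
        return lookup
                .completeOnTimeout(fallback, timeoutMs, TimeUnit.MILLISECONDS) // Java 9+
                .exceptionally(error -> fallback);
    }

    public static void main(String[] args) {
        // A lookup that never answers: the fallback is used after 100 ms.
        CompletableFuture<String> neverAnswers = new CompletableFuture<>();
        System.out.println(withFallback(neverAnswers, 100, "Unknown").join()); // Unknown

        // A lookup that fails: the fallback is used immediately.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("connection refused"));
        System.out.println(withFallback(failed, 100, "Unknown").join()); // Unknown

        // A lookup that answers in time keeps its own value.
        System.out.println(withFallback(CompletableFuture.completedFuture("Alice Smith"), 100, "Unknown").join()); // Alice Smith
    }
}
```

If the per-call timeout here is shorter than the one passed to orderedWait, the fallback is applied before Flink's own timeout handling would trigger.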
The capacity parameter (100 in the example) is critical for tuning. It determines the maximum number of concurrent asynchronous requests Flink will issue. If this capacity is too low, you might not be fully utilizing the external system’s capacity or achieving maximum throughput. If it’s too high, you risk overwhelming the external system or exhausting local resources (like network connections or memory). Careful monitoring and experimentation are needed to find the optimal capacity.
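A rough starting point for the capacity can be derived from Little's law: the number of requests in flight equals the arrival rate multiplied by the time each request takes. The helper below is a back-of-the-envelope sketch; the rates and latencies are made-up inputs, and the rate is assumed to be measured per parallel instance of the async operator.

```java
class CapacityEstimate {
    // Little's law: requests in flight = arrival rate (per second) x latency (in seconds).
    static int requiredCapacity(double eventsPerSecond, double latencyMillis) {
        return (int) Math.ceil(eventsPerSecond * latencyMillis / 1000.0);
    }

    public static void main(String[] args) {
        System.out.println(requiredCapacity(2000, 25));  // 50
        System.out.println(requiredCapacity(2000, 200)); // 400
    }
}
```

At 2,000 events per second, the example's capacity of 100 would cover lookups of up to about 50 ms; slower lookups would fill the buffer and cause backpressure. Treat the estimate as a floor to start experiments from, and leave headroom for latency spikes.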
The next challenge you’ll likely face is managing stateful asynchronous operations, like updating counters in an external database after processing an event, and ensuring exactly-once semantics for these updates.