Flink’s side outputs let a single operator in a stream processing job emit records to several distinct, separately typed streams, which makes them far more flexible than simply splitting a stream.
Let’s see it in action. Imagine we have a stream of user events, and we want to process them differently based on their type. Some are LoginEvents, some are ClickEvents, and others are PurchaseEvents. We want to send LoginEvents to one side output, ClickEvents to another, and leave all other events on the operator’s main output.
Here’s a simplified Flink job that does this:
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Assume these are your event types
class Event {}

class LoginEvent extends Event {
    public String userId;
    public LoginEvent() {}
    public LoginEvent(String userId) { this.userId = userId; }
}

class ClickEvent extends Event {
    public String userId;
    public String page;
    public ClickEvent() {}
    public ClickEvent(String userId, String page) { this.userId = userId; this.page = page; }
}

class PurchaseEvent extends Event {
    public String userId;
    public double amount;
    public PurchaseEvent() {}
    public PurchaseEvent(String userId, double amount) { this.userId = userId; this.amount = amount; }
}

public class SideOutputExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Simulate an input stream of events. Event.class is passed explicitly so the
        // stream is typed as Event rather than as the class of the first element.
        DataStream<Event> inputStream = env.fromElements(
            Event.class,
            new LoginEvent("user1"),
            new ClickEvent("user2", "/home"),
            new PurchaseEvent("user1", 10.5),
            new LoginEvent("user3"),
            new Event() // Some other generic event
        );

        // Define output tags for different event types.
        // The trailing {} creates an anonymous subclass so Flink can capture the generic type.
        OutputTag<LoginEvent> loginOutputTag = new OutputTag<LoginEvent>("login-events") {};
        OutputTag<ClickEvent> clickOutputTag = new OutputTag<ClickEvent>("click-events") {};

        // Process the stream and emit to side outputs
        SingleOutputStreamOperator<Event> mainStream = inputStream.process(new ProcessFunction<Event, Event>() {
            @Override
            public void processElement(Event event, Context context, Collector<Event> collector) throws Exception {
                if (event instanceof LoginEvent) {
                    context.output(loginOutputTag, (LoginEvent) event);
                } else if (event instanceof ClickEvent) {
                    context.output(clickOutputTag, (ClickEvent) event);
                } else {
                    // Everything else stays on the main output.
                    // (A third OutputTag would work equally well here.)
                    collector.collect(event);
                }
            }
        });

        // Get the side output streams
        DataStream<LoginEvent> loginStream = mainStream.getSideOutput(loginOutputTag);
        DataStream<ClickEvent> clickStream = mainStream.getSideOutput(clickOutputTag);

        // Now you can process these streams independently
        loginStream.print("Logins");
        clickStream.print("Clicks");

        // Events that were not sent to a side output remain in mainStream
        // mainStream.print("Others"); // This would print the PurchaseEvent and the generic Event

        env.execute("Side Output Example");
    }
}
In this example, the process function inspects each incoming Event. If it’s a LoginEvent, it’s sent to the loginOutputTag. If it’s a ClickEvent, it goes to clickOutputTag. Any other event is collected by the main collector, effectively passing through to the main stream. We then retrieve these distinct streams using getSideOutput() and can apply separate operations to them.
The real power here is that all this happens within a single operator. Instead of having three separate Flink jobs, each reading from the same source and filtering, we have one job that reads once and routes each record as it passes through. This avoids reading the input three times and leaves a single job to deploy and monitor.
The mental model to build is one of a single data processing node that has multiple "spouts" or "outlets" it can push data to, based on conditional logic within that node. Each OutputTag is essentially a named channel. The ProcessFunction acts as the router, deciding which channel each incoming record should go down. The getSideOutput() calls are how you subscribe to those named channels downstream.
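To make that mental model concrete, here is a toy sketch in plain Java. It is not Flink's API and says nothing about how Flink implements side outputs: the Tag and Router classes and the route method are hypothetical stand-ins for OutputTag, the operator, and the ProcessFunction logic, and they exist only to show named, typed channels sitting next to one main outlet.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of the "named channels" idea. This is NOT Flink's API. */
final class ChannelRouterSketch {

    /** Hypothetical stand-in for OutputTag: a channel name plus the type it carries. */
    static final class Tag<T> {
        final String id;
        final Class<T> type;

        Tag(String id, Class<T> type) {
            this.id = id;
            this.type = type;
        }
    }

    /** Hypothetical stand-in for the operator: one main outlet plus named side outlets. */
    static final class Router {
        private final List<Object> main = new ArrayList<>();
        private final Map<String, List<Object>> side = new HashMap<>();

        /** Analogous to collector.collect(record): push to the main outlet. */
        void collect(Object record) {
            main.add(record);
        }

        /** Analogous to context.output(tag, record): push to a named channel. */
        <T> void output(Tag<T> tag, T record) {
            side.computeIfAbsent(tag.id, k -> new ArrayList<>()).add(record);
        }

        List<Object> mainOutput() {
            return main;
        }

        /** Analogous to getSideOutput(tag): read back one named channel, typed by the tag. */
        <T> List<T> getSideOutput(Tag<T> tag) {
            List<T> result = new ArrayList<>();
            for (Object o : side.getOrDefault(tag.id, List.of())) {
                result.add(tag.type.cast(o));
            }
            return result;
        }
    }

    /** The routing decision: strings to one channel, integers to another, the rest to main. */
    static Router route(List<Object> input, Tag<String> strings, Tag<Integer> ints) {
        Router router = new Router();
        for (Object record : input) {
            if (record instanceof String) {
                router.output(strings, (String) record);
            } else if (record instanceof Integer) {
                router.output(ints, (Integer) record);
            } else {
                router.collect(record);
            }
        }
        return router;
    }

    public static void main(String[] args) {
        Tag<String> strings = new Tag<>("strings", String.class);
        Tag<Integer> ints = new Tag<>("ints", Integer.class);
        Router router = route(List.of("a", 1, 2.5, "b"), strings, ints);
        System.out.println(router.getSideOutput(strings)); // [a, b]
        System.out.println(router.getSideOutput(ints));    // [1]
        System.out.println(router.mainOutput());           // [2.5]
    }
}
```

The one real difference worth keeping in mind is that this toy buffers records in lists, whereas a streaming operator forwards each record downstream as it arrives.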
A less obvious point is that you can apply several process functions in sequence, and each can emit to its own set of side outputs. For instance, the main output of a process function that emits to sideOutputA and sideOutputB could then be fed into another process function that, based on some condition, emits to sideOutputC and sideOutputD. This allows granular, hierarchical routing. Each process function is still its own operator, so whether records cross the network between stages depends on operator chaining: when Flink chains the operators into one task, records are handed over in-process, whereas a keyBy, a rebalance, or a change in parallelism between stages breaks the chain. Each side output must also be retrieved from the operator that emitted it.
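A minimal sketch of that two-stage routing follows, assuming a Flink version that provides the same DataStream API calls as the example above. The class name, the tag names (negative, large), and the thresholds are made up for illustration, and plain integers stand in for events to keep it short.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class TwoStageRoutingSketch {

    // Hypothetical tags: stage 1 owns NEGATIVE, stage 2 owns LARGE.
    static final OutputTag<Integer> NEGATIVE = new OutputTag<Integer>("negative") {};
    static final OutputTag<Integer> LARGE = new OutputTag<Integer>("large") {};

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> numbers = env.fromElements(-5, 3, 250, 42, -1, 1000);

        // Stage 1: negatives go to a side output, everything else continues on the main output.
        SingleOutputStreamOperator<Integer> nonNegative = numbers.process(
            new ProcessFunction<Integer, Integer>() {
                @Override
                public void processElement(Integer value, Context ctx, Collector<Integer> out) {
                    if (value < 0) {
                        ctx.output(NEGATIVE, value);
                    } else {
                        out.collect(value);
                    }
                }
            });

        // Stage 2: consumes stage 1's main output and splits it again.
        SingleOutputStreamOperator<Integer> small = nonNegative.process(
            new ProcessFunction<Integer, Integer>() {
                @Override
                public void processElement(Integer value, Context ctx, Collector<Integer> out) {
                    if (value >= 100) {
                        ctx.output(LARGE, value);
                    } else {
                        out.collect(value);
                    }
                }
            });

        // Each side output is read from the operator that emitted it.
        nonNegative.getSideOutput(NEGATIVE).print("negative");
        small.getSideOutput(LARGE).print("large");
        small.print("small");

        env.execute("Two-stage routing sketch");
    }
}
```

Note that NEGATIVE is fetched from nonNegative and LARGE from small. Asking small for NEGATIVE would not return the stage 1 records, because that tag was never emitted by the second operator.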
The next concept to explore is how to handle state when using side outputs, especially when keys are involved.