Flink’s internal metrics give you a look inside each operator, but they don’t tell you how long a single event takes to go from source to sink. Measuring end-to-end latency takes a bit more work: you inject special events and track their journey through the pipeline.
Let’s first look at a Flink streaming job, then break down how to measure its end-to-end latency.
Imagine a Flink job that reads from Kafka, applies a simple transformation (such as adding a timestamp), and writes to another Kafka topic.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
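// Note: setStreamTimeCharacteristic is deprecated since Flink 1.12, where event time is the default
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // Or TimeCharacteristic.EventTime if the pipeline uses event time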
// Source: Read from Kafka
Properties consumerConfig = new Properties();
consumerConfig.put("bootstrap.servers", "kafka-broker:9092");
consumerConfig.put("group.id", "flink-latency-tracker");
FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), consumerConfig);
// Transformation: Add a "processing" timestamp
DataStream<String> sourceStream = env.addSource(kafkaSource);
DataStream<String> transformedStream = sourceStream.map(
value -> {
// In a real scenario, you'd parse the value and add a timestamp
// For simplicity, we'll just return the value with a placeholder
return value + "|PROCESSED_AT=" + System.currentTimeMillis();
}
);
// Sink: Write to Kafka
Properties producerConfig = new Properties();
producerConfig.put("bootstrap.servers", "kafka-broker:9092");
FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), producerConfig);
transformedStream.addSink(kafkaSink);
env.execute("End-to-End Latency Measurement Example");
Now, how do we measure the time it takes for a message to traverse this pipeline? We need to introduce a mechanism to track events.
The core idea is to inject a special "heartbeat" or "marker" event into your stream at the source. This marker event should carry a timestamp indicating when it was generated. As this marker event flows through your Flink pipeline, each operator that processes it will add its own timestamp or update the event’s metadata. Finally, when the marker event reaches the sink, you can compare its original generation timestamp with the timestamp recorded at the sink to calculate the end-to-end latency for that specific event.
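Stripped of all Flink machinery, the calculation described above is a single subtraction. The following plain-Java sketch illustrates it; the class and method names (`MarkerLatencySketch`, `Marker`, `endToEndLatencyMillis`) are made up for illustration and are not part of Flink.

```java
// Plain-Java illustration of the marker idea; no Flink dependency.
// All names here are hypothetical.
final class MarkerLatencySketch {

    /** A marker that remembers when the source created it. */
    static final class Marker {
        final String id;
        final long createdAtMillis;

        Marker(String id, long createdAtMillis) {
            this.id = id;
            this.createdAtMillis = createdAtMillis;
        }
    }

    /** End-to-end latency: arrival time at the sink minus creation time at the source. */
    static long endToEndLatencyMillis(Marker marker, long sinkArrivalMillis) {
        return sinkArrivalMillis - marker.createdAtMillis;
    }
}
```

Note that when source and sink run on different machines, both timestamps come from different wall clocks, so the result is only as accurate as the clock synchronization (for example via NTP) between those hosts.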
There are several ways to implement this. A common approach combines custom marker events with Flink’s built-in metrics system, which is used to report the measured latencies.
1. Injecting Latency Markers:
You can modify your source operator to periodically emit a special "latency marker" object. This object would contain:
- timestamp: The time it was generated by the source.
- markerId: A unique identifier for this marker.
// Inside your Flink job, after defining the source:
// A class to hold our latency marker
public static class LatencyMarker implements Serializable {
public long timestamp;
public String markerId;
public LatencyMarker(long timestamp, String markerId) {
this.timestamp = timestamp;
this.markerId = markerId;
}
}
// Modify your source or add a custom source that emits markers
DataStream<String> kafkaDataStream = env.addSource(kafkaSource);
// Create a stream of latency markers. SourceLatencyMarkerGenerator is a hypothetical
// SourceFunction<LatencyMarker> (not part of Flink) that emits one marker per interval.
DataStream<LatencyMarker> latencyMarkerStream =
env.addSource(new SourceLatencyMarkerGenerator(1000)); // Emit every 1000 ms
// union() requires both streams to have the same element type,
// so the markers are encoded as strings before the union
DataStream<String> combinedStream = kafkaDataStream.union(
latencyMarkerStream.map(marker -> "MARKER:" + marker.markerId + ":" + marker.timestamp)
);
// A production job would carry data and markers in a shared POJO or a union type.
// This simplified version passes markers through unchanged as strings.
DataStream<String> processedStream = combinedStream.map(value -> {
if (value.startsWith("MARKER:")) {
return value; // Pass marker through
} else {
// Process actual data
return value + "|PROCESSED_AT=" + System.currentTimeMillis();
}
});
// The sink needs to distinguish between data and markers
// ... (sink logic to process both)
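The sink-side logic that tells markers apart from data is left open above. One possible shape for it, assuming the `MARKER:<id>:<timestamp>` string encoding used in this example, is a small helper like the following. `MarkerCodec` is a hypothetical name, and the sketch is plain Java with no Flink dependency.

```java
import java.util.Optional;

// Hypothetical helper for the "MARKER:<id>:<timestamp>" string encoding.
final class MarkerCodec {
    static final String PREFIX = "MARKER:";

    /** Builds the string form of a marker. */
    static String encode(String markerId, long timestampMillis) {
        return PREFIX + markerId + ":" + timestampMillis;
    }

    /** Returns the emission timestamp if the value is a well-formed marker, otherwise empty. */
    static Optional<Long> parseTimestamp(String value) {
        if (value == null || !value.startsWith(PREFIX)) {
            return Optional.empty(); // Regular data record
        }
        int lastColon = value.lastIndexOf(':');
        if (lastColon < PREFIX.length()) {
            return Optional.empty(); // No separator between id and timestamp
        }
        try {
            return Optional.of(Long.parseLong(value.substring(lastColon + 1)));
        } catch (NumberFormatException e) {
            return Optional.empty(); // Timestamp part is not a number
        }
    }
}
```

A sink could call `parseTimestamp` on every record and subtract the result from `System.currentTimeMillis()` when a value is present. One caveat of any string prefix scheme: a real data record that happens to start with `MARKER:` would be misclassified, which is one reason a typed wrapper is the safer choice.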
A more robust way is to create a custom SourceFunction or Source that emits both your actual data and the latency markers.
// Custom Source emitting latency markers
public static class LatencyTrackingSource extends RichSourceFunction<Object> {
private final SourceFunction<String> delegateSource;
private final long markerIntervalMillis;
private volatile boolean running = true;
private transient Map<String, Long> markerTimestamps; // To store marker creation times
public LatencyTrackingSource(SourceFunction<String> delegateSource, long markerIntervalMillis) {
this.delegateSource = delegateSource;
this.markerIntervalMillis = markerIntervalMillis;
}
@Override
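public void open(Configuration parameters) throws Exception {
super.open(parameters);
// Plain SourceFunction has no open(); only rich delegates expose lifecycle hooks
if (delegateSource instanceof RichFunction) {
RichFunction richDelegate = (RichFunction) delegateSource;
richDelegate.setRuntimeContext(getRuntimeContext());
richDelegate.open(parameters);
}
markerTimestamps = new HashMap<>();
}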
@Override
public void run(SourceContext<Object> ctx) throws Exception {
long lastMarkerTime = System.currentTimeMillis();
while (running) {
// Emit a latency marker periodically
if (System.currentTimeMillis() - lastMarkerTime >= markerIntervalMillis) {
String markerId = UUID.randomUUID().toString();
long emissionTime = System.currentTimeMillis();
markerTimestamps.put(markerId, emissionTime);
ctx.collect(new LatencyMarker(emissionTime, markerId));
lastMarkerTime = emissionTime;
}
// Emit actual data from the delegate source
// This part needs careful synchronization or a more advanced pattern
// to avoid blocking the marker emission. A common approach is to
// use a shared buffer or a separate thread for the delegate source.
// For simplicity, let's assume we can get one item at a time.
// In reality, you'd use a BlockingQueue or similar.
// delegateSource.run(new SourceContext<String>() { ... }); // This would need to be adapted
// For now, let's just simulate getting a value
String data = fetchDataFromDelegate(); // Replace with actual delegate source read
if (data != null) {
ctx.collect(data);
}
Thread.sleep(10); // Small sleep to prevent busy-waiting
}
}
// Placeholder for fetching data from the actual delegate source
private String fetchDataFromDelegate() throws Exception {
// This needs to be implemented to actually read from the FlinkKafkaConsumer
// or another delegate source. This is a complex part of custom source chaining.
// A common pattern is to have the delegate source run in a separate thread
// and push data into a queue that this source reads from.
return null; // Placeholder
}
@Override
public void cancel() {
running = false;
delegateSource.cancel();
}
}
// In your main job:
// FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), consumerConfig);
// LatencyTrackingSource trackingSource = new LatencyTrackingSource(kafkaSource, 1000); // Emit marker every second
// DataStream<Object> sourceStream = env.addSource(trackingSource);
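The queue-based pattern mentioned in the comments can be sketched without Flink. In the following plain-Java illustration, a delegate thread is assumed to push records into a `BlockingQueue`, while the emitting loop polls with a timeout so that marker emission is never blocked for long. `MarkerInterleaver`, its `step` method, and the injected clock are hypothetical names chosen for this sketch; in a real source, the `Consumer` would be replaced by calls to `ctx.collect`.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

// Hypothetical helper: interleaves periodic markers with records read from a queue.
final class MarkerInterleaver {
    private final BlockingQueue<String> records;
    private final long markerIntervalMillis;
    private final LongSupplier clock; // Injected so the timing logic can be tested
    private long lastMarkerTime;

    MarkerInterleaver(BlockingQueue<String> records, long markerIntervalMillis, LongSupplier clock) {
        this.records = records;
        this.markerIntervalMillis = markerIntervalMillis;
        this.clock = clock;
        this.lastMarkerTime = clock.getAsLong();
    }

    /** One loop iteration: emit a marker if one is due, then forward at most one record. */
    void step(Consumer<String> out, long pollTimeoutMillis) {
        long now = clock.getAsLong();
        if (now - lastMarkerTime >= markerIntervalMillis) {
            out.accept("MARKER:" + now);
            lastMarkerTime = now;
        }
        try {
            // Bounded wait: an empty queue delays the next marker check by at most the timeout
            String record = records.poll(pollTimeoutMillis, TimeUnit.MILLISECONDS);
            if (record != null) {
                out.accept(record);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Preserve the interrupt for the caller
        }
    }
}
```

The bounded `poll` replaces the `Thread.sleep(10)` in the loop above: the thread still avoids busy-waiting, but it wakes up as soon as a record arrives.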
2. Processing Latency Markers:
As the LatencyMarker object flows through your pipeline, you can add more information. A common technique is to have each operator that processes the marker update a timestamp or add a hop.
// Example of a map operator that might update a marker
DataStream<Object> processedStream = sourceStream.map(value -> {
if (value instanceof LatencyMarker) {
LatencyMarker marker = (LatencyMarker) value;
// Add processing time at this hop
marker.processingTimes.add(System.currentTimeMillis()); // Assuming you add a List<Long> to LatencyMarker
return marker;
} else {
// Process actual data
return value;
}
});
// You would do this for every operator in your pipeline.
// This quickly becomes cumbersome as you need to modify every operator.
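Modifying every operator is only necessary if you want a per-hop breakdown. For end-to-end latency alone, it is enough to stamp the marker at the source, compute the difference at the sink, and report the result through Flink’s metrics system.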
3. Using Flink’s Metrics System for Latency Tracking:
You can register custom metrics within your operators to record the timestamps of the latency markers as they pass through.
Modify your LatencyMarker class:
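public static class LatencyMarker implements Serializable {
public long timestamp; // Original emission time
public String markerId;
// Add a map to store timestamps at each operator hop
public Map<String, Long> hopTimestamps = new HashMap<>(); // operatorName -> timestamp
// Constructor used by the source when it emits a marker
public LatencyMarker(long timestamp, String markerId) {
this.timestamp = timestamp;
this.markerId = markerId;
}
}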
In your source:
// In your custom source or main job's source setup
// When emitting a marker:
String markerId = UUID.randomUUID().toString();
long emissionTime = System.currentTimeMillis();
LatencyMarker marker = new LatencyMarker(emissionTime, markerId);
marker.hopTimestamps.put("source_operator", emissionTime);
ctx.collect(marker);
In your map operator:
DataStream<Object> transformedStream = sourceStream.map(value -> {
if (value instanceof LatencyMarker) {
LatencyMarker marker = (LatencyMarker) value;
// Record timestamp when entering this map operator
marker.hopTimestamps.put("map_operator", System.currentTimeMillis());
return marker;
} else {
// Process actual data
return value + "|PROCESSED_AT=" + System.currentTimeMillis();
}
});
In your sink operator:
// Assuming a FlinkKafkaProducer, you might need a custom sink or a wrapper
transformedStream.addSink(new RichSinkFunction<Object>() {
@Override
public void invoke(Object value, Context context) throws Exception {
if (value instanceof LatencyMarker) {
LatencyMarker marker = (LatencyMarker) value;
// Record timestamp when entering this sink operator
marker.hopTimestamps.put("sink_operator", System.currentTimeMillis());
// Calculate end-to-end latency
long sourceTimestamp = marker.hopTimestamps.get("source_operator");
long sinkTimestamp = marker.hopTimestamps.get("sink_operator");
long endToEndLatency = sinkTimestamp - sourceTimestamp;
// Report this latency. You can do this by:
// a) Emitting a new metric to a dedicated metrics sink.
// b) Using Flink's Meter/Histogram metrics.
// c) Writing to a separate Kafka topic for analysis.
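// Example: Using Flink's Counter metric (simplistic).
// inc(n) adds n to a running total, so this counter holds the cumulative
// latency of all markers, not a per-marker value; a Histogram fits better.
// Register the counter once in open() through the RuntimeContext:
// Counter latencyCounter = getRuntimeContext().getMetricGroup()
// .counter("end_to_end_latency_ms");
// latencyCounter.inc(endToEndLatency);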
// Example: Logging for immediate visibility
System.out.println("Latency for marker " + marker.markerId + ": " + endToEndLatency + " ms");
} else {
// Process actual data
// ... write to Kafka ...
}
}
});
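If the hop timestamps are kept, they can also be turned into a per-hop breakdown at the sink. The sketch below is plain Java and assumes the hops are stored in pipeline order, which means a `LinkedHashMap` rather than the `HashMap` used in the marker class above (a `HashMap` does not preserve insertion order). `HopBreakdown` and `deltas` are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: converts ordered hop timestamps into per-hop durations.
final class HopBreakdown {

    /** Returns the elapsed time between each pair of consecutive hops, in pipeline order. */
    static Map<String, Long> deltas(LinkedHashMap<String, Long> hopTimestamps) {
        Map<String, Long> result = new LinkedHashMap<>();
        String previousName = null;
        long previousTime = 0L;
        for (Map.Entry<String, Long> hop : hopTimestamps.entrySet()) {
            if (previousName != null) {
                // Time between arriving at the previous hop and arriving at this one
                result.put(previousName + " -> " + hop.getKey(), hop.getValue() - previousTime);
            }
            previousName = hop.getKey();
            previousTime = hop.getValue();
        }
        return result;
    }
}
```

The per-hop values sum to the end-to-end latency, so a breakdown like this shows which stage contributes most of it.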
4. Analyzing the Latency Data:
Once you have the latency values (e.g., logged, sent to a metrics system, or written to a Kafka topic), you can aggregate them.
- Average Latency: Sum of all latencies divided by the count.
- P95/P99 Latency: Useful for understanding worst-case scenarios.
- Latency Distribution: Histograms to visualize how latency varies.
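As a rough illustration of these aggregates, the following plain-Java sketch computes the average and a nearest-rank percentile over collected latency samples. `LatencyStats` is a hypothetical helper, and nearest-rank is only one of several common percentile definitions; a metrics backend may use a different one.

```java
import java.util.Arrays;

// Hypothetical helper for aggregating collected latency samples.
final class LatencyStats {

    /** Arithmetic mean of the samples. */
    static double average(long[] latenciesMillis) {
        if (latenciesMillis.length == 0) {
            throw new IllegalArgumentException("no samples");
        }
        long sum = 0;
        for (long latency : latenciesMillis) {
            sum += latency;
        }
        return (double) sum / latenciesMillis.length;
    }

    /** Nearest-rank percentile: the smallest sample such that at least p percent of samples are <= it. */
    static long percentile(long[] latenciesMillis, double p) {
        if (latenciesMillis.length == 0 || p <= 0 || p > 100) {
            throw new IllegalArgumentException("need samples and 0 < p <= 100");
        }
        long[] sorted = latenciesMillis.clone(); // Leave the caller's array untouched
        Arrays.sort(sorted);
        int rank = (int) Math.ceil(p / 100.0 * sorted.length);
        return sorted[rank - 1];
    }
}
```

With samples of 10 to 90 ms plus a single 1000 ms outlier, the average is pulled up to 145 ms while the median stays at 50 ms, which is why the tail percentiles are worth reporting alongside the average.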
To implement this robustly, you’d typically:
- Use EventTime for timestamps if your pipeline uses it.
- Ensure your LatencyMarker object is serializable.
- Handle potential out-of-order arrival of markers if you’re using complex windowing or stateful operations.
- Consider the overhead of generating and processing these markers. Make the interval long enough to not impact performance significantly, but frequent enough for meaningful insights. A common interval might be every 10-60 seconds.
- Use Flink’s Histogram metric for better statistical analysis of latency.
The most challenging part is often correctly chaining the SourceFunctions or ensuring your custom source correctly delegates to the underlying source while also emitting markers without blocking.
The next challenge you’ll likely face is correlating these end-to-end latency measurements with specific data processing steps or resource utilization metrics within Flink.