The most surprising thing about tracing asynchronous Kafka and SQS messages is how much of the "asynchronous" nature is an illusion when you’re trying to get end-to-end visibility.
Let’s say you have a microservice, order-processor, that consumes messages from an SQS queue. When it gets a message, it processes it and then publishes an event to a Kafka topic.
Here’s order-processor’s Python code, instrumented with Elastic APM:
import json

import boto3
import elasticapm
import kafka
from elasticapm.contrib.flask import ElasticAPM
from flask import Flask

app = Flask(__name__)
app.config['ELASTIC_APM'] = {
    'SERVICE_NAME': 'order-processor',
    'SERVER_URL': 'http://localhost:8200',
    'ENVIRONMENT': 'development',
}
apm = ElasticAPM(app)

sqs = boto3.client('sqs', region_name='us-east-1')
kafka_producer = kafka.KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    api_version=(0, 10, 1),
)

QUEUE_URL = 'http://localhost:4566/000000000000/my-order-queue'
KAFKA_TOPIC = 'order_events'


def process_order(order_data):
    print(f"Processing order: {order_data['order_id']}")
    # Simulate some processing
    order_data['status'] = 'processed'
    return order_data


while True:
    response = sqs.receive_message(
        QueueUrl=QUEUE_URL,
        MaxNumberOfMessages=1,
        WaitTimeSeconds=20,
        # SQS only returns message attributes that are explicitly requested
        MessageAttributeNames=['All'],
    )
    if 'Messages' in response:
        for message in response['Messages']:
            # Extract the W3C traceparent from the SQS message attributes, if present
            attrs = message.get('MessageAttributes', {})
            parent = None
            if 'traceparent' in attrs:
                parent = elasticapm.trace_parent_from_string(
                    attrs['traceparent']['StringValue']
                )
            # Spans are only recorded inside a transaction, so start one that
            # continues the producer's trace (or a fresh trace if parent is None)
            apm.client.begin_transaction('messaging', trace_parent=parent)
            result = 'failure'
            try:
                order_data = json.loads(message['Body'])
                processed_order = process_order(order_data)
                # Publish to Kafka, propagating trace context
                with elasticapm.capture_span('Kafka Publish', span_type='messaging'):
                    kafka_producer.send(
                        KAFKA_TOPIC,
                        value=processed_order,
                        # Kafka header values must be bytes
                        headers=[
                            ('traceparent', elasticapm.get_trace_parent_header().encode('utf-8')),
                        ],
                    )
                sqs.delete_message(
                    QueueUrl=QUEUE_URL,
                    ReceiptHandle=message['ReceiptHandle'],
                )
                result = 'success'
            finally:
                apm.client.end_transaction('SQS Message Processing', result)
    else:
        print("No messages received.")
Now, imagine a producer sending an order to SQS.
# producer.py (simplified)
import json

import boto3

sqs = boto3.client('sqs', region_name='us-east-1')
QUEUE_URL = 'http://localhost:4566/000000000000/my-order-queue'

# Start a new trace with fixed IDs for demonstration. An instrumented service
# would call elasticapm.get_trace_parent_header() inside a transaction instead.
trace_id = 'a' * 32  # 16-byte trace ID, hex-encoded
span_id = 'b' * 16   # 8-byte parent span ID, hex-encoded
# W3C Trace Context format: version-traceid-parentid-flags (01 = sampled)
traceparent = f'00-{trace_id}-{span_id}-01'

order_payload = {
    "order_id": "ORD12345",
    "item": "widget",
    "quantity": 2,
}

response = sqs.send_message(
    QueueUrl=QUEUE_URL,
    MessageBody=json.dumps(order_payload),
    MessageAttributes={
        # tracestate is optional and left out: SQS rejects empty attribute values
        'traceparent': {'StringValue': traceparent, 'DataType': 'String'},
    },
)
print(f"Sent message to SQS: {response['MessageId']}")
When this message arrives at order-processor, nothing links it to the producer’s trace automatically in this example. The consumer has to read the traceparent message attribute itself. With the Elastic APM Python agent, it can parse that value with elasticapm.trace_parent_from_string() and pass the result to the client’s begin_transaction() as trace_parent, which makes the SQS processing transaction part of the trace that started at the producer.
The key here is that the producer must explicitly inject the trace context into the SQS MessageAttributes; SQS itself doesn’t propagate it. The consumer, order-processor, must then request those attributes (receive_message only returns the ones named in MessageAttributeNames), read them, and start its transaction from the extracted trace parent.
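The inject and extract halves of that contract can be sketched as two small helpers. This is a minimal illustration, not part of any library: the function names inject_trace_context and extract_trace_context are hypothetical, and the attribute names traceparent and tracestate are assumed to follow the W3C header names.

```python
from typing import Optional


def inject_trace_context(attributes: dict, traceparent: str,
                         tracestate: Optional[str] = None) -> dict:
    """Return a copy of SQS MessageAttributes with trace context added."""
    merged = dict(attributes)
    merged['traceparent'] = {'DataType': 'String', 'StringValue': traceparent}
    # SQS rejects empty attribute values, so only add tracestate when it is set
    if tracestate:
        merged['tracestate'] = {'DataType': 'String', 'StringValue': tracestate}
    return merged


def extract_trace_context(message: dict) -> dict:
    """Return whichever of traceparent/tracestate the received message carries."""
    attributes = message.get('MessageAttributes') or {}
    context = {}
    for name in ('traceparent', 'tracestate'):
        value = attributes.get(name, {}).get('StringValue')
        if value:
            context[name] = value
    return context


# Round trip with a fake message shaped like an entry of receive_message()['Messages']
sent = inject_trace_context({}, '00-' + 'a' * 32 + '-' + 'b' * 16 + '-01')
received = {'Body': '{}', 'MessageAttributes': sent}
context = extract_trace_context(received)
```

Keeping both helpers next to each other makes it harder for the producer and consumer to drift apart on attribute names, which is an easy way to silently break the link.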
After processing the order, order-processor publishes to Kafka. Notice how it explicitly adds the W3C traceparent header to the Kafka message (tracestate is optional and only needed when there is vendor-specific state to carry). This is crucial: Kafka, like SQS, doesn’t propagate trace context on its own. You have to put it in the message headers, or failing that in the payload, yourself.
The kafka_producer.send call passes the trace context in its headers argument, encoded as UTF-8 because Kafka header values are bytes. In the Elastic APM Python agent, elasticapm.get_trace_parent_header() returns the current transaction’s trace context (established when the SQS message was processed) as a string in the W3C Trace Context format. Downstream Kafka consumers can read this header to continue the trace, either through an agent instrumentation that supports their Kafka client or by extracting it by hand, as order-processor does for SQS.
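To make the header format concrete, here is a small parser for a traceparent value. It is a sketch that accepts only the four-field layout defined for version 00 of the W3C spec. The names parse_traceparent and ParsedTraceParent are hypothetical and are not part of the Elastic APM agent.

```python
import re
from typing import NamedTuple, Optional

# Layout: 2 hex digits version, 32 hex trace-id, 16 hex parent-id, 2 hex flags
_TRACEPARENT_RE = re.compile(
    r'([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})'
)


class ParsedTraceParent(NamedTuple):
    version: str
    trace_id: str
    parent_id: str
    sampled: bool


def parse_traceparent(header: str) -> Optional[ParsedTraceParent]:
    """Parse a traceparent string, returning None when it is malformed."""
    match = _TRACEPARENT_RE.fullmatch(header.strip())
    if match is None:
        return None
    version, trace_id, parent_id, flags = match.groups()
    # Version ff and all-zero IDs are invalid per the spec
    if version == 'ff' or trace_id == '0' * 32 or parent_id == '0' * 16:
        return None
    # Bit 0 of the flags byte is the "sampled" flag
    return ParsedTraceParent(version, trace_id, parent_id, bool(int(flags, 16) & 0x01))


parsed = parse_traceparent('00-' + 'a' * 32 + '-' + 'b' * 16 + '-01')
```

A check like this is handy in tests or when debugging a broken trace, since a malformed header is usually dropped quietly rather than reported.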
In the Elastic APM UI, you’d see a transaction for the SQS message processing, containing a span for the Kafka publish. If another service consumes from that Kafka topic and continues the trace from the header, its transaction is linked back to order-processor, forming a complete end-to-end trace across services and messaging queues. The original SQS send appears in that trace only if the producer is itself instrumented and reports a transaction; the simplified producer above just sends hard-coded IDs, so the trace starts at order-processor’s SQS processing and continues through the Kafka publish.
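On the downstream side, the first step is pulling the header back out of the consumed record. The sketch below assumes headers arrive as a list of (str, bytes) tuples, which is the shape kafka-python uses for ConsumerRecord.headers. The helper name traceparent_from_headers is hypothetical.

```python
from typing import Iterable, Optional, Tuple


def traceparent_from_headers(
    headers: Optional[Iterable[Tuple[str, Optional[bytes]]]],
) -> Optional[str]:
    """Return the first traceparent header value as text, or None if absent."""
    for key, value in headers or ():
        if key.lower() == 'traceparent' and value:
            return value.decode('utf-8')
    return None


# Headers shaped like the ones order-processor attaches when publishing
record_headers = [
    ('content-type', b'application/json'),
    ('traceparent', b'00-' + b'a' * 32 + b'-' + b'b' * 16 + b'-01'),
]
header = traceparent_from_headers(record_headers)
```

With the Elastic APM Python agent, the returned string could then be passed to elasticapm.trace_parent_from_string(), and the result handed to the client’s begin_transaction() as trace_parent, mirroring what order-processor does for SQS.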
The one thing that trips most people up is assuming that the messaging system itself will carry the trace context forward. It won’t. Unless your APM agent ships an instrumentation for the exact client library you use, you are the one responsible for serializing the trace context (usually into the W3C traceparent and tracestate headers) and injecting it into the message’s metadata (SQS MessageAttributes, Kafka headers, etc.) on the sending side, and then deserializing and importing that context on the receiving side.
The next challenge is tracing messages that are batched by SQS or Kafka.