EventBridge events can be delivered to an SQS FIFO queue, but there’s a catch: a plain rule target cannot derive MessageGroupId or MessageDeduplicationId from the event payload. The target configuration accepts only one fixed MessageGroupId, and deduplication is left to the queue’s content-based deduplication setting. This means you can’t get per-entity ordering (for example, per order) or control deduplication out of the box if you just set up a basic EventBridge rule with an SQS target.
Let’s see how this actually looks in practice.
Imagine you have an application that emits OrderPlaced events. You want to process these events in the exact order they occurred to avoid race conditions in your downstream inventory management system. You’ve set up an SQS FIFO queue named order-processing-fifo-queue.fifo (FIFO queue names must end in .fifo) and an EventBridge rule that targets it.
    // Example OrderPlaced Event
    {
      "source": "com.mycompany.orders",
      "detail-type": "OrderPlaced",
      "detail": {
        "orderId": "ORD12345",
        "customerId": "CUST987",
        "timestamp": "2023-10-27T10:00:00Z",
        "items": [...]
      }
    }
If you simply configure an EventBridge rule with this SQS queue as a target, the most you can supply is one fixed MessageGroupId in the target’s SQS parameters, so every event lands in the same message group regardless of its orderId. EventBridge won’t compute a per-event MessageGroupId or MessageDeduplicationId from the payload, and a FIFO queue rejects any message that arrives without a group ID.
Here’s the crucial part: you can still map event fields to MessageGroupId and MessageDeduplicationId, but the mapping has to happen in a component that sits between the rule and the queue.
The solution involves using EventBridge Pipes or a Lambda function as an intermediary.
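Whichever intermediary you choose, the derivation itself is small. The sketch below shows one way to compute both values from the sample event. The function name `derive_fifo_params` is hypothetical, and the code assumes the event carries `detail.orderId` plus either the top-level `time` field that EventBridge adds on delivery or the `timestamp` inside the payload.

```python
def derive_fifo_params(event: dict) -> dict:
    """Derive SQS FIFO parameters from an OrderPlaced event (illustrative only)."""
    detail = event["detail"]
    order_id = detail["orderId"]
    # Prefer the envelope timestamp EventBridge adds on delivery; fall back
    # to the timestamp inside the payload, as in the sample event.
    occurred_at = event.get("time") or detail["timestamp"]
    return {
        # All events for one order share a group, so they stay in order.
        "MessageGroupId": order_id,
        # Order ID plus timestamp: a redelivery of the same event maps to the same ID.
        "MessageDeduplicationId": f"{order_id}-{occurred_at}",
    }


sample_event = {
    "source": "com.mycompany.orders",
    "detail-type": "OrderPlaced",
    "detail": {
        "orderId": "ORD12345",
        "customerId": "CUST987",
        "timestamp": "2023-10-27T10:00:00Z",
    },
}

params = derive_fifo_params(sample_event)
print(params)
```

Both options that follow are different places to run this same derivation.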
Option 1: EventBridge Pipes (Recommended)
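EventBridge Pipes fit this scenario well. A pipe connects a source to a target (like SQS FIFO) and can include an optional enrichment step that transforms each event along the way. One caveat: an event bus is not itself a supported pipe source (sources are stream- or queue-based, such as SQS, Kinesis, or DynamoDB Streams), so the usual arrangement is for the rule to deliver events to a standard SQS queue and for the pipe to read from that queue.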
- Create the SQS FIFO Queue:

      aws sqs create-queue \
        --queue-name order-processing-fifo-queue.fifo \
        --attributes '{"FifoQueue": "true", "ContentBasedDeduplication": "true"}'

  - FifoQueue: "true": This is the obvious one, making it a FIFO queue. The queue name must also end in .fifo.
  - ContentBasedDeduplication: "true": This tells SQS to automatically generate a deduplication ID based on the message content. This is useful if your event structure is stable and you want SQS to handle deduplication without an explicit MessageDeduplicationId from the source. However, for guaranteed ordering based on event logic, you’ll still need MessageGroupId.
- Create an EventBridge Pipe: You’ll need a source (a supported pipe source, such as a standard SQS queue that your EventBridge rule delivers to), a target (your SQS FIFO queue), and an enrichment step that acts as the transform. The output of that step is where MessageGroupId and MessageDeduplicationId come from.

  Let’s assume your OrderPlaced event has an orderId field that you want to use as the MessageGroupId for ordering, and a unique identifier for MessageDeduplicationId (an eventId if you had one, a generated value, or orderId + timestamp).

  You’d typically define this using the AWS CLI or SDK. For example, creating a pipe with a Lambda transform:
- Lambda Function (event-transformer.py):

      import json

      def lambda_handler(event, context):
          print(f"Received event: {json.dumps(event)}")

          # Pipes invokes an enrichment function with a batch (a list) of
          # records and expects a list back. A single dict is tolerated too.
          records = event if isinstance(event, list) else [event]

          results = []
          for record in records:
              # Assumption: with an SQS source, the EventBridge event sits in
              # the record's "body", usually as a JSON string.
              payload = record.get("body", record)
              if isinstance(payload, str):
                  payload = json.loads(payload)

              order_id = payload["detail"]["orderId"]

              # For deduplication, combine the orderId with the EventBridge
              # timestamp, or use a unique event ID if one is available.
              # If ContentBasedDeduplication is true on the queue,
              # you might only strictly need MessageGroupId here.
              deduplication_id = f"{order_id}-{payload['time']}"

              # Unless the target parameters define an input template, the
              # whole returned object becomes the SQS message body.
              results.append({
                  "messageGroupId": order_id,
                  "messageDeduplicationId": deduplication_id,
                  "messageBody": payload,
              })

          print(f"Transformed for SQS: {json.dumps(results)}")
          return results
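- IAM Role for Pipe: Ensure the pipe’s execution role can read from the source (for an SQS source: sqs:ReceiveMessage, sqs:DeleteMessage, and sqs:GetQueueAttributes), invoke the enrichment function (lambda:InvokeFunction), and write to the target (sqs:SendMessage). The role must also trust pipes.amazonaws.com.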
- Create the Pipe (conceptual CLI; a real setup needs more detail):

      # Simplified sketch. It assumes the rule delivers events to a standard SQS
      # queue (order-events-buffer is a placeholder name) that the pipe reads
      # from. With no enrichment input template, the Lambda receives each
      # source record in full.
      aws pipes create-pipe \
        --name order-processing-pipe \
        --source arn:aws:sqs:us-east-1:123456789012:order-events-buffer \
        --target arn:aws:sqs:us-east-1:123456789012:order-processing-fifo-queue.fifo \
        --enrichment arn:aws:lambda:us-east-1:123456789012:function:event-transformer \
        --role-arn arn:aws:iam::123456789012:role/EventBridgePipeRole \
        --target-parameters '{
          "SqsQueueParameters": {
            "MessageGroupId": "$.messageGroupId",
            "MessageDeduplicationId": "$.messageDeduplicationId"
          }
        }'

  - $.messageGroupId and $.messageDeduplicationId: These are JSONPath expressions that tell EventBridge Pipes to extract the messageGroupId and messageDeduplicationId values from the output of the enrichment (Lambda) step.
  - The Lambda function returns objects containing these keys, which EventBridge Pipes then uses to construct the SQS SendMessage API call.
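A least-privilege policy for the pipe’s execution role can be sketched as a plain dictionary. This is an illustration under stated assumptions: the pipe polls a standard SQS queue as its source, `pipe_role_policy` is a hypothetical helper, and every ARN is a placeholder to replace with your own.

```python
import json


def pipe_role_policy(source_queue_arn: str, enrichment_fn_arn: str,
                     target_queue_arn: str) -> dict:
    """Build an IAM policy document for a pipe: SQS source -> Lambda -> SQS FIFO."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {   # The pipe polls the source queue and deletes what it has handled.
                "Sid": "ReadSource",
                "Effect": "Allow",
                "Action": [
                    "sqs:ReceiveMessage",
                    "sqs:DeleteMessage",
                    "sqs:GetQueueAttributes",
                ],
                "Resource": source_queue_arn,
            },
            {   # The pipe calls the enrichment (transform) function.
                "Sid": "InvokeEnrichment",
                "Effect": "Allow",
                "Action": ["lambda:InvokeFunction"],
                "Resource": enrichment_fn_arn,
            },
            {   # The pipe writes the transformed message to the FIFO queue.
                "Sid": "WriteTarget",
                "Effect": "Allow",
                "Action": ["sqs:SendMessage"],
                "Resource": target_queue_arn,
            },
        ],
    }


policy = pipe_role_policy(
    "arn:aws:sqs:us-east-1:123456789012:order-events-buffer",
    "arn:aws:lambda:us-east-1:123456789012:function:event-transformer",
    "arn:aws:sqs:us-east-1:123456789012:order-processing-fifo-queue.fifo",
)
print(json.dumps(policy, indent=2))
```

Scoping each statement to a single resource keeps the role from being reusable against unrelated queues or functions.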
Option 2: Lambda Function as EventBridge Target
If you can’t use Pipes, a Lambda function can act as the intermediary.
- Create the SQS FIFO Queue (same as above).
- Create a Lambda Function:

      import json
      import boto3

      sqs = boto3.client('sqs')

      # Hardcode the queue URL or look it up dynamically, e.g.
      # 'https://sqs.us-east-1.amazonaws.com/123456789012/order-processing-fifo-queue.fifo'
      QUEUE_URL = 'YOUR_SQS_FIFO_QUEUE_URL'

      def lambda_handler(event, context):
          print(f"Received event: {json.dumps(event)}")

          # EventBridge delivers the full envelope; the order data is
          # normally under event['detail']. This depends on your event structure.
          order_id = event.get('detail', {}).get('orderId') or event.get('orderId')
          if not order_id:
              print("Error: Could not find orderId in event for MessageGroupId.")
              # Handle error: dead-letter queue, log, etc.
              return

          # Generate a deduplication ID from the orderId and EventBridge's event timestamp.
          # Alternatively, use a UUID or a hash of the entire message body.
          deduplication_id = f"{order_id}-{event['time']}"

          try:
              response = sqs.send_message(
                  QueueUrl=QUEUE_URL,
                  MessageBody=json.dumps(event),           # Send the original event as body
                  MessageGroupId=order_id,                 # Crucial for ordering
                  MessageDeduplicationId=deduplication_id  # Crucial for deduplication
              )
              print(f"Sent message to SQS FIFO: {response['MessageId']}")
              return {
                  'statusCode': 200,
                  'body': json.dumps('Message sent to SQS FIFO successfully!')
              }
          except Exception as e:
              print(f"Error sending message to SQS FIFO: {e}")
              # Depending on your error handling strategy, you might want to:
              # - Re-raise the exception to trigger Lambda retries (if configured)
              # - Send to a Dead Letter Queue
              # - Log the error and return a success status if partial failure is acceptable
              raise
- Configure EventBridge Rule Target: Set the EventBridge rule’s target to this Lambda function. EventBridge will invoke the Lambda with the event data. The Lambda function then uses the AWS SDK to send the message to the SQS FIFO queue with MessageGroupId and MessageDeduplicationId populated from the event data.

  You’ll need to grant the Lambda function sqs:SendMessage permission on your FIFO queue.
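If a timestamp-based deduplication ID does not suit your events, a hash of the message body is another option. The sketch below is one way to do that; `content_dedup_id` is a hypothetical helper, and it assumes the body is JSON-serializable. A SHA-256 hex digest is 64 characters, which fits within the 128-character limit SQS places on MessageDeduplicationId.

```python
import hashlib
import json


def content_dedup_id(body: dict) -> str:
    """Return a stable deduplication ID derived from the message content."""
    # Sort keys and strip whitespace so logically equal payloads
    # serialize to the same bytes regardless of key order.
    canonical = json.dumps(body, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


first = {"detail": {"orderId": "ORD12345", "customerId": "CUST987"}}
same_content = {"detail": {"customerId": "CUST987", "orderId": "ORD12345"}}
different = {"detail": {"orderId": "ORD99999", "customerId": "CUST987"}}

print(content_dedup_id(first) == content_dedup_id(same_content))
print(content_dedup_id(first) == content_dedup_id(different))
```

The value can be passed as MessageDeduplicationId in the send_message call in place of the orderId-plus-timestamp string. The trade-off is that two genuinely distinct events with identical content would be treated as duplicates within the deduplication window.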
The key insight here is that EventBridge itself doesn’t know how to derive MessageGroupId and MessageDeduplicationId from your events. You must explicitly tell it how to compute these values from the event payload, either with an EventBridge Pipe and its enrichment step or with custom logic in an intermediary Lambda function. Without this step, a rule that targets an SQS FIFO queue directly either fails to deliver (when no group ID is configured on the target) or pushes every event into one fixed message group, which serializes all orders behind one another and leaves deduplication entirely to the queue’s content-based setting.
The next thing you’ll run into is managing the MessageGroupId strategy to ensure your processing logic aligns with your ordering requirements.
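To make that trade-off concrete, here is a small sketch comparing two candidate strategies. SQS FIFO preserves order only within a message group, so the choice of group key decides which events are ordered relative to each other and how much can be processed in parallel. The helper names `group_id` and `partition`, and the "order" and "customer" strategy labels, are hypothetical.

```python
from collections import defaultdict


def group_id(event: dict, strategy: str = "order") -> str:
    """Pick the MessageGroupId for an event under a named strategy."""
    detail = event["detail"]
    if strategy == "order":
        return detail["orderId"]      # Ordering per order, maximum parallelism
    if strategy == "customer":
        return detail["customerId"]   # Ordering across all of a customer's orders
    raise ValueError(f"unknown strategy: {strategy}")


def partition(events: list, strategy: str) -> dict:
    """Group order IDs by message group, preserving arrival order within each group."""
    groups = defaultdict(list)
    for event in events:
        groups[group_id(event, strategy)].append(event["detail"]["orderId"])
    return dict(groups)


events = [
    {"detail": {"orderId": "ORD1", "customerId": "CUST1"}},
    {"detail": {"orderId": "ORD2", "customerId": "CUST1"}},
    {"detail": {"orderId": "ORD3", "customerId": "CUST2"}},
]

by_order = partition(events, "order")
by_customer = partition(events, "customer")
print(by_order)
print(by_customer)
```

Grouping by order yields three independent groups here, while grouping by customer yields two and forces ORD1 to be handled before ORD2.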