EventBridge Pipes lets you connect AWS services without writing much code, but the real magic is how it lets you modify events between those services.

Imagine you have a stream of raw order events coming from an e-commerce site. These events are fine for logging, but they’re missing details your fraud detection system needs, like the customer’s full address and their recent purchase history. You could write a consumer that polls the queue, fetches this data, and forwards the result, but that is a lot of glue code to own. With EventBridge Pipes, enrichment becomes one step of the pipe, and the only code you write is the lookup itself.

Here’s a simplified flow:

  1. Source: An SQS queue holding raw order_created events.
  2. Pipe: An EventBridge Pipe connecting the SQS queue to a target.
  3. Enrichment: A Lambda function invoked by the Pipe. It looks up the customer ID from the order_created event in a DynamoDB table of customer profiles and returns the order together with the full customer object.
  4. Target: A Kinesis Data Stream for your fraud detection service.

When an order_created event lands in SQS, the Pipe picks it up and passes the order, including its customer_id, to a small Lambda function. The function queries DynamoDB for that customer_id and returns a JSON object that combines the order with the customer’s full address, purchase history, and other details. The Pipe does not merge anything itself: whatever the enrichment returns replaces the event payload, and that richer event is what gets sent to the Kinesis stream.
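The flow above can be mimicked locally in plain Python to see what each stage receives. This is only a sketch: the message shape, the field names (order, customer), and the in-memory CUSTOMERS table are assumptions made for illustration, and no AWS service is called.

```python
import json

# Hypothetical stand-in for the DynamoDB customer-profiles table.
CUSTOMERS = {
    "c-42": {"customer_id": "c-42", "address": "1 Main St", "recent_orders": 3},
}

def to_enrichment_input(sqs_message):
    """Pick the fields the enrichment step needs from one SQS message."""
    detail = json.loads(sqs_message["body"])["detail"]
    return {"customer_id": detail["customer_id"], "order": detail}

def enrich(batch):
    """Play the role of the enrichment Lambda: one output per input."""
    return [
        {"order": item["order"], "customer": CUSTOMERS.get(item["customer_id"], {})}
        for item in batch
    ]

def run_pipe(sqs_messages):
    """Source -> input transformation -> enrichment -> records for the target."""
    return enrich([to_enrichment_input(m) for m in sqs_messages])

message = {"body": json.dumps({"detail": {"order_id": "o-1", "customer_id": "c-42"}})}
enriched_events = run_pipe([message])
print(json.dumps(enriched_events, indent=2))
```

Each enriched record carries both the order and the customer profile, because in this sketch the enrichment step is the one responsible for returning everything the target should see.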

Let’s see this in action with a bit of conceptual configuration.

Source: SQS Queue my-order-queue

Enrichment:

  • Type: Lambda Function enrichOrderCustomerLambda
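  • Input Transformation: {"customer_id": <$.body.detail.customer_id>, "order": <$.body.detail>} (Pipes input templates wrap JSON paths in angle brackets, and an SQS source delivers the message payload under body. This template assumes the message body is a JSON envelope with a detail object. It passes the customer_id for the lookup and the order itself, so the function can return both.)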
  • Lambda Function Code (Conceptual):
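import json
import boto3

# Created once per execution environment and reused across invocations.
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('customer-profiles')

def lambda_handler(event, context):
    # Pipes delivers a list of events (up to BatchSize), each already
    # shaped by the input template.
    enriched = []
    for record in event:
        response = table.get_item(Key={'customer_id': record['customer_id']})
        # DynamoDB numbers arrive as Decimal; stringify them so the
        # result can be serialized as JSON.
        customer = json.loads(json.dumps(response.get('Item', {}), default=str))
        # The response replaces the payload, so return the order as well.
        enriched.append({'order': record.get('order', {}), 'customer': customer})
    return enriched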
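
One way to check the lookup logic without deploying anything is to pass the table in as a parameter and substitute a fake. This is a sketch, not the function above: FakeTable and enrich_batch are hypothetical names, and the list-shaped input assumes the Pipe delivers events to the function in batches.

```python
class FakeTable:
    """Hypothetical in-memory substitute for a boto3 DynamoDB Table."""

    def __init__(self, items):
        self._items = items

    def get_item(self, Key):
        # Mirror the shape of boto3's response: 'Item' is absent on a miss.
        item = self._items.get(Key["customer_id"])
        return {"Item": item} if item is not None else {}

def enrich_batch(records, table):
    """Look up each record's customer and return one result per record."""
    results = []
    for record in records:
        response = table.get_item(Key={"customer_id": record["customer_id"]})
        results.append({
            "customer_id": record["customer_id"],
            "customer": response.get("Item", {}),
        })
    return results

fake = FakeTable({"c-42": {"customer_id": "c-42", "tier": "premium"}})
found, missing = enrich_batch([{"customer_id": "c-42"}, {"customer_id": "c-99"}], fake)
print(found)
print(missing)
```

An unknown customer yields an empty profile rather than an error, which matches the response.get('Item', {}) fallback in the handler.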

Target: Kinesis Data Stream fraud-detection-stream

Pipe Configuration (AWS CLI Snippet):

aws pipes create-pipe \
    --name enrich-order-pipe \
    --source arn:aws:sqs:us-east-1:123456789012:my-order-queue \
    --source-parameters '{
        "SqsQueueParameters": {
            "BatchSize": 10
        }
    }' \
    --enrichment arn:aws:lambda:us-east-1:123456789012:function:enrichOrderCustomerLambda \
    --enrichment-parameters '{
        "InputTemplate": "{\"customer_id\": <$.body.detail.customer_id>, \"order\": <$.body.detail>}"
    }' \
    --target arn:aws:kinesis:us-east-1:123456789012:stream/fraud-detection-stream \
    --target-parameters '{
        "KinesisStreamParameters": {
            "PartitionKey": "$.order.order_id"
        }
    }' \
    --role-arn arn:aws:iam::123456789012:role/service-role/AmazonEventBridgePipesRole-us-east-1-enrich-order-pipe
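
The same pipe could also be created from Python with the boto3 pipes client, whose create_pipe call takes the same fields as the CLI command. The sketch below only assembles the request so it can be inspected without credentials. The helper name build_pipe_request is hypothetical, and the template and partition key values are assumptions about the message shape (an SQS body holding a detail object, and an enrichment response with an order field).

```python
import json

def build_pipe_request(account_id, region, input_template, partition_key):
    """Assemble keyword arguments for the Pipes CreatePipe operation."""
    prefix = f"{region}:{account_id}"
    return {
        "Name": "enrich-order-pipe",
        "Source": f"arn:aws:sqs:{prefix}:my-order-queue",
        "SourceParameters": {"SqsQueueParameters": {"BatchSize": 10}},
        "Enrichment": f"arn:aws:lambda:{prefix}:function:enrichOrderCustomerLambda",
        "EnrichmentParameters": {"InputTemplate": input_template},
        "Target": f"arn:aws:kinesis:{prefix}:stream/fraud-detection-stream",
        "TargetParameters": {
            "KinesisStreamParameters": {"PartitionKey": partition_key}
        },
        "RoleArn": f"arn:aws:iam::{account_id}:role/service-role/"
                   f"AmazonEventBridgePipesRole-{region}-enrich-order-pipe",
    }

def create_pipe(request):
    """Send the request; needs boto3 and AWS credentials, so it is not called here."""
    import boto3  # imported lazily so the sketch runs without boto3 installed
    return boto3.client("pipes").create_pipe(**request)

request = build_pipe_request(
    account_id="123456789012",
    region="us-east-1",
    # Assumed message shape: the SQS body is JSON with a "detail" object.
    input_template='{"customer_id": <$.body.detail.customer_id>, "order": <$.body.detail>}',
    # Assumed enrichment output shape: {"order": {...}, "customer": {...}}.
    partition_key="$.order.order_id",
)
print(json.dumps(request, indent=2))
```

Keeping the request as a plain dictionary makes it easy to review or unit test before anything is sent to AWS.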

In this setup, the Pipe polls SQS for events. For each event it applies the InputTemplate, which pulls the customer_id and the order out of the message body, and invokes enrichOrderCustomerLambda with the result. The function’s return value (the order plus the customer data) then replaces the original payload, and the Pipe sends that enriched event to Kinesis. The InputTemplate is critical here: it shapes the data sent to the enrichment step, so your Lambda receives only what it needs. Because the Pipe does not merge the response into the original event, anything the target should see has to pass through the template and come back in the response.

The real power of Pipes is that you don’t need to write the glue code yourself. EventBridge handles polling the source, invoking the enrichment, and sending to the target. You define the what, and EventBridge figures out the how. This drastically reduces the operational burden of building event-driven architectures that require data transformation or augmentation.

A common misconception is that enrichment always means adding data. You can also use it to filter or transform events in ways a JSONPath template cannot express. For example, your enrichment Lambda could check whether a customer has "premium" status and add {"is_premium": true} to the events it returns. Pipes does have a built-in filter step, but it runs on the source event before enrichment, so it cannot act on is_premium. To use enrichment as a filter, the function instead leaves non-premium events out of its response, and only the events it returns continue to the target.
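
A minimal sketch of enrichment used as a filter, assuming the Pipe forwards whatever list the enrichment function returns. PREMIUM_CUSTOMERS is a hypothetical in-memory lookup; a real function would query DynamoDB or another store instead.

```python
# Hypothetical lookup; a real function would query a data store instead.
PREMIUM_CUSTOMERS = {"c-1", "c-3"}

def lambda_handler(event, context):
    """Return only the events that belong to premium customers."""
    kept = []
    for record in event:
        if record["customer_id"] in PREMIUM_CUSTOMERS:
            # Tag the event so downstream consumers can see why it passed.
            kept.append({**record, "is_premium": True})
    return kept

batch = [{"customer_id": "c-1"}, {"customer_id": "c-2"}, {"customer_id": "c-3"}]
result = lambda_handler(batch, None)
print(result)
```

The event for c-2 is simply absent from the result, so under the stated assumption it never reaches the target.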

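Once you’ve got your enrichment working, the next logical step is often to trigger different downstream actions based on the enriched content. EventBridge Rules match events on an event bus, not on a Kinesis stream, so you can either point the Pipe at an event bus and attach rules to it, or branch on the content in the consumers that read from the stream.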
