EventBridge Pipes let you connect AWS services point to point, a bit like snapping LEGO bricks together. The more surprising part is that a pipe can filter, enrich, and transform data between services rather than just pass it along.

Imagine you have a stream of incoming customer order events in Kinesis. You want to enrich these orders with customer details from DynamoDB, filter out any orders that are too small to be profitable, and then send the remaining enriched orders to a Step Functions workflow for fulfillment. This is exactly what EventBridge Pipes excel at.

Here’s how it looks in action. We’ll set up a pipe that takes events from a Kinesis data stream, pipes them through a Lambda function for enrichment and filtering, and then sends the result to a Step Functions state machine.

First, we need a Kinesis data stream. Let’s assume we have one named my-order-stream.

Next, we need a Lambda function that will enrich and filter the events. The pipe invokes it with a batch of Kinesis records. For each record, the function decodes the order, looks up customer details in DynamoDB, and decides whether to keep the order.

import base64
import json

import boto3

dynamodb = boto3.resource('dynamodb')
customer_table = dynamodb.Table('customer-details')  # Assume this table exists

MIN_ORDER_TOTAL = 50.00


def enrich_and_filter(record):
    """Return the enriched order for one Kinesis record, or None to drop it."""
    # Kinesis delivers the payload base64-encoded in the record's 'data' field.
    # Assumes producers write JSON such as
    # {"orderId": "...", "customerId": "...", "total": 75.0}.
    order = json.loads(base64.b64decode(record['data']))
    order_id = order['orderId']
    customer_id = order['customerId']
    order_total = order['total']

    # Enrich with customer data
    try:
        customer_response = customer_table.get_item(Key={'customerId': customer_id})
        customer_info = customer_response.get('Item', {})
    except Exception as e:
        print(f"Error fetching customer {customer_id}: {e}")
        customer_info = {}

    # Filter out low-value orders
    if order_total < MIN_ORDER_TOTAL:
        print(f"Filtering out order {order_id} due to low total: {order_total}")
        return None

    # Combine and return enriched event
    return {
        "orderId": order_id,
        "customerId": customer_id,
        "orderTotal": order_total,
        "customerName": customer_info.get('name', 'N/A'),
        "customerEmail": customer_info.get('email', 'N/A')
    }


def lambda_handler(events, context):
    # A pipe invokes the enrichment with a batch: a plain list of source
    # records, not an object with a 'Records' or 'detail' key.
    enriched = (enrich_and_filter(record) for record in events)
    # Orders left out of the returned list never reach the target.
    return [order for order in enriched if order is not None]
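To try the decoding and filtering logic without deploying anything, it can help to build records shaped like the ones a pipe delivers from Kinesis. The sketch below is a local stand-in, not AWS code: make_kinesis_record and decode_order are hypothetical helper names, the order fields mirror the ones used above, and only the base64-encoded data field and the partition key are modelled.

```python
import base64
import json


def make_kinesis_record(order, partition_key="demo"):
    """Build a minimal record resembling what a pipe delivers from Kinesis.

    Real records carry more metadata (sequence number, timestamps, ARNs);
    only the fields needed for local testing are included here.
    """
    payload = json.dumps(order).encode("utf-8")
    return {
        "partitionKey": partition_key,
        "data": base64.b64encode(payload).decode("ascii"),
    }


def decode_order(record):
    """Reverse the base64 + JSON encoding of a record's data field."""
    return json.loads(base64.b64decode(record["data"]))


# The enrichment receives a list of records, so the sample batch is a list.
sample_batch = [
    make_kinesis_record({"orderId": "o-1", "customerId": "c-1", "total": 120.0}),
    make_kinesis_record({"orderId": "o-2", "customerId": "c-2", "total": 12.5}),
]

decoded = [decode_order(record) for record in sample_batch]
kept = [order["orderId"] for order in decoded if order["total"] >= 50.00]
print(kept)  # ['o-1']
```

A batch built this way can be passed to the Lambda handler in a unit test, with the DynamoDB table replaced by a stub.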

The processed orders go to a Step Functions state machine. Assume one already exists with the ARN arn:aws:states:us-east-1:123456789012:stateMachine:OrderFulfillmentWorkflow.

With these components ready, we can create the EventBridge Pipe. We’ll use the AWS CLI for this:

aws pipes create-pipe \
    --name "order-processing-pipe" \
    --source "arn:aws:kinesis:us-east-1:123456789012:stream/my-order-stream" \
    --source-parameters '{"KinesisStreamParameters":{"StartingPosition":"LATEST"}}' \
    --enrichment "arn:aws:lambda:us-east-1:123456789012:function:enrich-and-filter-function" \
    --target "arn:aws:states:us-east-1:123456789012:stateMachine:OrderFulfillmentWorkflow" \
    --target-parameters '{"StepFunctionStateMachineParameters":{"InvocationType":"FIRE_AND_FORGET"}}' \
    --role-arn "arn:aws:iam::123456789012:role/service-role/AmazonEventBridgePipesRole"

Let’s break down what’s happening here:

  • --source: This is our Kinesis data stream where orders originate.
  • --source-parameters: StartingPosition LATEST tells the pipe to read only records added after it starts, skipping anything already in the stream.
  • --enrichment: This is our Lambda function that does the heavy lifting of adding customer data and filtering. Because no --enrichment-parameters are given, the pipe passes each batch of Kinesis records to the function unchanged.
  • --target: Our Step Functions state machine that orchestrates the fulfillment process.
  • --target-parameters: InvocationType FIRE_AND_FORGET starts the execution asynchronously, which is the mode Standard workflows need (this assumes OrderFulfillmentWorkflow is a Standard workflow). With no InputTemplate set, the output of the enrichment step (our Lambda function) becomes the execution input.
  • --role-arn: This is the IAM role that EventBridge Pipes itself will assume to read from the stream, invoke the Lambda function, and start the state machine execution. The target parameters take no separate role.
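The same pipe can also be created from Python. The following is a minimal sketch using the boto3 pipes client and its create_pipe call; build_pipe_request and create_pipe are hypothetical helper names, the ARNs are the placeholder values from this walkthrough, and the sketch sets no input templates, so each stage receives its payload unchanged. FIRE_AND_FORGET assumes the state machine is a Standard workflow.

```python
import json


def build_pipe_request(account_id="123456789012", region="us-east-1"):
    """Assemble keyword arguments for the Pipes CreatePipe API."""
    role = f"arn:aws:iam::{account_id}:role/service-role/AmazonEventBridgePipesRole"
    return {
        "Name": "order-processing-pipe",
        "RoleArn": role,
        "Source": f"arn:aws:kinesis:{region}:{account_id}:stream/my-order-stream",
        "SourceParameters": {
            "KinesisStreamParameters": {"StartingPosition": "LATEST"}
        },
        "Enrichment": (
            f"arn:aws:lambda:{region}:{account_id}:function:enrich-and-filter-function"
        ),
        "Target": (
            f"arn:aws:states:{region}:{account_id}:stateMachine:OrderFulfillmentWorkflow"
        ),
        "TargetParameters": {
            "StepFunctionStateMachineParameters": {"InvocationType": "FIRE_AND_FORGET"}
        },
    }


def create_pipe(request):
    """Send the request to AWS. Needs boto3 and valid credentials."""
    import boto3  # imported lazily so the builder can be inspected offline

    return boto3.client("pipes").create_pipe(**request)


if __name__ == "__main__":
    # Print the request for review instead of calling AWS.
    print(json.dumps(build_pipe_request(), indent=2))
```

Keeping the request builder separate from the API call makes it easy to review or diff the configuration before anything is created.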

The InputTemplate is a powerful, optional feature for reshaping data between stages. You set it inside --enrichment-parameters or --target-parameters as static JSON with placeholders in angle brackets: a JSONPath expression such as <$.orderId> pulls a value out of the payload entering that stage. Pipes also defines reserved variables under the aws.pipes prefix, such as <aws.pipes.pipe-name>. Without a template, the stage receives the payload unchanged. You can be quite granular: a target template like {"order_id": "<$.orderId>", "customer_name": "<$.customerName>"} would reshape the enrichment output specifically for the target.
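To make the substitution idea concrete, here is a toy model of how an angle-bracket template could be resolved against a payload. It is an illustration only, not how Pipes is implemented: render and lookup are hypothetical names, only quoted placeholders of the form "<$.a.b>" with plain dotted keys are handled, and reserved variables and array indexing are left out.

```python
import json
import re

# Matches a quoted placeholder such as "<$.customer.name>" inside a template.
PLACEHOLDER = re.compile(r'"<\$\.([\w.]+)>"')


def lookup(payload, path):
    """Follow a dotted path like 'customer.name' through nested dicts."""
    value = payload
    for key in path.split("."):
        value = value[key]
    return value


def render(template, payload):
    """Replace each placeholder with the JSON encoding of the value it selects."""
    def substitute(match):
        return json.dumps(lookup(payload, match.group(1)))

    return json.loads(PLACEHOLDER.sub(substitute, template))


enriched_order = {"orderId": "o-1", "orderTotal": 120.0, "customerName": "Ada"}
template = '{"order_id": "<$.orderId>", "customer_name": "<$.customerName>"}'

print(render(template, enriched_order))
# {'order_id': 'o-1', 'customer_name': 'Ada'}
```

Fields the template does not mention, such as orderTotal here, are simply absent from the result, which is what makes a template useful for trimming a payload down to what the target needs.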

What most people miss is how the enrichment step can act as a gatekeeper. By leaving filtered-out orders out of its response, and returning an empty response when nothing in the batch qualifies, the enrichment Lambda keeps those events from ever reaching the target. This allows for sophisticated, code-driven filtering at the pipe level, preventing unnecessary invocations of downstream services.
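The gatekeeper behaviour can be modelled in a few lines. This is a toy simulation under the assumption stated above, that an empty enrichment response means the target is not invoked; run_pipe and keep_profitable are hypothetical names, and the orders are plain dicts rather than real Kinesis records.

```python
def run_pipe(batch, enrich, deliver):
    """Toy model of one pipe iteration: enrich a batch, then deliver the result.

    Returns the number of events handed to the target.
    """
    result = enrich(batch)
    if not result:
        # Nothing survived enrichment, so the target is never called.
        return 0
    deliver(result)
    return len(result)


def keep_profitable(orders, minimum=50.00):
    """Stand-in for the enrichment Lambda: keep orders at or above the minimum."""
    return [order for order in orders if order["total"] >= minimum]


delivered = []  # stands in for the Step Functions target

sent = run_pipe(
    [{"orderId": "o-1", "total": 120.0}, {"orderId": "o-2", "total": 12.5}],
    keep_profitable,
    delivered.extend,
)
skipped = run_pipe([{"orderId": "o-3", "total": 5.0}], keep_profitable, delivered.extend)

print(sent, skipped, [order["orderId"] for order in delivered])
# 1 0 ['o-1']
```

The second batch contains only a low-value order, so the stand-in target is never called for it.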

Once the pipe is created and running, EventBridge Pipes polls the stream, invokes the enrichment function, and delivers the results to the target, managing batching and retries between Kinesis, Lambda, and Step Functions for you.

The next thing you’ll want to explore is setting up dead-letter queues for pipe failures.
