DynamoDB Streams don’t just log changes; they act as a near-real-time change feed that Lambda functions can subscribe to, letting you react to item-level changes in your database without writing your own polling code (Lambda’s event source mapping polls the stream for you).
Let’s see this in action. Imagine we have a products table in DynamoDB. When a new product is added, or an existing one is updated, we want to send a notification.
Here’s a simplified products table schema, in the JSON form that aws dynamodb create-table accepts through --cli-input-json:
{
"TableName": "products",
"KeySchema": [
{
"AttributeName": "productId",
"KeyType": "HASH"
}
],
"AttributeDefinitions": [
{
"AttributeName": "productId",
"AttributeType": "S"
}
],
"BillingMode": "PAY_PER_REQUEST"
}
Now, we enable DynamoDB Streams on this table. We’ll choose NEW_AND_OLD_IMAGES so our Lambda function gets both the state before and after the change.
aws dynamodb update-table \
--table-name products \
--stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES
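If you’d rather script this step in Python, the same change can be made with boto3’s update_table call. The sketch below assumes you pass in a boto3 DynamoDB client with credentials already configured; the helper name enable_stream is hypothetical. It also checks the view type against the four values DynamoDB accepts.

```python
# Sketch: enable a DynamoDB stream with boto3 instead of the CLI.
# Assumes a boto3 DynamoDB client (boto3.client("dynamodb")) is passed in;
# the helper name enable_stream is hypothetical.

# The four stream view types DynamoDB accepts
VALID_VIEW_TYPES = {"KEYS_ONLY", "NEW_IMAGE", "OLD_IMAGE", "NEW_AND_OLD_IMAGES"}


def enable_stream(client, table_name, view_type="NEW_AND_OLD_IMAGES"):
    """Turn on the table's stream and return the update_table response."""
    if view_type not in VALID_VIEW_TYPES:
        raise ValueError(f"unsupported stream view type: {view_type}")
    return client.update_table(
        TableName=table_name,
        StreamSpecification={"StreamEnabled": True, "StreamViewType": view_type},
    )
```

Called as enable_stream(boto3.client("dynamodb"), "products"), it mirrors the CLI command above. Taking the client as a parameter also makes the helper easy to exercise with a stub in tests.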
This command enables the stream. You can verify it by describing the table:
aws dynamodb describe-table --table-name products --query 'Table.LatestStreamArn'
This will return the ARN of the stream, which is crucial for connecting our Lambda function.
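The stream ARN has a fixed shape, arn:aws:dynamodb:<region>:<account>:table/<table>/stream/<label>, where the label is a timestamp. Here’s a small sketch for pulling it apart, for example to fill in placeholders; the helper name parse_stream_arn and the sample ARN in the usage note are hypothetical. One gotcha: the label itself contains colons, so the split has to be limited.

```python
# Sketch: split a DynamoDB stream ARN into its parts.
# The helper name parse_stream_arn is hypothetical. The stream label is a
# timestamp that itself contains colons, so the split is capped at 5.


def parse_stream_arn(arn):
    """Return partition, region, account, table and stream label from a stream ARN."""
    # arn:<partition>:dynamodb:<region>:<account>:table/<table>/stream/<label>
    prefix, partition, service, region, account, resource = arn.split(":", 5)
    if prefix != "arn" or service != "dynamodb":
        raise ValueError(f"not a DynamoDB ARN: {arn}")
    parts = resource.split("/")
    if len(parts) != 4 or parts[0] != "table" or parts[2] != "stream":
        raise ValueError(f"not a DynamoDB stream ARN: {arn}")
    return {
        "partition": partition,
        "region": region,
        "account": account,
        "table": parts[1],
        "stream_label": parts[3],
    }
```

For a hypothetical ARN such as arn:aws:dynamodb:us-east-1:123456789012:table/products/stream/2024-01-15T10:30:00.000, the stream_label entry is 2024-01-15T10:30:00.000. In practice you can also pass the whole ARN along unchanged.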
Next, we create a Lambda function. This function will be triggered by events from the DynamoDB Stream.
Here’s a Python example for our Lambda function:
import json
import boto3  # not used yet; available for sending notifications (e.g., via SNS)

print('Loading function')


def lambda_handler(event, context):
    # The event object contains a batch of records from the DynamoDB stream
    for record in event['Records']:
        print(f"Processing record: {record['eventID']}")
        # Get the stream record type (INSERT, MODIFY, REMOVE)
        event_name = record['eventName']
        ddb = record.get('dynamodb', {})
        # INSERT and MODIFY records carry a NewImage; REMOVE records carry only
        # an OldImage (with NEW_AND_OLD_IMAGES), so fall back to it for deletions
        image = ddb.get('NewImage') or ddb.get('OldImage')
        if not image:
            print("No item image found in record.")
            continue
        product_id = image.get('productId', {}).get('S')
        product_name = image.get('name', {}).get('S')
        price = image.get('price', {}).get('N')  # 'N' (Number) values arrive as strings
        print(f"Event: {event_name}, Product ID: {product_id}, Name: {product_name}, Price: {price}")
        # Here you would add your logic, e.g., send a notification
        if event_name == 'INSERT':
            print(f"New product added: {product_name}")
        elif event_name == 'MODIFY':
            print(f"Product updated: {product_name}")
        elif event_name == 'REMOVE':
            print(f"Product removed: {product_id}")
    return {
        'statusCode': 200,
        'body': json.dumps('Successfully processed events!')
    }
Now, we need to connect this Lambda function to the DynamoDB Stream. We do this by creating an event source mapping.
aws lambda create-event-source-mapping \
--function-name your-lambda-function-name \
--event-source-arn arn:aws:dynamodb:your-region:your-account-id:table/products/stream/your-stream-timestamp \
--batch-size 100 \
--starting-position LATEST
Replace your-lambda-function-name, your-region, your-account-id, and your-stream-timestamp with your own values, or simply paste the full LatestStreamArn returned by the describe-table command.
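The batch-size sets the maximum number of records Lambda sends to your function in a single invocation (100 is the default for DynamoDB streams). starting-position LATEST means the mapping processes only records written to the stream after it is created. TRIM_HORIZON instead starts from the oldest record still in the stream; that is at most the last 24 hours of changes, not every item already in the table, because a stream only captures changes made after it was enabled.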
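The same mapping can be created from Python with boto3’s create_event_source_mapping. This is a minimal sketch, assuming a boto3 Lambda client is passed in; connect_stream is a hypothetical helper name. It rejects AT_TIMESTAMP, which Lambda supports for Kinesis but not for DynamoDB streams, and batch sizes outside the 1 to 10,000 range that DynamoDB stream mappings allow.

```python
# Sketch: the create-event-source-mapping call made through boto3.
# Assumes a boto3 Lambda client (boto3.client("lambda")) is passed in;
# the helper name connect_stream is hypothetical.

# Starting positions accepted for DynamoDB stream mappings
STARTING_POSITIONS = {"LATEST", "TRIM_HORIZON"}


def connect_stream(client, function_name, stream_arn,
                   batch_size=100, starting_position="LATEST"):
    """Create an event source mapping from a stream to a Lambda function."""
    if starting_position not in STARTING_POSITIONS:
        raise ValueError(f"unsupported starting position: {starting_position}")
    if not 1 <= batch_size <= 10000:
        raise ValueError("batch size must be between 1 and 10000")
    return client.create_event_source_mapping(
        FunctionName=function_name,
        EventSourceArn=stream_arn,
        BatchSize=batch_size,
        StartingPosition=starting_position,
    )
```

The response includes the mapping’s UUID, which is what update-event-source-mapping later expects when you tune the mapping.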
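Once this is set up, every write that changes an item in the products table (PutItem, UpdateItem, DeleteItem, and their batch and transactional variants) produces a stream record with eventName INSERT, MODIFY, or REMOVE, and Lambda delivers those records to your function in batches.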
Let’s simulate an INSERT:
aws dynamodb put-item \
--table-name products \
--item '{
"productId": {"S": "prod-123"},
"name": {"S": "Wireless Mouse"},
"price": {"N": "25.99"},
"category": {"S": "Electronics"}
}'
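Every value in the --item argument is wrapped in a type descriptor such as {"S": ...} or {"N": ...}. To see how plain values map onto that format, here’s a simplified, hand-written serializer. It’s a sketch, not a full implementation: it covers only S, N, BOOL, NULL, M, and L (no sets or binary), and the function names are hypothetical; boto3 ships a complete version as boto3.dynamodb.types.TypeSerializer. Like boto3, it accepts numbers as int or Decimal and rejects float, which avoids binary rounding surprises.

```python
# Sketch: convert plain Python values into DynamoDB's typed JSON, the format
# used by put-item --item. Simplified and hand-written (boto3's TypeSerializer
# is the complete version); covers S, N, BOOL, NULL, M and L only.
from decimal import Decimal


def to_attribute_value(value):
    """Wrap one Python value in a DynamoDB type descriptor."""
    if value is None:
        return {"NULL": True}
    if isinstance(value, bool):  # check bool before int: bool subclasses int
        return {"BOOL": value}
    if isinstance(value, (int, Decimal)):
        return {"N": str(value)}  # numbers travel as strings
    if isinstance(value, str):
        return {"S": value}
    if isinstance(value, dict):
        return {"M": {k: to_attribute_value(v) for k, v in value.items()}}
    if isinstance(value, list):
        return {"L": [to_attribute_value(v) for v in value]}
    # float and other types are rejected, as boto3 also rejects float
    raise TypeError(f"unsupported type: {type(value).__name__}")


def to_item(record):
    """Serialize a top-level dict of attributes into a DynamoDB item."""
    return {k: to_attribute_value(v) for k, v in record.items()}
```

For example, to_item({"productId": "prod-123", "name": "Wireless Mouse", "price": Decimal("25.99"), "category": "Electronics"}) produces the same item passed to put-item above.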
Your Lambda function’s logs (accessible via CloudWatch Logs) will then show output similar to this. On a cold start, the module-level Loading function message is printed during initialization, so it appears before the START line:
Loading function
START RequestId: abcdef12-3456-7890-abcd-ef1234567890 Version: $LATEST
Processing record: 1234567890abcdef1234567890
Event: INSERT, Product ID: prod-123, Name: Wireless Mouse, Price: 25.99
New product added: Wireless Mouse
END RequestId: abcdef12-3456-7890-abcd-ef1234567890
REPORT RequestId: abcdef12-3456-7890-abcd-ef1234567890 Duration: 150.50 ms Billed Duration: 151 ms Memory Size: 128 MB Max Memory Used: 55 MB
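The crucial part here is understanding the structure of the event object. The event is an object whose Records key holds a list of stream records. Each record contains eventID, eventName (INSERT, MODIFY, or REMOVE), eventVersion, eventSource (aws:dynamodb), awsRegion, eventSourceARN, and a dynamodb object. The dynamodb object holds Keys, NewImage, OldImage, SequenceNumber, SizeBytes, StreamViewType, and ApproximateCreationDateTime. Which images are present depends on the stream view type and the event: with NEW_AND_OLD_IMAGES, an INSERT has only a NewImage, a MODIFY has both, and a REMOVE has only an OldImage. Keys, NewImage, and OldImage use DynamoDB’s typed JSON format, where each value is wrapped in a type descriptor such as S for String, N for Number (sent as a string), BOOL for Boolean, M for Map, and L for List.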
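Reading typed images by hand, as the handler does with .get('S') and .get('N'), gets tedious for nested attributes. Here’s a sketch of a small deserializer plus a helper that picks the right image for each record. Both names are hypothetical, the deserializer covers only S, N, BOOL, NULL, M, L, SS, and NS, and in production boto3.dynamodb.types.TypeDeserializer does the same job more completely. Numbers come back as Decimal so a price like 25.99 stays exact.

```python
# Sketch: turn a stream record's NewImage/OldImage into plain Python values.
# Simplified, hand-written deserializer (boto3's TypeDeserializer is the
# fuller equivalent); handles S, N, BOOL, NULL, M, L, SS and NS only.
from decimal import Decimal


def from_attribute_value(av):
    """Unwrap one DynamoDB type descriptor such as {"N": "25.99"}."""
    [(type_tag, value)] = av.items()  # each attribute value has exactly one tag
    if type_tag in ("S", "BOOL"):
        return value
    if type_tag == "N":
        return Decimal(value)  # Decimal keeps values like 25.99 exact
    if type_tag == "NULL":
        return None
    if type_tag == "M":
        return {k: from_attribute_value(v) for k, v in value.items()}
    if type_tag == "L":
        return [from_attribute_value(v) for v in value]
    if type_tag == "SS":
        return set(value)
    if type_tag == "NS":
        return {Decimal(v) for v in value}
    raise TypeError(f"unsupported type tag: {type_tag}")


def record_item(record):
    """Return the item a stream record describes, preferring NewImage."""
    ddb = record.get("dynamodb", {})
    # REMOVE records have no NewImage; fall back to OldImage, then to Keys
    image = ddb.get("NewImage") or ddb.get("OldImage") or ddb.get("Keys", {})
    return {k: from_attribute_value(v) for k, v in image.items()}
```

Falling back to Keys means the helper still returns at least the primary key when the stream uses KEYS_ONLY.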
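Lambda’s event source mapping polls the stream’s shards for you and invokes your function with batches of records. If your function fails to process a batch (for example, it raises an unhandled exception), by default Lambda retries that same batch until it succeeds or the records expire from the stream, which retains data for 24 hours. Because records within a shard are processed in order, a failing batch also holds up every later record in that shard while it is being retried. This retry behavior is automatic and managed by the event source mapping.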
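To make the default retry behavior concrete, here’s a deliberately simplified model of one failing batch. It is not how Lambda is implemented: real retries use backoff, and the fixed retry_interval is an assumption to keep the model small. It only illustrates the rule that the same batch is retried until it succeeds or its records pass the 24-hour retention limit, at which point they are gone. drain_shard and its parameters are hypothetical.

```python
# Sketch: a simplified model of default retry behavior for one batch.
# NOT Lambda's implementation: real retries use backoff, while this model
# assumes a fixed retry_interval. Names here are hypothetical.

RETENTION_SECONDS = 24 * 60 * 60  # DynamoDB Streams keep records for 24 hours


def drain_shard(records, handler, now, retry_interval):
    """records: list of (sequence_number, created_at_seconds) in shard order.

    Returns (processed sequence numbers, expired sequence numbers, attempts).
    """
    batch, processed, expired, attempts = list(records), [], [], 0
    while batch:
        # Records older than the retention period are no longer in the stream
        expired += [seq for seq, ts in batch if now - ts >= RETENTION_SECONDS]
        batch = [(seq, ts) for seq, ts in batch if now - ts < RETENTION_SECONDS]
        if not batch:
            break
        attempts += 1
        try:
            handler([seq for seq, _ in batch])
            processed += [seq for seq, _ in batch]
            batch = []
        except Exception:
            now += retry_interval  # the whole batch is retried as a unit
    return processed, expired, attempts
```

With a handler that always raises and retry_interval=3600, a batch written at time 0 is attempted 24 times and then expires unprocessed, which is why a single bad record deserves attention before it ages out.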
The most surprising thing about DynamoDB Streams is that they are not just a historical log; they are an active sequence of change events, ordered per item, that more than one consumer can read at the same time, each keeping its own position in the stream (AWS recommends no more than two concurrent readers per shard to avoid throttling). This allows for decoupled microservices: one service can react to data changes in another without direct communication or tight coupling.
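A toy in-memory model helps show what each consumer keeping its own position means. ShardLog and the consumer names are hypothetical stand-ins, not a DynamoDB API; the point is that a read advances only the caller’s cursor, never the log itself or any other reader.

```python
# Sketch: independent consumers over one ordered change log.
# An in-memory stand-in for a stream shard; class, method and consumer
# names are hypothetical. Each consumer keeps its own cursor.


class ShardLog:
    def __init__(self):
        self._records = []
        self._positions = {}  # consumer name -> index of its next unread record

    def append(self, record):
        """Add a change event to the end of the log."""
        self._records.append(record)

    def read(self, consumer, max_records):
        """Return up to max_records unread records, advancing only this consumer."""
        start = self._positions.get(consumer, 0)
        batch = self._records[start:start + max_records]
        self._positions[consumer] = start + len(batch)
        return batch
```

For example, after appending three changes, log.read("notifier", 2) returns the first two, while a later log.read("search-indexer", 10) still returns all three.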
The batch-size and maximum-batching-window-in-seconds settings (which you can set during creation or later via aws lambda update-event-source-mapping) are the key tuning parameters for how often your Lambda function is invoked and how much data it processes per invocation. Lambda invokes the function when the batch reaches batch-size records, when the batching window expires, or when the payload reaches the 6 MB invocation limit, whichever comes first. A smaller batch size means more frequent invocations with less data each, which keeps per-record latency low but adds per-invocation overhead. A larger batch size, especially combined with a batching window, reduces the number of invocations, but records may wait longer before being processed, and a slow handler on a large batch delays every record in it.
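The flush rule can be sketched as a small simulation. This is a simplified model under stated assumptions: it ignores the 6 MB payload limit, polling delays, and parallelization, and it assumes arrival times are sorted; plan_invocations is a hypothetical name.

```python
# Sketch: when does a batch get flushed? A simplified model of the rule
# "invoke when batch_size records are buffered or the batching window
# expires, whichever comes first". Ignores the 6 MB payload limit, polling
# delays and parallelization; arrival times are assumed to be sorted.


def plan_invocations(arrivals, batch_size, window_seconds):
    """Group record arrival times (seconds) into (invoke_time, batch) pairs."""
    invocations = []
    batch = []
    deadline = None
    for t in arrivals:
        # Flush the open batch if its window closed before this record arrived
        if batch and t > deadline:
            invocations.append((deadline, batch))
            batch = []
        if not batch:
            deadline = t + window_seconds  # the window opens with the first record
        batch.append(t)
        if len(batch) == batch_size:  # batch is full: invoke immediately
            invocations.append((t, batch))
            batch = []
    if batch:  # whatever is left is flushed when its window expires
        invocations.append((deadline, batch))
    return invocations
```

With batch_size=3 and window_seconds=5, arrivals at [0, 1, 2, 10, 11] yield one invocation at t=2 (the batch filled) and one at t=15 (the second batch’s window expired).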
The next concept you’ll likely encounter is handling errors and retries more gracefully, including dead-letter queues for persistent failures.