It’s a common pain point with CloudFormation: tags applied to the stack itself don’t always reach the individual resources within that stack, because CloudFormation only propagates stack-level tags to resource types that support it.
Here’s how to get tags from your stack onto its resources.
Let’s say you have a CloudFormation stack named My-Super-Stack with a tag Environment: Production. You want all resources created by this stack, like EC2 instances or S3 buckets, to also have the Environment: Production tag.
We’ll use a Lambda-backed Custom Resource to achieve this. The Custom Resource will be triggered by CloudFormation events (create, update, delete) and will perform the tagging actions.
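To make the trigger concrete, here is a rough sketch of the kind of request CloudFormation sends to the Lambda, and how a handler might branch on it. The field names are the ones the handler in this post reads. The values (stack ID, request ID, URL) are made-up placeholders, the event is trimmed down, and `describe_action` is a hypothetical helper used only for illustration.

```python
# Illustrative, trimmed-down custom resource request; all values are placeholders.
sample_event = {
    "RequestType": "Create",  # "Create", "Update", or "Delete"
    "ResponseURL": "https://example.com/presigned-url-placeholder",
    "StackId": "arn:aws:cloudformation:us-east-1:123456789012:stack/My-Super-Stack/example-id",
    "RequestId": "example-request-id",
    "LogicalResourceId": "TagPropagationCustomResource",
    "ResourceProperties": {"StackName": "My-Super-Stack"},
}


def describe_action(event):
    """Hypothetical helper: say what the tagging Lambda would do for an event."""
    stack_name = event["ResourceProperties"]["StackName"]
    if event["RequestType"] in ("Create", "Update"):
        return f"propagate tags of {stack_name} to its resources"
    return f"acknowledge delete for {stack_name}"


print(describe_action(sample_event))
```

Whatever the request type, the Lambda has to report back to the `ResponseURL`, otherwise CloudFormation keeps waiting for an answer.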
First, here’s the IAM role that the Lambda function will assume. It needs permissions to read CloudFormation stack details and to tag AWS resources.
Resources:
  TagPropagationRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        # Lets the function write its logs to CloudWatch Logs
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyName: TaggingPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - cloudformation:DescribeStacks
                  - cloudformation:ListStackResources  # Needed by list_stack_resources
                  - ec2:CreateTags
                  - s3:PutBucketTagging
                  # Add any other resource-specific tagging actions here
                Resource: "*"
Next, the Lambda function itself. This Python code will:
- Get the stack name from the event.
- Use cloudformation:DescribeStacks to get the tags from the stack.
- Iterate through the resources in the stack.
- For each resource, call the appropriate tagging API (ec2:CreateTags, s3:PutBucketTagging, etc.) to apply the stack’s tags.
import json
import logging

import boto3
import urllib3

logger = logging.getLogger()
logger.setLevel(logging.INFO)

cf_client = boto3.client('cloudformation')
ec2_client = boto3.client('ec2')
s3_client = boto3.client('s3')
http = urllib3.PoolManager()


def lambda_handler(event, context):
    request_type = event['RequestType']
    stack_name = event['ResourceProperties']['StackName']
    logger.info(f"Request type: {request_type}, Stack Name: {stack_name}")

    if request_type in ['Create', 'Update']:
        try:
            stack_tags = get_stack_tags(stack_name)
            if not stack_tags:
                logger.warning(f"No tags found for stack: {stack_name}")
                return send_response(event, context, "SUCCESS", {"Message": "No tags to propagate"})

            resource_tagging_success = tag_resources_in_stack(stack_name, stack_tags)
            if resource_tagging_success:
                response_data = {"Message": f"Successfully propagated tags to resources in stack {stack_name}"}
                send_response(event, context, "SUCCESS", response_data)
            else:
                response_data = {"Message": f"Failed to propagate tags for some resources in stack {stack_name}"}
                send_response(event, context, "FAILED", response_data)
        except Exception as e:
            logger.error(f"Error processing stack {stack_name}: {e}")
            send_response(event, context, "FAILED", {"Message": str(e)})
    elif request_type == 'Delete':
        # Optional: Implement logic to remove tags on delete if desired
        # For this example, we'll just acknowledge the delete event
        logger.info(f"Delete event for stack {stack_name}. No tag removal implemented.")
        send_response(event, context, "SUCCESS", {"Message": "Delete acknowledged"})


def get_stack_tags(stack_name):
    try:
        response = cf_client.describe_stacks(StackName=stack_name)
        if response['Stacks']:
            return response['Stacks'][0].get('Tags', [])
        return []
    except Exception as e:
        logger.error(f"Error describing stack {stack_name}: {e}")
        raise


def tag_resources_in_stack(stack_name, tags):
    all_resources_tagged = True
    try:
        # list_stack_resources is paginated, so walk every page
        paginator = cf_client.get_paginator('list_stack_resources')
        for page in paginator.paginate(StackName=stack_name):
            for resource in page['StackResourceSummaries']:
                resource_type = resource['ResourceType']
                # Resources still being created (including this Custom Resource
                # itself) may not have a physical ID yet
                physical_resource_id = resource.get('PhysicalResourceId')
                if not physical_resource_id:
                    logger.info(f"Skipping resource without a physical ID (Type: {resource_type})")
                    continue
                logger.info(f"Tagging resource: {physical_resource_id} (Type: {resource_type})")
                try:
                    # Compare exactly: startswith('AWS::S3::Bucket') would also
                    # match AWS::S3::BucketPolicy
                    if resource_type == 'AWS::EC2::Instance':
                        ec2_client.create_tags(
                            Resources=[physical_resource_id],
                            Tags=tags
                        )
                    elif resource_type == 'AWS::S3::Bucket':
                        # Note: put_bucket_tagging sets the bucket's whole tag set
                        s3_client.put_bucket_tagging(
                            Bucket=physical_resource_id,
                            Tagging={'TagSet': tags}
                        )
                    # Add more resource type handlers as needed
                    # elif resource_type == 'AWS::RDS::DBInstance':
                    #     rds_client.add_tags_to_resource(
                    #         ResourceName=db_instance_arn,  # must be the ARN, not the physical ID
                    #         Tags=tags
                    #     )
                    else:
                        logger.warning(f"Skipping tagging for unsupported resource type: {resource_type}")
                except Exception as e:
                    logger.error(f"Failed to tag resource {physical_resource_id} (Type: {resource_type}): {e}")
                    all_resources_tagged = False
        return all_resources_tagged
    except Exception as e:
        logger.error(f"Error listing stack resources for {stack_name}: {e}")
        return False


def send_response(event, context, response_status, response_data):
    response_body = json.dumps({
        "Status": response_status,
        "Reason": "See the details in CloudWatch Log Stream: " + context.log_stream_name,
        "PhysicalResourceId": event.get('PhysicalResourceId', context.log_stream_name),
        "StackId": event['StackId'],
        "RequestId": event['RequestId'],
        "LogicalResourceId": event['LogicalResourceId'],
        "Data": response_data
    })
    logger.info(f"Sending response to presigned URL: {event['ResponseURL']}")
    http.request('PUT', event['ResponseURL'], body=response_body.encode('utf-8'))
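As more resource types are added, the if/elif chain in tag_resources_in_stack can get long. One possible alternative, shown here only as a sketch, is a lookup table from resource type to tagging function. The names `TAGGERS`, `tag_resource`, `tag_ec2_instance`, and `tag_s3_bucket` are hypothetical, and the clients are passed in as a dict so the dispatch can be tried out with stubs instead of real boto3 clients.

```python
def tag_ec2_instance(clients, physical_id, tags):
    """Tag one EC2 instance using the injected EC2 client."""
    clients["ec2"].create_tags(Resources=[physical_id], Tags=tags)


def tag_s3_bucket(clients, physical_id, tags):
    """Tag one S3 bucket; put_bucket_tagging sets the bucket's whole tag set."""
    clients["s3"].put_bucket_tagging(Bucket=physical_id, Tagging={"TagSet": tags})


# Map CloudFormation resource types to tagging functions (extend as needed).
TAGGERS = {
    "AWS::EC2::Instance": tag_ec2_instance,
    "AWS::S3::Bucket": tag_s3_bucket,
}


def tag_resource(clients, resource_type, physical_id, tags):
    """Return True if a tagger ran, False if the resource type is unsupported."""
    tagger = TAGGERS.get(resource_type)
    if tagger is None:
        return False
    tagger(clients, physical_id, tags)
    return True
```

In the Lambda, `clients` would be something like `{"ec2": ec2_client, "s3": s3_client}`, and supporting a new resource type becomes one new function plus one new entry in the table.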
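Finally, the CloudFormation resources that tie it all together: the Lambda function, and a Custom Resource that points to it and passes the StackName property. We also add a DependsOn so that the Lambda function is created before the Custom Resource. In a real stack, add the resources you want tagged to that DependsOn as well, so they exist by the time the Lambda runs.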
Resources:
  TagPropagationLambda:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.lambda_handler
      Role: !GetAtt TagPropagationRole.Arn
      Runtime: python3.9
      Timeout: 60  # The 3-second default can be too short for the tagging calls
      Code:
        ZipFile: |
          # The Lambda function code from above
          import json
          import logging

          import boto3
          import urllib3

          logger = logging.getLogger()
          logger.setLevel(logging.INFO)

          cf_client = boto3.client('cloudformation')
          ec2_client = boto3.client('ec2')
          s3_client = boto3.client('s3')
          http = urllib3.PoolManager()


          def lambda_handler(event, context):
              request_type = event['RequestType']
              stack_name = event['ResourceProperties']['StackName']
              logger.info(f"Request type: {request_type}, Stack Name: {stack_name}")

              if request_type in ['Create', 'Update']:
                  try:
                      stack_tags = get_stack_tags(stack_name)
                      if not stack_tags:
                          logger.warning(f"No tags found for stack: {stack_name}")
                          return send_response(event, context, "SUCCESS", {"Message": "No tags to propagate"})

                      resource_tagging_success = tag_resources_in_stack(stack_name, stack_tags)
                      if resource_tagging_success:
                          response_data = {"Message": f"Successfully propagated tags to resources in stack {stack_name}"}
                          send_response(event, context, "SUCCESS", response_data)
                      else:
                          response_data = {"Message": f"Failed to propagate tags for some resources in stack {stack_name}"}
                          send_response(event, context, "FAILED", response_data)
                  except Exception as e:
                      logger.error(f"Error processing stack {stack_name}: {e}")
                      send_response(event, context, "FAILED", {"Message": str(e)})
              elif request_type == 'Delete':
                  logger.info(f"Delete event for stack {stack_name}. No tag removal implemented.")
                  send_response(event, context, "SUCCESS", {"Message": "Delete acknowledged"})


          def get_stack_tags(stack_name):
              try:
                  response = cf_client.describe_stacks(StackName=stack_name)
                  if response['Stacks']:
                      return response['Stacks'][0].get('Tags', [])
                  return []
              except Exception as e:
                  logger.error(f"Error describing stack {stack_name}: {e}")
                  raise


          def tag_resources_in_stack(stack_name, tags):
              all_resources_tagged = True
              try:
                  # list_stack_resources is paginated, so walk every page
                  paginator = cf_client.get_paginator('list_stack_resources')
                  for page in paginator.paginate(StackName=stack_name):
                      for resource in page['StackResourceSummaries']:
                          resource_type = resource['ResourceType']
                          # Resources still being created may not have a physical ID yet
                          physical_resource_id = resource.get('PhysicalResourceId')
                          if not physical_resource_id:
                              logger.info(f"Skipping resource without a physical ID (Type: {resource_type})")
                              continue
                          logger.info(f"Tagging resource: {physical_resource_id} (Type: {resource_type})")
                          try:
                              if resource_type == 'AWS::EC2::Instance':
                                  ec2_client.create_tags(
                                      Resources=[physical_resource_id],
                                      Tags=tags
                                  )
                              elif resource_type == 'AWS::S3::Bucket':
                                  s3_client.put_bucket_tagging(
                                      Bucket=physical_resource_id,
                                      Tagging={'TagSet': tags}
                                  )
                              else:
                                  logger.warning(f"Skipping tagging for unsupported resource type: {resource_type}")
                          except Exception as e:
                              logger.error(f"Failed to tag resource {physical_resource_id} (Type: {resource_type}): {e}")
                              all_resources_tagged = False
                  return all_resources_tagged
              except Exception as e:
                  logger.error(f"Error listing stack resources for {stack_name}: {e}")
                  return False


          def send_response(event, context, response_status, response_data):
              response_body = json.dumps({
                  "Status": response_status,
                  "Reason": "See the details in CloudWatch Log Stream: " + context.log_stream_name,
                  "PhysicalResourceId": event.get('PhysicalResourceId', context.log_stream_name),
                  "StackId": event['StackId'],
                  "RequestId": event['RequestId'],
                  "LogicalResourceId": event['LogicalResourceId'],
                  "Data": response_data
              })
              logger.info(f"Sending response to presigned URL: {event['ResponseURL']}")
              http.request('PUT', event['ResponseURL'], body=response_body.encode('utf-8'))
  TagPropagationCustomResource:
    Type: AWS::CloudFormation::CustomResource
    Properties:
      ServiceToken: !GetAtt TagPropagationLambda.Arn
      StackName: !Ref AWS::StackName  # This refers to the current stack being created/updated
    DependsOn: TagPropagationLambda  # Ensure Lambda is created first; also list the resources to tag
When you deploy this, the TagPropagationCustomResource triggers the Lambda. The function fetches the tags from the stack that is being created or updated (!Ref AWS::StackName refers to the stack containing this Custom Resource) and applies them to the supported resources returned by list_stack_resources. Keep in mind that CloudFormation sends the Custom Resource an Update event only when the resource’s properties change, so changing nothing but the stack’s tags may not re-run the Lambda. Changing a property on the Custom Resource in the same update makes it run again.
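One caveat when applying tags to S3: put_bucket_tagging sets the bucket's whole tag set rather than adding to it, so tags already on the bucket would be dropped. A minimal sketch of a merge step follows, assuming the role is also allowed to call s3:GetBucketTagging. `merge_tag_sets` and `put_merged_bucket_tags` are hypothetical helper names, and the client is passed in as a parameter.

```python
def merge_tag_sets(existing, incoming):
    """Merge two lists of {'Key': ..., 'Value': ...} dicts; incoming wins on key clashes."""
    merged = {tag["Key"]: tag["Value"] for tag in existing}
    merged.update({tag["Key"]: tag["Value"] for tag in incoming})
    return [{"Key": key, "Value": value} for key, value in merged.items()]


def put_merged_bucket_tags(s3_client, bucket, stack_tags):
    """Read the bucket's current tags, merge in the stack tags, and write the result."""
    try:
        existing = s3_client.get_bucket_tagging(Bucket=bucket)["TagSet"]
    except s3_client.exceptions.ClientError as error:
        # S3 reports a bucket with no tags as a NoSuchTagSet error.
        if error.response["Error"]["Code"] != "NoSuchTagSet":
            raise
        existing = []
    merged = merge_tag_sets(existing, stack_tags)
    s3_client.put_bucket_tagging(Bucket=bucket, Tagging={"TagSet": merged})
    return merged
```

Calling `put_merged_bucket_tags(s3_client, physical_resource_id, tags)` in place of the direct put_bucket_tagging call would keep a bucket's existing tags and overwrite only the keys that the stack also defines.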
The next thing you’ll likely want to tackle is how to remove tags when resources are deleted from the stack, which requires adding specific logic to the Delete event handler in the Lambda.
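As a starting point, here is a minimal sketch of what such Delete logic might look like for EC2 instances only. It assumes the instance IDs and the stack's tags have already been collected, and that the role has been granted ec2:DeleteTags. `remove_tags_from_instances` is a hypothetical helper name, and the client is passed in so the function can be exercised with a stub.

```python
def remove_tags_from_instances(ec2_client, instance_ids, tags):
    """Delete the given tag keys from the given EC2 instances; return the keys removed."""
    keys = [tag["Key"] for tag in tags]
    if not instance_ids or not keys:
        return []
    # Passing only the key removes the tag whatever its current value is.
    ec2_client.delete_tags(
        Resources=list(instance_ids),
        Tags=[{"Key": key} for key in keys],
    )
    return keys
```

Other resource types would need their own removal calls, in the same way that tagging them needs a type-specific API.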