CloudFormation stacks can drift from their intended state, and it’s a common pain point that tags applied to the stack itself don’t automatically cascade down to the individual resources within that stack.

Here’s how to get tags from your stack onto its resources.

Let’s say you have a CloudFormation stack named My-Super-Stack with a tag Environment: Production. You want all resources created by this stack, like EC2 instances or S3 buckets, to also have the Environment: Production tag.

We’ll use a Lambda-backed Custom Resource to achieve this. The Custom Resource will be triggered by CloudFormation events (create, update, delete) and will perform the tagging actions.

First, here’s the IAM role that the Lambda function will assume. It needs permissions to read CloudFormation stack details and to tag AWS resources.

Resources:
  TagPropagationRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: TaggingPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - cloudformation:DescribeStacks
                  - ec2:CreateTags
                  - s3:PutBucketTagging
                  # Add any other resource-specific tagging actions here
                Resource: "*"

Next, the Lambda function itself. This Python code will:

  1. Get the stack name from the event.
  2. Use cloudformation:DescribeStacks to get the tags from the stack.
  3. Iterate through the resources in the stack.
  4. For each resource, call the appropriate tagging API (ec2:CreateTags, s3:PutBucketTagging, etc.) to apply the stack’s tags.
import json
import boto3
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

cf_client = boto3.client('cloudformation')
ec2_client = boto3.client('ec2')
s3_client = boto3.client('s3')

def lambda_handler(event, context):
    request_type = event['RequestType']
    stack_name = event['ResourceProperties']['StackName']
    resource_physical_id = event.get('PhysicalResourceId')

    logger.info(f"Request type: {request_type}, Stack Name: {stack_name}")

    if request_type in ['Create', 'Update']:
        try:
            stack_tags = get_stack_tags(stack_name)
            if not stack_tags:
                logger.warning(f"No tags found for stack: {stack_name}")
                return send_response(event, context, "SUCCESS", {"Message": "No tags to propagate"})

            resource_tagging_success = tag_resources_in_stack(stack_name, stack_tags)

            if resource_tagging_success:
                response_data = {"Message": f"Successfully propagated tags to resources in stack {stack_name}"}
                send_response(event, context, "SUCCESS", response_data)
            else:
                response_data = {"Message": f"Failed to propagate tags for some resources in stack {stack_name}"}
                send_response(event, context, "FAILED", response_data)

        except Exception as e:
            logger.error(f"Error processing stack {stack_name}: {e}")
            send_response(event, context, "FAILED", {"Message": str(e)})

    elif request_type == 'Delete':
        # Optional: Implement logic to remove tags on delete if desired
        # For this example, we'll just acknowledge the delete event
        logger.info(f"Delete event for stack {stack_name}. No tag removal implemented.")
        send_response(event, context, "SUCCESS", {"Message": "Delete acknowledged"})

def get_stack_tags(stack_name):
    try:
        response = cf_client.describe_stacks(StackName=stack_name)
        if response['Stacks']:
            return response['Stacks'][0].get('Tags', [])
        return []
    except Exception as e:
        logger.error(f"Error describing stack {stack_name}: {e}")
        raise

def tag_resources_in_stack(stack_name, tags):
    all_resources_tagged = True
    try:
        response = cf_client.list_stack_resources(StackName=stack_name)
        for resource in response['StackResourceSummaries']:
            physical_resource_id = resource['PhysicalResourceId']
            resource_type = resource['ResourceType']
            logger.info(f"Tagging resource: {physical_resource_id} (Type: {resource_type})")

            try:
                if resource_type.startswith('AWS::EC2::Instance'):
                    ec2_client.create_tags(
                        Resources=[physical_resource_id],
                        Tags=tags
                    )
                elif resource_type.startswith('AWS::S3::Bucket'):
                    s3_client.put_bucket_tagging(
                        Bucket=physical_resource_id,
                        Tagging={'TagSet': tags}
                    )
                # Add more resource type handlers as needed
                # elif resource_type.startswith('AWS::RDS::DBInstance'):
                #     rds_client.add_tags_to_resource(
                #         ResourceName=physical_resource_id,
                #         Tags=tags
                #     )
                else:
                    logger.warning(f"Skipping tagging for unsupported resource type: {resource_type}")

            except Exception as e:
                logger.error(f"Failed to tag resource {physical_resource_id} (Type: {resource_type}): {e}")
                all_resources_tagged = False
        return all_resources_tagged
    except Exception as e:
        logger.error(f"Error listing stack resources for {stack_name}: {e}")
        return False

def send_response(event, context, response_status, response_data):
    response_body = json.dumps({
        "Status": response_status,
        "Reason": "See the details in CloudWatch Log Stream: " + context.log_stream_name,
        "PhysicalResourceId": event.get('PhysicalResourceId', context.log_stream_name),
        "StackId": event['StackId'],
        "RequestId": event['RequestId'],
        "LogicalResourceId": event['LogicalResourceId'],
        "Data": response_data
    })

    import urllib3
    http = urllib3.PoolManager()
    logger.info(f"Sending response to presigned URL: {event['ResponseURL']}")
    http.request('PUT', event['ResponseURL'], body=response_body.encode('utf-8'))

Finally, the CloudFormation resource that ties it all together. This is a CustomResource that points to our Lambda function and passes the StackName property. We also add a DependsOn to ensure the IAM role is created before the Lambda function.

Resources:
  TagPropagationLambda:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.lambda_handler
      Role: !GetAtt TagPropagationRole.Arn
      Runtime: python3.9
      Code:
        ZipFile: |
          # Paste the Lambda function code here
          import json
          import boto3
          import logging

          logger = logging.getLogger()
          logger.setLevel(logging.INFO)

          cf_client = boto3.client('cloudformation')
          ec2_client = boto3.client('ec2')
          s3_client = boto3.client('s3')

          def lambda_handler(event, context):
              request_type = event['RequestType']
              stack_name = event['ResourceProperties']['StackName']
              resource_physical_id = event.get('PhysicalResourceId')

              logger.info(f"Request type: {request_type}, Stack Name: {stack_name}")

              if request_type in ['Create', 'Update']:
                  try:
                      stack_tags = get_stack_tags(stack_name)
                      if not stack_tags:
                          logger.warning(f"No tags found for stack: {stack_name}")
                          return send_response(event, context, "SUCCESS", {"Message": "No tags to propagate"})

                      resource_tagging_success = tag_resources_in_stack(stack_name, stack_tags)

                      if resource_tagging_success:
                          response_data = {"Message": f"Successfully propagated tags to resources in stack {stack_name}"}
                          send_response(event, context, "SUCCESS", response_data)
                      else:
                          response_data = {"Message": f"Failed to propagate tags for some resources in stack {stack_name}"}
                          send_response(event, context, "FAILED", response_data)

                  except Exception as e:
                      logger.error(f"Error processing stack {stack_name}: {e}")
                      send_response(event, context, "FAILED", {"Message": str(e)})

              elif request_type == 'Delete':
                  logger.info(f"Delete event for stack {stack_name}. No tag removal implemented.")
                  send_response(event, context, "SUCCESS", {"Message": "Delete acknowledged"})

          def get_stack_tags(stack_name):
              try:
                  response = cf_client.describe_stacks(StackName=stack_name)
                  if response['Stacks']:
                      return response['Stacks'][0].get('Tags', [])
                  return []
              except Exception as e:
                  logger.error(f"Error describing stack {stack_name}: {e}")
                  raise

          def tag_resources_in_stack(stack_name, tags):
              all_resources_tagged = True
              try:
                  response = cf_client.list_stack_resources(StackName=stack_name)
                  for resource in response['StackResourceSummaries']:
                      physical_resource_id = resource['PhysicalResourceId']
                      resource_type = resource['ResourceType']
                      logger.info(f"Tagging resource: {physical_resource_id} (Type: {resource_type})")

                      try:
                          if resource_type.startswith('AWS::EC2::Instance'):
                              ec2_client.create_tags(
                                  Resources=[physical_resource_id],
                                  Tags=tags
                              )
                          elif resource_type.startswith('AWS::S3::Bucket'):
                              s3_client.put_bucket_tagging(
                                  Bucket=physical_resource_id,
                                  Tagging={'TagSet': tags}
                              )
                          else:
                              logger.warning(f"Skipping tagging for unsupported resource type: {resource_type}")

                      except Exception as e:
                          logger.error(f"Failed to tag resource {physical_resource_id} (Type: {resource_type}): {e}")
                          all_resources_tagged = False
                  return all_resources_tagged
              except Exception as e:
                  logger.error(f"Error listing stack resources for {stack_name}: {e}")
                  return False

          def send_response(event, context, response_status, response_data):
              response_body = json.dumps({
                  "Status": response_status,
                  "Reason": "See the details in CloudWatch Log Stream: " + context.log_stream_name,
                  "PhysicalResourceId": event.get('PhysicalResourceId', context.log_stream_name),
                  "StackId": event['StackId'],
                  "RequestId": event['RequestId'],
                  "LogicalResourceId": event['LogicalResourceId'],
                  "Data": response_data
              })

              import urllib3
              http = urllib3.PoolManager()
              logger.info(f"Sending response to presigned URL: {event['ResponseURL']}")
              http.request('PUT', event['ResponseURL'], body=response_body.encode('utf-8'))

  TagPropagationCustomResource:
    Type: AWS::CloudFormation::CustomResource
    Properties:
      ServiceToken: !GetAtt TagPropagationLambda.Arn
      StackName: !Ref AWS::StackName # This refers to the current stack being created/updated
    DependsOn: TagPropagationLambda # Ensure Lambda is created first

When you deploy this, the TagPropagationCustomResource will trigger the Lambda. The Lambda function will then fetch the tags from the stack that is currently being created or updated (because !Ref AWS::StackName refers to the stack containing this Custom Resource) and apply them to all resources listed by list_stack_resources. If you update the tags on the stack, the Update event for the Custom Resource will fire, re-running the Lambda to update the resource tags.

The next thing you’ll likely want to tackle is how to remove tags when resources are deleted from the stack, which requires adding specific logic to the Delete event handler in the Lambda.

Want structured learning?

Take the full Cloudformation course →