Crossplane Pipeline Functions let you manipulate Kubernetes Custom Resources (CRs) as they flow through Crossplane’s control loops, enabling powerful customization and automation.

Imagine you’ve just declared a new AWS RDS instance using Crossplane:

apiVersion: rds.aws.upbound.io/v1beta1
kind: Instance
metadata:
  name: my-database
spec:
  forProvider:
    region: us-east-1
    dbInstanceClass: db.t3.micro
    engine: postgres
    engineVersion: "13.7"

By default, Crossplane will try to create this RDS instance. But what if you want to enforce a specific tagging strategy across all your databases, or inject a default backup retention period that users might forget? This is where Pipeline Functions come in.

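Pipeline Functions (composition functions, in Crossplane’s own terminology) are small programs, packaged as container images and served over gRPC, that Crossplane calls while it reconciles a composite resource. Each function can inspect and modify the resources a Composition produces before Crossplane applies them and the provider acts on them. Because they run inside the control plane on every reconcile, they let you build complex, declarative workflows for resource management.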
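To make the idea concrete, here is a minimal sketch in plain Go, with no Crossplane dependencies, of functions as steps that each take a resource and return a modified copy. Resource, Step, runPipeline, and setDefault are hypothetical names chosen for illustration; they are not part of Crossplane’s API.

```go
package main

import "fmt"

// Resource is a simplified stand-in for a Kubernetes object.
type Resource map[string]any

// Step is a hypothetical function type: it takes the resource produced so far
// and returns the next version of it.
type Step func(Resource) Resource

// runPipeline applies each step in order, feeding every step the output of
// the step before it.
func runPipeline(r Resource, steps ...Step) Resource {
	for _, step := range steps {
		r = step(r)
	}
	return r
}

// setDefault returns a Step that sets key to value only when key is absent.
// It works on a shallow copy so the input resource is left untouched.
func setDefault(key string, value any) Step {
	return func(r Resource) Resource {
		out := Resource{}
		for k, v := range r {
			out[k] = v
		}
		if _, ok := out[key]; !ok {
			out[key] = value
		}
		return out
	}
}

func main() {
	in := Resource{"engine": "postgres"}
	out := runPipeline(in,
		setDefault("backupRetentionPeriod", 7),
		setDefault("engine", "mysql"), // no effect: engine is already set
	)
	fmt.Println(out["engine"], out["backupRetentionPeriod"]) // prints "postgres 7"
}
```

The order of the steps matters: a later step sees whatever earlier steps produced, which is why a default set first cannot be replaced by a later default for the same key.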

Here’s how it looks in action. Let’s say we want to automatically add a managed-by tag and set a backupRetentionPeriod to 7 days for all RDS instances.

First, we define a Function Custom Resource in Crossplane that points to our function image. This image contains our custom logic.

apiVersion: pkg.crossplane.io/v1
kind: Function
metadata:
  name: rds-enhancer
spec:
  package: xuanquanz/rds-enhancer:v0.1.0 # Our custom function image

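Next, we add the function to a Composition that runs in Pipeline mode. The pipeline lists the functions to execute in order, and each step receives the desired state produced by the step before it. Here the first step composes the Instance with function-patch-and-transform (assumed to be installed as a Function as well), and the second step hands it to rds-enhancer. XDatabase and example.org are placeholder names for a composite resource type defined by an XRD that is not shown here.

apiVersion: apiextensions.crossplane.io/v1
kind: Composition
metadata:
  name: default-rds-pipeline
spec:
  compositeTypeRef:
    apiVersion: example.org/v1alpha1 # Placeholder composite resource type
    kind: XDatabase
  mode: Pipeline
  pipeline:
    - step: compose-instance
      functionRef:
        name: function-patch-and-transform
      input:
        apiVersion: pt.fn.crossplane.io/v1beta1
        kind: Resources
        resources:
          - name: rds-instance
            base:
              apiVersion: rds.aws.upbound.io/v1beta1
              kind: Instance
              spec:
                forProvider:
                  region: us-east-1
                  dbInstanceClass: db.t3.micro
                  engine: postgres
                  engineVersion: "13.7"
    - step: enhance-rds
      functionRef:
        name: rds-enhancer

Nothing has to be added to the Instance itself to opt in. The Composition is matched to composite resources through compositeTypeRef, so creating an XDatabase is enough to trigger the pipeline:

apiVersion: example.org/v1alpha1
kind: XDatabase
metadata:
  name: my-database

When Crossplane reconciles this XDatabase, it runs the pipeline from top to bottom: the first step adds the Instance to the desired state, and rds-enhancer then receives that state and modifies the Instance before Crossplane applies it.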

The core concept is that a function receives two kinds of state in each request: the observed state (the composite resource and its composed resources as they currently exist) and the desired state accumulated by earlier steps in the pipeline. It returns an updated desired state, which can add, change, or remove composed resources such as our Instance before Crossplane applies them.
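The shape of that exchange can be sketched with a few plain Go types. This is a simplified model under stated assumptions, not the real gRPC messages: Object, State, Request, Response, and run are hypothetical names, and the XDatabase and Bucket kinds are placeholders.

```go
package main

import "fmt"

// Object is a simplified stand-in for a Kubernetes resource.
type Object map[string]any

// State models, in simplified form, what a function sees: one composite
// resource plus any number of composed resources keyed by name.
type State struct {
	Composite Object
	Resources map[string]Object
}

// Request and Response are hypothetical simplifications of the messages
// exchanged between Crossplane and a function.
type Request struct {
	Observed State // what currently exists
	Desired  State // what earlier pipeline steps asked for
}

type Response struct {
	Desired State
}

// run passes through the desired state it was given and adds one composed
// resource named "database" whose region is read from the observed composite.
func run(req Request) Response {
	desired := State{
		Composite: req.Desired.Composite,
		Resources: map[string]Object{},
	}
	for name, obj := range req.Desired.Resources {
		desired.Resources[name] = obj
	}
	region, _ := req.Observed.Composite["region"].(string)
	desired.Resources["database"] = Object{"kind": "Instance", "region": region}
	return Response{Desired: desired}
}

func main() {
	req := Request{
		Observed: State{Composite: Object{"kind": "XDatabase", "region": "us-east-1"}},
		Desired:  State{Resources: map[string]Object{"bucket": {"kind": "Bucket"}}},
	}
	rsp := run(req)
	fmt.Println(len(rsp.Desired.Resources), rsp.Desired.Resources["database"]["region"]) // prints "2 us-east-1"
}
```

Note that run keeps the resources it was handed. A function that returned only its own additions would drop the work of earlier steps from the desired state.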

Our rds-enhancer function, written in Go (or any language that can produce a gRPC server), would look something like this conceptually:

package main

import (
    "context"
    "fmt"

    fnv1beta1 "github.com/crossplane/function-sdk-go/proto/v1beta1"
    "google.golang.org/protobuf/types/known/structpb"
)

// Inside the function's main logic (the server type and gRPC setup are omitted)
func (s *server) RunFunction(ctx context.Context, req *fnv1beta1.RunFunctionRequest) (*fnv1beta1.RunFunctionResponse, error) {
    // Start the response from the desired state passed in by earlier steps
    resp := &fnv1beta1.RunFunctionResponse{
        Meta:    &fnv1beta1.ResponseMeta{Tag: req.GetMeta().GetTag()},
        Desired: req.GetDesired(),
    }

    // Iterate over all desired composed resources in the request
    for name, res := range req.GetDesired().GetResources() {
        // Convert the protobuf Struct into a plain map
        rdsInstance := res.GetResource().AsMap()

        // Skip anything that is not an RDS Instance we want to modify
        if rdsInstance["kind"] != "Instance" || rdsInstance["apiVersion"] != "rds.aws.upbound.io/v1beta1" {
            continue
        }

        // Ensure `spec.forProvider` exists
        spec, ok := rdsInstance["spec"].(map[string]interface{})
        if !ok {
            spec = make(map[string]interface{})
            rdsInstance["spec"] = spec
        }
        forProvider, ok := spec["forProvider"].(map[string]interface{})
        if !ok {
            forProvider = make(map[string]interface{})
            spec["forProvider"] = forProvider
        }

        // Add the managed-by tag, keeping any tags that are already set
        if forProvider["tags"] == nil {
            forProvider["tags"] = make(map[string]interface{})
        }
        if tags, ok := forProvider["tags"].(map[string]interface{}); ok {
            tags["managed-by"] = "crossplane"
        }

        // Set backup retention period
        forProvider["backupRetentionPeriod"] = 7

        // Convert the modified map back into a protobuf Struct
        modified, err := structpb.NewStruct(rdsInstance)
        if err != nil {
            return nil, fmt.Errorf("failed to convert modified RDS Instance: %w", err)
        }

        // Update the composed resource; resp.Desired points at the same state
        res.Resource = modified
        resp.Results = append(resp.Results, &fnv1beta1.Result{
            Severity: fnv1beta1.Severity_SEVERITY_NORMAL,
            Message:  fmt.Sprintf("enhanced RDS Instance %q", name),
        })
    }
    return resp, nil
}

The function returns the modified Instance as part of its desired state. Crossplane applies it, and the AWS provider then creates or updates the database. The spec.forProvider.tags map now includes {"managed-by": "crossplane"}, and spec.forProvider.backupRetentionPeriod is set to 7.
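The effect on spec.forProvider can be checked in isolation with a small, dependency-free sketch. The enhance helper and the team tag are hypothetical and exist only for this illustration. Unlike the handler above, this version replaces a tags value that is not a map instead of skipping it.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// enhance applies the two defaults to a forProvider map in place.
// Existing tags are kept; only managed-by is added or overwritten.
// A tags value that is missing or not a map is replaced with a new map.
func enhance(forProvider map[string]any) {
	tags, ok := forProvider["tags"].(map[string]any)
	if !ok {
		tags = map[string]any{}
		forProvider["tags"] = tags
	}
	tags["managed-by"] = "crossplane"
	forProvider["backupRetentionPeriod"] = 7
}

func main() {
	forProvider := map[string]any{
		"region": "us-east-1",
		"tags":   map[string]any{"team": "payments"}, // hypothetical user-supplied tag
	}
	enhance(forProvider)

	out, err := json.MarshalIndent(forProvider, "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```

Keeping the transformation in a pure helper like this makes it easy to unit test without starting a gRPC server or a cluster.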

The most surprising thing about Pipeline Functions is that they are not limited to one provider’s managed resources. A function can emit or modify any resource that a Composition composes, so the same pattern can enforce policy defaults or translate between different CRD schemas before the resources reach their final destination. Functions only see resources that pass through a composition pipeline, though; mutating arbitrary objects created outside Crossplane, such as injecting sidecar containers into Pods, remains a job for admission webhooks.

Because the pipeline runs on every reconcile, not just on initial creation, the next logical step after mastering resource transformation is to explore how functions can use observed state to validate and adjust resources that already exist.
