Crossplane Pipeline Functions (called composition functions in the Crossplane documentation) let you modify Kubernetes custom resources (CRs) as Crossplane reconciles them, so you can apply defaults, policies, and other customizations automatically.
Imagine you’ve just declared a new AWS RDS instance using Crossplane:
apiVersion: rds.aws.upbound.io/v1beta1
kind: Instance
metadata:
  name: my-database
spec:
  forProvider:
    region: us-east-1
    instanceClass: db.t3.micro
    engine: postgres
    engineVersion: "13.7"
By default, Crossplane will try to create this RDS instance. But what if you want to enforce a specific tagging strategy across all your databases, or inject a default backup retention period that users might forget? This is where Pipeline Functions come in.
Pipeline Functions are small programs, packaged as container images, that Crossplane calls over gRPC while it reconciles. Each function receives the desired state of the resources, returns a modified version, and Crossplane applies the result only after every function in the pipeline has run. Because they run as part of the Crossplane control plane, they let you build complex, declarative workflows for resource management.
Here’s how it looks in action. Let’s say we want to automatically add a managed-by tag and set backupRetentionPeriod to 7 days for all RDS instances.
First, we define a Function Custom Resource in Crossplane that points to our function image. This image contains our custom logic.
apiVersion: pkg.crossplane.io/v1
kind: Function
metadata:
  name: rds-enhancer
spec:
  package: xuanquanz/rds-enhancer:v0.1.0 # Our custom function image
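Next, we define the pipeline that specifies the order of functions to execute. In Crossplane, a pipeline is not a standalone resource: it lives in a Composition whose mode is Pipeline. In this case it has two steps: function-patch-and-transform (which must also be installed as a Function) composes our Instance, and rds-enhancer then modifies it. The XDatabase composite type is a placeholder for a composite resource type of your own.
apiVersion: apiextensions.crossplane.io/v1
kind: Composition
metadata:
  name: default-rds-pipeline
spec:
  compositeTypeRef:
    apiVersion: example.org/v1alpha1 # Placeholder composite resource type
    kind: XDatabase
  mode: Pipeline
  pipeline:
  - step: compose-instance
    functionRef:
      name: function-patch-and-transform
    input:
      apiVersion: pt.fn.crossplane.io/v1beta1
      kind: Resources
      resources:
      - name: my-database
        base:
          apiVersion: rds.aws.upbound.io/v1beta1
          kind: Instance
          spec:
            forProvider:
              region: us-east-1
              instanceClass: db.t3.micro
              engine: postgres
              engineVersion: "13.7"
  - step: enhance-rds
    functionRef:
      name: rds-enhancer # Reference our function
The pipeline is not attached to an individual managed resource. Instead, the Instance is declared inside the pipeline, and the pipeline runs whenever Crossplane reconciles a composite resource of the type named in compositeTypeRef.
When that happens, the first step adds the Instance to the desired state, and Crossplane then sends that state to the rds-enhancer function. Our function will then modify the resource.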
The core concept is that a function receives two things: the observed state (the composite resource the user declared, plus the composed resources as they currently exist) and the desired state accumulated by earlier pipeline steps. It returns an updated desired state, turning the composite resource into one or more managed resources that Crossplane will apply.
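The relationship between the composite resource and the resources derived from it can be sketched in plain Go. This is a simplified model under stated assumptions: the `Resource` and `Request` types, the `regions` field, and the `composeInstances` helper are all hypothetical and exist only to show one composite fanning out into several desired resources while the observed state tells the function what already exists.

```go
package main

import "fmt"

// Resource is a hypothetical, simplified resource body.
type Resource map[string]any

// Request is a hypothetical, simplified view of what a function receives.
type Request struct {
	ObservedComposite Resource            // what the user declared
	ObservedResources map[string]Resource // what currently exists
}

// composeInstances returns one desired Instance per region listed on the
// composite, plus the names of instances that do not exist yet.
func composeInstances(req Request) (desired map[string]Resource, pending []string) {
	desired = map[string]Resource{}
	regions, _ := req.ObservedComposite["regions"].([]string)
	for _, region := range regions {
		name := "db-" + region
		desired[name] = Resource{
			"apiVersion": "rds.aws.upbound.io/v1beta1",
			"kind":       "Instance",
			"spec": map[string]any{
				"forProvider": map[string]any{"region": region},
			},
		}
		// Anything desired but not yet observed still has to be created.
		if _, exists := req.ObservedResources[name]; !exists {
			pending = append(pending, name)
		}
	}
	return desired, pending
}

func main() {
	req := Request{
		ObservedComposite: Resource{"regions": []string{"us-east-1", "eu-west-1"}},
		ObservedResources: map[string]Resource{"db-us-east-1": {"kind": "Instance"}},
	}
	desired, pending := composeInstances(req)
	fmt.Println(len(desired), pending)
}
```

The function never creates anything itself; it only describes what should exist, and the caller decides how to get there.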
Our rds-enhancer function, written in Go (or any language that can produce a gRPC server), would look something like this conceptually:
import (
	"context"
	"fmt"

	fnv1beta1 "github.com/crossplane/function-sdk-go/proto/v1beta1"
	"google.golang.org/protobuf/types/known/structpb"
)

// Inside the function's main logic
func (s *server) RunFunction(ctx context.Context, req *fnv1beta1.RunFunctionRequest) (*fnv1beta1.RunFunctionResponse, error) {
	// Start from the desired state passed in by earlier pipeline steps
	resp := &fnv1beta1.RunFunctionResponse{
		Meta:    &fnv1beta1.ResponseMeta{Tag: req.GetMeta().GetTag()},
		Desired: req.GetDesired(),
	}
	// Iterate over all desired composed resources in the request
	for name, res := range resp.GetDesired().GetResources() {
		// Convert the resource into a generic map
		rdsInstance := res.GetResource().AsMap()
		// Skip anything that is not an RDS Instance we want to modify
		if rdsInstance["kind"] != "Instance" || rdsInstance["apiVersion"] != "rds.aws.upbound.io/v1beta1" {
			continue
		}
		// Ensure `spec.forProvider` exists
		spec, ok := rdsInstance["spec"].(map[string]interface{})
		if !ok {
			spec = make(map[string]interface{})
			rdsInstance["spec"] = spec
		}
		forProvider, ok := spec["forProvider"].(map[string]interface{})
		if !ok {
			forProvider = make(map[string]interface{})
			spec["forProvider"] = forProvider
		}
		// Add the managed-by tag, creating the tags map if needed
		tags, ok := forProvider["tags"].(map[string]interface{})
		if !ok {
			tags = make(map[string]interface{})
			forProvider["tags"] = tags
		}
		tags["managed-by"] = "crossplane"
		// Set backup retention period
		forProvider["backupRetentionPeriod"] = 7
		// Convert the modified map back into a protobuf struct
		modified, err := structpb.NewStruct(rdsInstance)
		if err != nil {
			return nil, fmt.Errorf("failed to convert modified RDS Instance %q: %w", name, err)
		}
		// Update the composed resource in the response
		res.Resource = modified
		// Report what was changed (the message text is illustrative)
		resp.Results = append(resp.Results, &fnv1beta1.Result{
			Severity: fnv1beta1.Severity_SEVERITY_NORMAL,
			Message:  fmt.Sprintf("applied RDS defaults to %q", name),
		})
	}
	return resp, nil
}
The function returns the modified desired state, and Crossplane applies the resulting Instance, which the AWS provider then reconciles against AWS. spec.forProvider.tags now includes {"managed-by": "crossplane"}, and spec.forProvider.backupRetentionPeriod is set to 7.
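The mutation itself can be isolated and tested without any Crossplane dependency. The sketch below is one possible variant, not the function's actual code: `ensureMap` and `applyRDSDefaults` are hypothetical helpers, and unlike the logic described above it treats the retention period as a default, filling it in only when the user has not set one.

```go
package main

import "fmt"

// ensureMap returns parent[key] as a map, creating it when it is missing
// or holds a value of another type.
func ensureMap(parent map[string]any, key string) map[string]any {
	child, ok := parent[key].(map[string]any)
	if !ok {
		child = map[string]any{}
		parent[key] = child
	}
	return child
}

// applyRDSDefaults always sets the managed-by tag, and sets the backup
// retention period only if the user left it unset.
func applyRDSDefaults(instance map[string]any) {
	forProvider := ensureMap(ensureMap(instance, "spec"), "forProvider")
	ensureMap(forProvider, "tags")["managed-by"] = "crossplane"
	if _, set := forProvider["backupRetentionPeriod"]; !set {
		forProvider["backupRetentionPeriod"] = 7
	}
}

func main() {
	instance := map[string]any{
		"spec": map[string]any{
			"forProvider": map[string]any{"engine": "postgres"},
		},
	}
	applyRDSDefaults(instance)
	fmt.Println(instance["spec"])
}
```

Whether to overwrite or only default a field is a policy decision: overwriting enforces a standard, while defaulting respects an explicit user choice such as a longer retention period.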
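The most surprising thing about Pipeline Functions is how general they are: a function works on plain Kubernetes-style objects, so it is not tied to any one provider or resource kind. You can use them to transform generic Kubernetes objects that a pipeline produces, add sidecar containers to the Pod templates it generates, enforce security policies, or even translate between different CRD schemas before the resources reach their final destination. Functions only see resources that flow through a pipeline, so they complement admission webhooks rather than replace them.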
The next logical step after mastering resource transformation is to explore how functions can also handle validation and mutation of existing managed resources during the reconciliation loop, not just on initial creation.