Paired with a little automation, Falco is surprisingly good at keeping alerts from becoming full-blown incidents.

Let’s say a suspicious shell process spawns from a web server. Instead of just an alert, we want Falco to kick off a chain reaction: isolate the pod, grab its logs, and alert the on-call engineer.

Here’s a common setup:

The Goal: Automatically respond to critical Falco alerts.

The Components:

  1. Falco: The security engine, detecting suspicious activity.
  2. Kubernetes: Where our applications run.
  3. falco-rules (or custom rules): Define what’s suspicious.
  4. A Falco output channel (e.g., the built-in HTTP output, or Falcosidekick for targets such as Kafka): How Falco sends alerts.
  5. An automation tool: This is the orchestrator. Common choices include:
    • kubectl + shell scripts: Simple, good for basic actions.
    • Argo Workflows / Tekton: Kubernetes-native, robust for complex pipelines.
    • Serverless functions (AWS Lambda, Google Cloud Functions): Event-driven, good for reacting to webhooks.
    • Dedicated SOAR platforms (Splunk SOAR, etc.): More feature-rich, often overkill for this specific task.

We’ll focus on a kubectl + shell script approach for clarity, as it demonstrates the core concepts.

Falco Alerting:

First, we need Falco to send alerts in a machine-readable format. Falco’s HTTP output (http_output), which POSTs each alert to a URL, is a good candidate.

In your falco.yaml (or equivalent configuration):

json_output: true
http_output:
  enabled: true
  url: "http://your-alert-receiver.default.svc.cluster.local:8080/falco"
  # If your receiver requires authentication, check the Falco documentation
  # for your version for the TLS/mTLS options available under http_output

This tells Falco to send alerts as JSON to a service running on port 8080. This service will be our automation trigger.
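
To make the receiver’s job concrete, here is a minimal sketch of the payload it can expect and how to pull the target pod out of it. With JSON output enabled, each alert is a JSON object with top-level keys such as rule, priority, output, time, and output_fields. The sample values (pod name, namespace, container ID) are invented for illustration, the rule name assumes a custom rule called "Reverse Shell", and extract_target is a hypothetical helper name. Falco’s field for the namespace is k8s.ns.name, and a field only appears in output_fields if the rule’s output references it.

```python
import json

# Sample alert body. All values are invented for illustration.
SAMPLE_ALERT = json.dumps({
    "time": "2024-01-01T12:00:00.000000000Z",
    "rule": "Reverse Shell",          # assumes a custom rule with this name
    "priority": "Critical",           # Falco capitalizes priority names
    "output": "Reverse shell detected (pod=web-7d9f ns=prod)",
    "output_fields": {
        "container.id": "3f2a1b0c9d8e",
        "k8s.pod.name": "web-7d9f",
        "k8s.ns.name": "prod",
    },
})


def extract_target(raw_body):
    """Return (namespace, pod) from a Falco alert body, or None if either is absent."""
    alert = json.loads(raw_body)
    # output_fields may be missing or null, so fall back to an empty dict
    fields = alert.get("output_fields") or {}
    namespace = fields.get("k8s.ns.name")
    pod = fields.get("k8s.pod.name")
    if namespace and pod:
        return namespace, pod
    return None
```

Calling extract_target(SAMPLE_ALERT) yields the namespace and pod name as a tuple; an alert without Kubernetes metadata yields None, which the receiver can treat as "nothing to isolate".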

The Alert Receiver & Automation Trigger:

We need a simple service that listens for Falco’s webhook and then invokes our automation logic. A basic Python Flask app is perfect for this.

from flask import Flask, request, jsonify
import json
import shlex
import subprocess

app = Flask(__name__)

@app.route('/falco', methods=['POST'])
def handle_falco_alert():
    # silent=True returns None instead of raising on a malformed or non-JSON body
    alert_data = request.get_json(silent=True)
    if not isinstance(alert_data, dict):
        return jsonify({"message": "Expected a JSON object"}), 400
    print(f"Received Falco alert: {json.dumps(alert_data, indent=2)}")

    # Example: Trigger isolation for critical "Reverse Shell" alerts.
    # Falco capitalizes priority names (e.g. "Critical"), so compare case-insensitively.
    priority = str(alert_data.get('priority', '')).lower()
    if alert_data.get('rule') == 'Reverse Shell' and priority == 'critical':
        # These fields are present only if the rule's output references them
        fields = alert_data.get('output_fields') or {}
        container_id = fields.get('container.id')
        k8s_pod_name = fields.get('k8s.pod.name')
        k8s_namespace = fields.get('k8s.ns.name')  # Falco's namespace field

        if container_id and k8s_pod_name and k8s_namespace:
            print(f"Attempting to isolate pod {k8s_pod_name} in namespace {k8s_namespace}")
            # In a real scenario, you'd call a more robust script
            # For demonstration, we'll just print the command.
            # It is built as an argument list so no shell ever parses alert values.
            isolate_command = ["kubectl", "exec", "-n", k8s_namespace, k8s_pod_name,
                               "--", "iptables", "-A", "INPUT", "-j", "REJECT"]
            print(f"Would run: {shlex.join(isolate_command)}")
            # subprocess.run(isolate_command, check=True, timeout=30)
            # You'd also trigger log collection and notifications here
            return jsonify({"message": f"Isolation initiated for {k8s_pod_name}"}), 200
        else:
            print("Could not extract necessary fields for isolation.")
            return jsonify({"message": "Missing fields for isolation"}), 400

    return jsonify({"message": "Alert received, no automated action taken"}), 200

if __name__ == '__main__':
    # In production, use a proper WSGI server like Gunicorn
    app.run(host='0.0.0.0', port=8080)

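This Flask app listens on /falco. When it receives a POST request, it parses the JSON. If the alert matches a specific rule and priority (e.g., "Reverse Shell" with "Critical" priority), it extracts the pod name and namespace and prints the kubectl exec command it would run: an iptables rule that rejects inbound traffic to the pod. The rule only touches the INPUT chain, so outbound traffic is unaffected, and it only works if the container image ships iptables and the container has the NET_ADMIN capability.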
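
The pod and namespace names come straight from the alert body, so it is worth validating them and keeping the shell out of the picture before anything is executed. The following is a minimal sketch of that idea; build_isolate_argv is a hypothetical helper, and the pattern is a simplified approximation of Kubernetes naming rules (lowercase alphanumerics, hyphens, and dots), not the exact validation the API server performs.

```python
import re

# Simplified approximation of Kubernetes object-name rules (an assumption,
# not the API server's exact validation).
_NAME_RE = re.compile(r"[a-z0-9]([-a-z0-9.]*[a-z0-9])?")


def build_isolate_argv(namespace, pod):
    """Return the kubectl argument list, refusing names that look malformed."""
    for value in (namespace, pod):
        # fullmatch rejects anything with spaces, quotes, semicolons, newlines, etc.
        if len(value) > 253 or not _NAME_RE.fullmatch(value):
            raise ValueError(f"refusing suspicious Kubernetes name: {value!r}")
    # An argument list is passed to the process directly, with no shell parsing
    return ["kubectl", "exec", "-n", namespace, pod, "--",
            "iptables", "-A", "INPUT", "-j", "REJECT"]
```

The returned list can be handed to subprocess.run without shell=True, ideally with check=True and a timeout so a hung kubectl call does not block the webhook handler.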

Deployment:

  1. Deploy the Flask app: Package it as a Docker image and deploy it as a Kubernetes Deployment and Service. Ensure the service name (your-alert-receiver.default.svc.cluster.local in the Falco config) matches your deployment.
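  2. Grant Permissions: The Service Account used by the Flask app’s pod needs RBAC permissions on the Kubernetes API. For kubectl exec, that means the create verb on the pods/exec subresource plus get on pods, granted through a Role and RoleBinding in the target namespace, or a ClusterRole and ClusterRoleBinding if it must act cluster-wide. The container image also needs the kubectl binary.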
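
As a rough sketch of what those permissions look like, the snippet below builds a namespaced Role and RoleBinding as plain dictionaries and prints them as JSON, which kubectl apply accepts. The object name falco-responder, the Service Account name, and both namespaces are placeholders chosen for illustration.

```python
import json


def build_rbac(target_namespace, service_account, sa_namespace, name="falco-responder"):
    """Build a Role allowing `kubectl exec` in one namespace, bound to a Service Account."""
    role = {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "Role",
        "metadata": {"name": name, "namespace": target_namespace},
        "rules": [
            # kubectl exec first reads the pod, then creates an exec session
            {"apiGroups": [""], "resources": ["pods"], "verbs": ["get"]},
            {"apiGroups": [""], "resources": ["pods/exec"], "verbs": ["create"]},
        ],
    }
    binding = {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "RoleBinding",
        "metadata": {"name": name, "namespace": target_namespace},
        "roleRef": {"apiGroup": "rbac.authorization.k8s.io", "kind": "Role", "name": name},
        "subjects": [
            {"kind": "ServiceAccount", "name": service_account, "namespace": sa_namespace}
        ],
    }
    return {"apiVersion": "v1", "kind": "List", "items": [role, binding]}


if __name__ == "__main__":
    # Placeholder names; pipe the output to `kubectl apply -f -`
    print(json.dumps(build_rbac("prod", "falco-responder", "default"), indent=2))
```

Scoping the Role to a single namespace keeps the blast radius small: a compromised receiver can exec only into pods in that namespace rather than anywhere in the cluster.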

The "Isolation" Step (iptables -A INPUT -j REJECT):

This is a very basic example. In a real-world scenario, you’d likely want one of these instead:

  • Network Policies: Use Kubernetes Network Policies to deny traffic. This is more idiomatic and manageable than running iptables inside the container.
  • Graceful Shutdown: Instead of blocking, perhaps scale down the affected deployment, or cordon and drain the node the pod runs on.
  • Service Mesh Policy: If you have a service mesh (like Istio), you might trigger a policy change there.
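
The Network Policy option can be sketched as follows: a deny-all policy that selects pods by a quarantine label, plus the kubectl command that applies the label to the suspect pod. The label quarantine=true and the policy name are hypothetical choices, and the approach assumes the cluster’s CNI plugin actually enforces Network Policies and that the receiver’s Service Account may patch pods.

```python
import json

QUARANTINE_KEY = "quarantine"   # hypothetical label key
QUARANTINE_VALUE = "true"


def build_quarantine_policy(namespace):
    """Deny all ingress and egress for pods carrying the quarantine label."""
    return {
        "apiVersion": "networking.k8s.io/v1",
        "kind": "NetworkPolicy",
        "metadata": {"name": "quarantine-deny-all", "namespace": namespace},
        "spec": {
            "podSelector": {"matchLabels": {QUARANTINE_KEY: QUARANTINE_VALUE}},
            # Listing both types with no ingress/egress rules allows nothing
            "policyTypes": ["Ingress", "Egress"],
        },
    }


def build_label_argv(namespace, pod):
    """kubectl arguments that move a pod under the quarantine policy."""
    return ["kubectl", "label", "pod", pod, "-n", namespace,
            f"{QUARANTINE_KEY}={QUARANTINE_VALUE}", "--overwrite"]


if __name__ == "__main__":
    # Apply once per namespace: python this_file.py | kubectl apply -f -
    print(json.dumps(build_quarantine_policy("prod"), indent=2))
```

With the policy applied ahead of time, isolating a pod becomes a single label change, and unlike the iptables rule it covers outbound traffic and needs nothing installed inside the container.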

The "Why It Works Mechanically":

Falco, by default, just emits events. Configuring its HTTP output turns each event into an HTTP POST request. The Flask application is a simple HTTP server that listens for these requests and routes them to a Python function. When the alert’s content matches, that function would use subprocess.run to execute kubectl (the call is commented out in the example above). kubectl talks to the Kubernetes API server, which forwards the exec request to the kubelet on the target node; the kubelet then has the container runtime run the command inside the specified container.
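
Since the whole chain starts with an HTTP POST, one way to exercise the receiver without waiting for a real detection is to send it a hand-built alert. This sketch uses only the standard library; the URL assumes the receiver is reachable on localhost:8080 (for example through kubectl port-forward), and the alert values are invented.

```python
import json
import urllib.request


def build_alert_request(url, alert):
    """Build (but do not send) a POST request carrying a Falco-style alert."""
    body = json.dumps(alert).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Invented test alert shaped like Falco's JSON output
TEST_ALERT = {
    "rule": "Reverse Shell",
    "priority": "Critical",
    "output_fields": {
        "container.id": "3f2a1b0c9d8e",
        "k8s.pod.name": "web-7d9f",
        "k8s.ns.name": "prod",
    },
}

# To actually send it (assumes the receiver is listening locally):
# with urllib.request.urlopen(
#         build_alert_request("http://localhost:8080/falco", TEST_ALERT), timeout=5) as resp:
#     print(resp.status, resp.read().decode())
```

Keeping request construction separate from sending makes the test alert easy to inspect, and the same function can be reused in an automated smoke test of the receiver.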

The One Thing Most People Don’t Know:

The real power isn’t just in executing commands, but in chaining them. For instance, after isolating a pod, you might want to trigger a separate workflow to:

  1. Take a snapshot of the pod’s filesystem.
  2. Dump its current memory.
  3. Collect all logs from the pod and its node.
  4. Create a detailed incident ticket with all this data.

This requires a more sophisticated automation engine than a simple Flask app, but the principle of using Falco’s output as a trigger remains the same. You’re essentially building a reactive security system where detection directly leads to automated remediation or investigation steps.
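
The chaining idea itself is small enough to sketch in plain Python before reaching for a workflow engine. In the sketch below, every step receives a shared incident record, and a failing step is recorded rather than aborting the chain, so the ticket still gets created. All step bodies are placeholders: the names, paths, and error message are hypothetical, and a real step would call kubectl, a forensics tool, or a ticketing API.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class Incident:
    namespace: str
    pod: str
    artifacts: Dict[str, str] = field(default_factory=dict)
    errors: List[str] = field(default_factory=list)


def run_chain(incident: Incident, steps: List[Callable[[Incident], None]]) -> Incident:
    """Run each step in order; record failures instead of stopping the chain."""
    for step in steps:
        try:
            step(incident)
        except Exception as exc:
            incident.errors.append(f"{step.__name__}: {exc}")
    return incident


def collect_logs(incident: Incident) -> None:
    # Placeholder: a real step might save the output of `kubectl logs`
    incident.artifacts["logs"] = f"logs/{incident.namespace}/{incident.pod}.txt"


def dump_memory(incident: Incident) -> None:
    # Placeholder that simulates a failing step
    raise RuntimeError("memory capture tool not configured")


def open_ticket(incident: Incident) -> None:
    # Placeholder: summarize what was gathered so far
    incident.artifacts["ticket"] = (
        f"{len(incident.artifacts)} artifact(s), {len(incident.errors)} error(s)"
    )


if __name__ == "__main__":
    result = run_chain(Incident("prod", "web-7d9f"), [collect_logs, dump_memory, open_ticket])
    print(result)
```

Recording errors instead of raising is a deliberate choice for incident response: a failed memory dump should not prevent the logs from being attached to the ticket. Engines such as Argo Workflows or Tekton add retries, parallelism, and persistence on top of the same pattern.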

Next Step:

The next problem you’ll run into is that rejecting all input with iptables can break legitimate health checks or internal service-to-service communication unless the rules are carefully scoped.
