The most surprising thing about handling webhooks is that your primary job isn’t receiving them, it’s surviving them.

Imagine a user signs up for your service. You send an event to Claude. Claude does something, and then sends a webhook back to you. This is a common pattern.

Here’s what a real webhook event might look like. It’s a POST request to an endpoint you’ve registered with Claude.

{
  "type": "content_moderation.denied",
  "data": {
    "id": "msg_abc123",
    "model": "claude-2.1",
    "prompt_tokens": 150,
    "response_tokens": 50,
    "content": [
      {
        "type": "text",
        "text": "This is a denied message."
      }
    ],
    "moderation_results": [
      {
        "input": ["This is a denied message."],
        "flagged_by": "content_policy",
        "level": "high",
        "scored_categories": {
          "sexual": "high",
          "hate_speech": "medium",
          "self_harm": "low"
        },
        "blocked_categories": ["sexual"]
      }
    ]
  },
  "event_id": "evt_xyz789",
  "created_at": 1700000000
}

Your goal is to process this payload. Maybe you log it, maybe you update a user’s status, maybe you trigger an alert. The Claude API sends these events asynchronously. They don’t wait for you to finish processing the last one.

The system needs to be robust. If you’re processing a webhook and your service hiccups, Claude will retry. You need to ensure that retries don’t cause duplicate processing or data corruption.

This is where the event_id becomes critical. It’s a unique identifier for each webhook event. You must use this to ensure idempotency.

Here’s a simplified Python Flask example of an endpoint that receives a webhook:

from flask import Flask, request, jsonify
import requests
import json

app = Flask(__name__)

# In-memory store for processed event IDs (replace with a persistent store in production)
processed_events = set()

@app.route('/claude-webhook', methods=['POST'])
def claude_webhook():
    payload = request.json
    event_id = payload.get('event_id')
    event_type = payload.get('type')
    data = payload.get('data')

    if not event_id:
        return jsonify({"error": "Missing event_id"}), 400

    # Idempotency check
    if event_id in processed_events:
        print(f"Event {event_id} already processed. Skipping.")
        return jsonify({"status": "skipped", "message": "Already processed"}), 200

    print(f"Processing event: {event_id} of type {event_type}")

    try:
        if event_type == "content_moderation.denied":
            handle_moderation_denied(data)
        elif event_type == "message.created":
            handle_message_created(data)
        # Add more event handlers as needed
        else:
            print(f"Unknown event type: {event_type}")

        processed_events.add(event_id) # Mark as processed
        return jsonify({"status": "success"}), 200

    except Exception as e:
        print(f"Error processing event {event_id}: {e}")
        # Return a 5xx error to signal Claude to retry
        return jsonify({"status": "error", "message": str(e)}), 500

def handle_moderation_denied(data):
    message_id = data.get('id')
    print(f"Handling denied message: {message_id}")
    # Your logic here: e.g., log to a dashboard, notify user, etc.
    # Example: Update a database record for the message

def handle_message_created(data):
    message_id = data.get('id')
    print(f"Handling new message: {message_id}")
    # Your logic here

if __name__ == '__main__':
    # In production, use a proper WSGI server like Gunicorn
    app.run(port=5000)

The key takeaway here is the processed_events set. In a real application, this needs to be a persistent datastore (like Redis or a database) so that event IDs aren’t lost if your application restarts. When you receive an event, you check if its event_id is already in your store. If it is, you return a 200 OK status immediately, telling Claude you handled it (even though you’re just skipping it). If it’s not, you process it and then add the event_id to your store.

If an error occurs during processing, you must return a 5xx status code (e.g., 500 Internal Server Error). This signals to Claude that the processing failed and it should retry the webhook later. Claude has built-in retry mechanisms with exponential backoff, so you don’t need to implement that yourself.

The structure of the payload is consistent: a type field indicating the event, and a data field containing the specific details. You’ll map event_type strings to corresponding handler functions.

A subtle but critical point: Claude’s webhooks are delivered via HTTP POST requests. Your endpoint must be publicly accessible on the internet and capable of responding within a reasonable timeout period (typically a few seconds). If your endpoint times out or returns a non-2xx status code, Claude will consider the delivery failed and retry. This is why your processing logic should be fast, and any long-running tasks should be offloaded to background workers (e.g., using Celery, RQ, or AWS SQS).

The next challenge you’ll face is managing the lifecycle of these events, particularly ensuring that critical actions aren’t missed if your processing logic itself has a bug or if a background worker fails.

Want structured learning?

Take the full Claude-api course →