FastAPI doesn’t "connect" to Kafka itself the way a database driver connects to a database; instead, your application uses a Kafka client library to act as a producer or consumer of Kafka messages.

Here’s FastAPI producing events to Kafka:

from fastapi import FastAPI, HTTPException
from kafka import KafkaProducer
import json

app = FastAPI()

# Configure Kafka Producer
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',  # Replace with your Kafka broker address
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

@app.post("/publish_event")
def publish_event(event_data: dict):
    """Publishes an event to a Kafka topic."""
    # Declared with plain `def` so FastAPI runs it in a thread pool;
    # the blocking future.get() below would otherwise stall the event loop.
    try:
        future = producer.send('my_event_topic', value=event_data)
        # Block until the broker acknowledges the message or the timeout expires
        record_metadata = future.get(timeout=10)
    except Exception as e:
        print(f"Error publishing event: {e}")
        # Returning a (body, 500) tuple does not set the status code in FastAPI
        raise HTTPException(status_code=500, detail=f"Error publishing event: {e}")
    print(f"Successfully sent message to topic {record_metadata.topic} partition {record_metadata.partition} offset {record_metadata.offset}")
    return {"message": "Event published successfully", "metadata": str(record_metadata)}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

And here’s FastAPI consuming events from Kafka:

from fastapi import FastAPI
from kafka import KafkaConsumer
import json
import threading

app = FastAPI()

# Configure Kafka Consumer
consumer = KafkaConsumer(
    'my_event_topic',  # Topic to subscribe to
    bootstrap_servers='localhost:9092',  # Replace with your Kafka broker address
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my_fastapi_group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

def consume_messages():
    """Consumes messages from Kafka and processes them."""
    for message in consumer:
        event_data = message.value
        print(f"Received message: {event_data} from topic {message.topic} partition {message.partition} offset {message.offset}")
        # Here you would typically process the event_data
        # For example, update a database, call another service, etc.
        # For demonstration, we just print it.

# Start the consumer in a separate thread so it doesn't block the FastAPI app
consumer_thread = threading.Thread(target=consume_messages)
consumer_thread.daemon = True  # Allows the main program to exit even if this thread is running
consumer_thread.start()

@app.get("/")
async def read_root():
    return {"message": "FastAPI is running and consuming Kafka events."}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

The core idea is that FastAPI is an HTTP framework, not a direct Kafka client library. You use a Python Kafka client library (like kafka-python) within your FastAPI application to interact with Kafka. This allows your web service to either send data to Kafka (as an event producer) or listen for data from Kafka (as an event consumer).

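The producer code uses KafkaProducer to send JSON-serialized messages to a specified topic (my_event_topic). The value_serializer converts Python dictionaries to JSON strings and then encodes them into bytes, because Kafka stores message values as raw bytes. The call to future.get(timeout=10) blocks until the broker acknowledges the message or the timeout expires, so the API response reflects whether the publish actually succeeded. Because that call is blocking, a handler that uses it is better declared with plain def than async def: FastAPI runs def handlers in a thread pool, whereas a blocking call inside an async def handler stalls the event loop for every other request.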
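To make the serialization step concrete, here is a minimal sketch of the same transformation the value_serializer lambda performs, paired with its inverse from the consumer side. The function names and the sample payload are hypothetical and exist only for illustration; no broker is needed to run it.

```python
import json


def serialize_event(event: dict) -> bytes:
    """Same steps as the value_serializer lambda: dict -> JSON text -> UTF-8 bytes."""
    return json.dumps(event).encode("utf-8")


def deserialize_event(payload: bytes) -> dict:
    """Inverse steps, matching the consumer's value_deserializer."""
    return json.loads(payload.decode("utf-8"))


event = {"user_id": 42, "action": "signup"}  # hypothetical payload
payload = serialize_event(event)             # what would be handed to Kafka
restored = deserialize_event(payload)        # what a consumer would get back
```

Keeping the two functions symmetric matters: if the producer and consumer disagree on the encoding, the consumer fails at deserialization time rather than at publish time.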

The consumer code uses KafkaConsumer to subscribe to a topic. auto_offset_reset='earliest' means it will start reading from the beginning of the topic if no offset has been committed for its group_id. enable_auto_commit=True means the consumer periodically commits the offsets it has fetched back to Kafka in the background, so a restarted consumer resumes near where it left off. This is not a guarantee against re-processing: messages handled after the last commit can be delivered again after a crash, and an offset can be committed before its message has been fully processed. The consume_messages function loops forever, blocking until a message is available. Because this loop would freeze the FastAPI web server, it runs in a separate threading.Thread.
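A daemon thread that loops forever cannot be shut down cleanly. One possible refinement, sketched here under assumptions, is a worker that checks a threading.Event between polls so it can be stopped and joined. ConsumerWorker, poll_fn, and fake_poll are hypothetical names; in a real application poll_fn would wrap a Kafka consumer call that blocks for a bounded time.

```python
import threading
import time


class ConsumerWorker:
    """Runs a polling function in a background thread until stop() is called."""

    def __init__(self, poll_fn, handle_fn):
        self._poll_fn = poll_fn      # returns a (possibly empty) list of messages
        self._handle_fn = handle_fn  # processes a single message
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()

    def stop(self, timeout=5.0):
        # Ask the loop to finish, then wait for the thread to exit.
        self._stop_event.set()
        self._thread.join(timeout)

    def _run(self):
        while not self._stop_event.is_set():
            for message in self._poll_fn():
                self._handle_fn(message)


# Demonstration with an in-memory stand-in for a Kafka consumer.
pending = [{"id": 1}, {"id": 2}]
received = []


def fake_poll():
    time.sleep(0.01)  # stands in for a bounded, blocking poll on a real consumer
    batch = list(pending)
    pending.clear()
    return batch


worker = ConsumerWorker(fake_poll, received.append)
worker.start()
time.sleep(0.2)  # give the worker time to drain the pending messages
worker.stop()
```

The poll function must return within a bounded time, otherwise the stop flag is never checked; that is why this pattern pairs more naturally with a timed poll than with the blocking iterator.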

The real power comes when you combine these. Your FastAPI app might expose an endpoint to receive data from a user, and then immediately publish that data as an event to Kafka. Another microservice, perhaps also built with FastAPI or a different framework, could be consuming that event from Kafka and performing an action based on it. This decouples the services; the first service doesn’t need to know or care about the second service’s existence or implementation details, only that the event is published to a known Kafka topic.

The most surprising thing is that FastAPI, an HTTP-focused framework, becomes a natural participant in an event-driven architecture by simply embedding a Kafka client. It doesn’t need specialized Kafka integrations built into its core; it leverages standard Python libraries.

When you’re consuming messages, the consumer.poll() method is often more flexible than the iterator approach shown above, especially if you need to handle different topics or partitions with varying logic, or if you want finer control over commit intervals and error handling.
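As a rough sketch of the poll-based approach, the function below polls once, dispatches records to a per-topic handler, and commits only after the whole batch was handled. It assumes a consumer that behaves like kafka-python's KafkaConsumer created with enable_auto_commit=False, whose poll() returns a mapping of topic-partitions to lists of records. process_batch, FakeConsumer, and the namedtuples are hypothetical stand-ins so the example runs without a broker.

```python
from collections import namedtuple

# Stand-ins that mimic the fields used from kafka-python's TopicPartition
# and consumer records; they are not the library's own classes.
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])
Record = namedtuple("Record", ["topic", "partition", "offset", "value"])


def process_batch(consumer, handlers, timeout_ms=1000):
    """Poll once, dispatch each record by topic, then commit offsets.

    `handlers` maps a topic name to a callable that takes one record.
    Returns the number of records handled.
    """
    batches = consumer.poll(timeout_ms=timeout_ms)  # {TopicPartition: [records]}
    handled = 0
    for topic_partition, records in batches.items():
        handler = handlers[topic_partition.topic]
        for record in records:
            handler(record)
            handled += 1
    if handled:
        # Reached only if no handler raised, so offsets are never
        # committed for a batch that failed part-way through.
        consumer.commit()
    return handled


class FakeConsumer:
    """In-memory stand-in exposing only poll() and commit()."""

    def __init__(self, batches):
        self._batches = batches
        self.commits = 0

    def poll(self, timeout_ms=0):
        batches, self._batches = self._batches, {}
        return batches

    def commit(self):
        self.commits += 1


seen = []
fake = FakeConsumer({
    TopicPartition("my_event_topic", 0): [
        Record("my_event_topic", 0, 0, {"id": 1}),
        Record("my_event_topic", 0, 1, {"id": 2}),
    ]
})
first = process_batch(fake, {"my_event_topic": lambda r: seen.append(r.value)})
second = process_batch(fake, {"my_event_topic": lambda r: seen.append(r.value)})
```

Committing after processing trades duplicate deliveries for no silent loss: if a handler raises, the batch stays uncommitted and would be delivered again after a restart.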

The next concept you’ll likely explore is how to handle Kafka consumer failures and ensure exactly-once processing semantics, which is significantly more complex and often involves external systems or careful application-level logic.
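One common piece of that application-level logic is making the handler idempotent, so that a message delivered twice has no additional effect. The sketch below is an illustration under simplifying assumptions, not Kafka's transactional exactly-once feature: it deduplicates on (topic, partition, offset) and keeps the seen keys in memory, whereas a real service would need a durable store. IdempotentProcessor is a hypothetical name.

```python
class IdempotentProcessor:
    """Skips messages whose (topic, partition, offset) was already handled."""

    def __init__(self, handler):
        self._handler = handler
        self._seen = set()  # in-memory only; lost on restart in this sketch

    def process(self, topic, partition, offset, value):
        key = (topic, partition, offset)
        if key in self._seen:
            return False  # duplicate delivery, nothing to do
        self._handler(value)
        self._seen.add(key)  # recorded only after the handler succeeded
        return True


applied = []
processor = IdempotentProcessor(applied.append)
first_time = processor.process("my_event_topic", 0, 7, {"id": 1})
redelivered = processor.process("my_event_topic", 0, 7, {"id": 1})
```

Combined with at-least-once delivery, this gives effectively-once results for the handler's side effects, provided the record of seen keys is stored as durably as the side effects themselves.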
