Kafka is best understood as a distributed commit log rather than a message queue, and its real power lies in retaining events so they can be replayed, which lets it act as a source of truth for state.
Imagine you have a simple Express API that handles user signups. When a new user signs up, you want to do a few things: send a welcome email, update a user analytics dashboard, and maybe even trigger a personalized onboarding flow. Doing all of this synchronously within the API request would make signups slow and brittle – if the email service is down, the user signup fails. This is where Kafka shines.
Here’s how we can wire it up.
First, let’s set up a basic Express API that publishes an event to Kafka for every signup.
// server.js
const express = require('express');
const { Kafka } = require('kafkajs');

const app = express();
app.use(express.json());

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092'], // Replace with your Kafka broker address
});

const producer = kafka.producer();

app.post('/signup', async (req, res) => {
  const { email, username } = req.body || {};
  if (!email || !username) {
    return res.status(400).send('Email and username are required.');
  }

  const newUserEvent = {
    email,
    username,
    timestamp: new Date().toISOString(),
  };

  try {
    await producer.send({
      topic: 'user-signup-events', // The Kafka topic to send the event to
      messages: [
        // Keying by email keeps all events for one user in the same partition.
        { key: email, value: JSON.stringify(newUserEvent) },
      ],
    });
    console.log('User signup event sent to Kafka:', newUserEvent);
    // 202 Accepted: the follow-up work happens asynchronously in consumers.
    res.status(202).send('User signup initiated.');
  } catch (error) {
    console.error('Error sending to Kafka:', error);
    res.status(500).send('Failed to initiate user signup process.');
  }
});

const PORT = process.env.PORT || 3000;

// Connect the producer before accepting requests; kafkajs rejects
// send() calls on a producer that is not connected yet.
const start = async () => {
  await producer.connect();
  console.log('Kafka producer connected');
  app.listen(PORT, () => {
    console.log(`Express server running on port ${PORT}`);
  });
};

start().catch((error) => {
  console.error('Failed to start:', error);
  process.exit(1);
});

// Close the producer connection cleanly on Ctrl+C.
process.on('SIGINT', async () => {
  await producer.disconnect();
  process.exit(0);
});
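Pulling the event construction out of the route handler makes the validation and message shape testable without a running broker. The helper name buildSignupMessage is hypothetical, not part of kafkajs; it returns either an error string or a message object in the { key, value } shape that producer.send accepts in its messages array, keyed by email so that all events for one user go to the same partition.

```javascript
// Hypothetical helper (not a kafkajs API): validate a signup payload and build
// the message object to pass in producer.send({ topic, messages: [...] }).
function buildSignupMessage(body) {
  const { email, username } = body || {};
  if (typeof email !== 'string' || typeof username !== 'string' || !email || !username) {
    // Report the validation problem to the caller instead of throwing.
    return { error: 'Email and username are required.' };
  }
  const event = {
    email,
    username,
    timestamp: new Date().toISOString(),
  };
  return {
    message: {
      // Same key => same partition, so one user's events stay in order.
      key: email,
      value: JSON.stringify(event),
    },
  };
}

// Sketch of use inside the route handler:
// const result = buildSignupMessage(req.body);
// if (result.error) return res.status(400).send(result.error);
// await producer.send({ topic: 'user-signup-events', messages: [result.message] });

const ok = buildSignupMessage({ email: 'test@example.com', username: 'tester123' });
console.log(ok.message.key);
```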
Now, let’s create a separate Kafka consumer service that will listen to the user-signup-events topic and react to these events.
// consumer.js
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092'], // Replace with your Kafka broker address
});

const consumer = kafka.consumer({ groupId: 'user-signup-processor' });

const runConsumer = async () => {
  await consumer.connect();
  // fromBeginning only applies when this group has no committed offsets yet.
  await consumer.subscribe({ topic: 'user-signup-events', fromBeginning: true });
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      // message.value is a Buffer, or null for a tombstone record.
      if (!message.value) return;
      const event = JSON.parse(message.value.toString());
      console.log(`Received event: ${JSON.stringify(event)} from topic ${topic}, partition ${partition}, offset ${message.offset}`);

      // Simulate sending a welcome email
      console.log(`Sending welcome email to ${event.email}...`);
      // In a real app, you'd call an email service here.

      // Simulate updating analytics
      console.log(`Updating analytics for user ${event.username}...`);
      // In a real app, you'd update a database or analytics system.

      // Simulate triggering onboarding
      console.log(`Triggering onboarding for ${event.username}...`);
      // In a real app, you'd call another service or publish another event.
    },
  });
};

runConsumer().catch(console.error);

// Leave the consumer group cleanly on Ctrl+C so partitions are reassigned quickly.
process.on('SIGINT', async () => {
  await consumer.disconnect();
  process.exit(0);
});
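The consumer calls JSON.parse directly, so a single malformed message throws inside eachMessage. kafkajs does not commit past a message whose handler throws, so the consumer can keep failing on that same message. A minimal defensive decoder, using the hypothetical helper name decodeSignupEvent, returns null for anything it cannot use so the handler can log and skip it (a production system might route such messages to a dead-letter topic instead of dropping them):

```javascript
// Hypothetical helper: turn a raw message.value into a signup event,
// or return null when the message cannot be processed.
function decodeSignupEvent(value) {
  // kafkajs delivers message.value as a Buffer, or null for tombstones.
  if (value === null || value === undefined) return null;
  let event;
  try {
    event = JSON.parse(value.toString());
  } catch (err) {
    return null; // malformed JSON: skip instead of crashing the loop
  }
  // Require the fields the consumer actually uses.
  if (!event || typeof event.email !== 'string' || typeof event.username !== 'string') {
    return null;
  }
  return event;
}

// Sketch of use inside eachMessage:
// const event = decodeSignupEvent(message.value);
// if (!event) {
//   console.warn(`Skipping unreadable message at offset ${message.offset}`);
//   return;
// }

console.log(decodeSignupEvent(Buffer.from('not json')));
```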
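To run this, you’ll need a Kafka cluster running (e.g., using Docker):
- Start Kafka: docker-compose up -d (assuming a docker-compose.yml file that defines Kafka and ZooKeeper; newer Kafka releases can also run without ZooKeeper in KRaft mode)
- Install dependencies: npm install express kafkajs
- Run the Express API: node server.js
- Run the Kafka consumer: node consumer.js

These steps rely on the broker creating the user-signup-events topic automatically on first use; if your cluster has automatic topic creation disabled, create the topic before starting the services.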
Now, if you send a POST request to http://localhost:3000/signup with a Content-Type: application/json header and a body like:
{
  "email": "test@example.com",
  "username": "tester123"
}
You’ll see the Express server log that the event was sent to Kafka, and then the consumer service will log that it received the event and is processing it.
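The real benefit is that the Express API is now decoupled from downstream processing. If the consumer.js service (or the email service, analytics service, etc.) goes down, signups still succeed, because the event is stored durably in Kafka (as long as the brokers themselves are up). When the consumer comes back, it resumes from the last offset its consumer group committed. Replaying past events takes a deliberate step, though: fromBeginning: true in consumer.subscribe only decides where a group starts when it has no committed offsets, so it will not rewind a group that has already consumed the topic. To reprocess history, you can start a consumer with a new groupId, move to an earlier offset with consumer.seek, or reset the group’s offsets (for example with the kafkajs admin client’s resetOffsets or the kafka-consumer-groups tool). Be careful doing this in production, because side effects such as welcome emails will run again. Together, these properties make the system more resilient and easier to scale.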
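In Kafka, a consumer group starts from its committed offset if it has one; fromBeginning only chooses between the start and the end of the log for a group with no committed offset. The toy in-memory model below (not kafkajs, and ignoring partitions, retention, and rebalancing) makes that rule concrete and shows why replay needs either a fresh group or an explicit offset reset.

```javascript
// Toy model of one partition's log plus per-group committed offsets.
// Illustrative only; kafkajs and the brokers do this for real.
class ToyPartition {
  constructor() {
    this.log = [];              // append-only list of messages
    this.committed = new Map(); // groupId -> next offset to read
  }

  append(value) {
    this.log.push(value);
    return this.log.length - 1; // offset of the new message
  }

  // A committed offset always wins; fromBeginning only matters without one.
  startOffset(groupId, fromBeginning) {
    if (this.committed.has(groupId)) return this.committed.get(groupId);
    return fromBeginning ? 0 : this.log.length;
  }

  // Read everything from the start offset, then commit the new position.
  poll(groupId, fromBeginning) {
    const start = this.startOffset(groupId, fromBeginning);
    const batch = this.log.slice(start);
    this.committed.set(groupId, this.log.length);
    return batch;
  }

  // Rough analogue of resetting a group's offsets with an admin tool.
  resetOffsets(groupId, offset) {
    this.committed.set(groupId, offset);
  }
}

const p = new ToyPartition();
p.append('alice');
p.append('bob');
console.log(p.poll('user-signup-processor', true)); // [ 'alice', 'bob' ]
p.append('carol');
console.log(p.poll('user-signup-processor', true)); // [ 'carol' ] (no rewind)
console.log(p.poll('replay-group', true));          // [ 'alice', 'bob', 'carol' ]
```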
The groupId in the consumer is crucial. Within a consumer group, each partition of a topic is assigned to exactly one consumer at a time, so each message is normally processed by only one member of the group (redeliveries can still happen after crashes or rebalances). If you had multiple instances of consumer.js running with the same groupId, Kafka would spread the topic’s partitions across them, enabling parallel processing. That parallelism is capped by the number of partitions: instances beyond the partition count sit idle. A consumer with a different groupId tracks its own offsets and receives all the messages again, allowing for entirely separate processing paths for the same event.
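Within one consumer group, every partition has exactly one owner, and each group gets its own independent assignment. The simplified round-robin model below (real Kafka assignors are more involved and also handle rebalances) shows both effects: two instances share three partitions, while a fourth instance in a three-partition topic ends up with nothing to read.

```javascript
// Simplified model: assign each partition to exactly one member of a group.
function assignPartitions(partitionCount, consumerIds) {
  const assignment = new Map(consumerIds.map((id) => [id, []]));
  if (consumerIds.length === 0) return assignment; // no members, nothing assigned
  for (let p = 0; p < partitionCount; p++) {
    // Round-robin: partition p goes to member p mod (number of members).
    const owner = consumerIds[p % consumerIds.length];
    assignment.get(owner).push(p);
  }
  return assignment;
}

// Two instances of consumer.js in 'user-signup-processor', three partitions.
console.log(assignPartitions(3, ['c1', 'c2']));
// Four instances but only three partitions: 'c4' gets an empty list.
console.log(assignPartitions(3, ['c1', 'c2', 'c3', 'c4']));
// A separate group (different groupId) gets its own full assignment.
console.log(assignPartitions(3, ['analytics-1']));
```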
The most surprising thing about Kafka’s design is that each partition of a topic is an ordered, append-only log. The order of messages within a partition is guaranteed, but not across partitions. Usually what you need is ordering per entity (e.g., all financial transactions for one account), and you get that by giving related messages the same key in the producer.send method: the default partitioner hashes the key, so messages with the same key always land in the same partition. Only if you need strict ordering across every event in a topic would you use a single partition, by setting numPartitions: 1 when creating the topic (for example with the kafkajs admin client’s createTopics), at the cost of limiting each consumer group to one active consumer for that topic.
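Key-based partitioning works by hashing the message key and taking it modulo the partition count, so equal keys always map to the same partition and keep their relative order. The sketch below illustrates the idea with hypothetical helpers partitionForKey and routeByKey. It deliberately swaps in MD5 from Node’s crypto module for convenience; Kafka’s Java client and kafkajs use murmur2, so these partition numbers will not match a real cluster.

```javascript
const { createHash } = require('crypto');

// Illustrative partitioner: hash the key, then take it modulo the partition
// count. MD5 stands in for Kafka's murmur2 here, so results differ from Kafka.
function partitionForKey(key, numPartitions) {
  const digest = createHash('md5').update(String(key)).digest();
  return digest.readUInt32BE(0) % numPartitions;
}

// Route events into per-partition lists, preserving the order they were sent.
function routeByKey(events, numPartitions) {
  const partitions = Array.from({ length: numPartitions }, () => []);
  for (const event of events) {
    partitions[partitionForKey(event.key, numPartitions)].push(event);
  }
  return partitions;
}

const events = [
  { key: 'account-1', value: 'deposit 100' },
  { key: 'account-2', value: 'deposit 50' },
  { key: 'account-1', value: 'withdraw 30' },
];
// Both account-1 events end up in the same partition, deposit before withdraw.
const routed = routeByKey(events, 3);
console.log(routed);
```

The trade-off is that ordering holds only per key; changing the partition count later remaps keys, which is one reason partition counts are usually chosen up front.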
If you wanted to build a more complex system, you might have another service that consumes user-signup-events but, instead of performing actions, transforms the data and publishes it to a new topic, say user-registered-for-onboarding. Chaining services this way, where consuming one event produces another, builds event-driven pipelines of asynchronous steps.
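The transform step in such a pipeline can stay a pure function that maps one input event to the arguments for producer.send, which keeps it easy to test. In the sketch below, toOnboardingMessage and the fields signedUpAt and onboardingTrack are hypothetical; only the topic name user-registered-for-onboarding comes from the text. Reusing the input key preserves per-user ordering across the hop.

```javascript
// Hypothetical transform: user-signup event -> send() arguments for the
// 'user-registered-for-onboarding' topic. Output field names are illustrative.
function toOnboardingMessage(signupEvent) {
  const onboardingEvent = {
    username: signupEvent.username,
    email: signupEvent.email,
    signedUpAt: signupEvent.timestamp,
    onboardingTrack: 'default', // hypothetical field for the onboarding service
  };
  return {
    topic: 'user-registered-for-onboarding',
    // Same key as the input event, so one user's events stay in one partition.
    messages: [{ key: signupEvent.email, value: JSON.stringify(onboardingEvent) }],
  };
}

// Sketch of use inside the transforming service's eachMessage:
// await producer.send(toOnboardingMessage(event));

console.log(toOnboardingMessage({
  email: 'test@example.com',
  username: 'tester123',
  timestamp: '2024-01-01T00:00:00.000Z',
}));
```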
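The next concept you’ll likely encounter is how to handle failures. With the setup above, delivery is at-least-once, so a message can be processed more than once after a crash or rebalance. Getting exactly-once results in a distributed system like this involves understanding idempotency and, for flows that read from and write back to Kafka, idempotent and transactional producers and consumers.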
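Idempotency on the consumer side can be as simple as remembering which events have already been handled. The sketch below assumes every event carries a unique eventId, which the events in this post do not; the producer could add one with crypto.randomUUID(). The makeIdempotentHandler name is hypothetical, the side effect is synchronous for brevity, and the in-memory Set is illustration only: a crash between performing the side effect and recording the id can still cause a repeat, which is why real systems store processed ids durably, ideally atomically with the side effect.

```javascript
// Hypothetical idempotent-consumer wrapper keyed on an assumed eventId field.
function makeIdempotentHandler(sideEffect) {
  const seen = new Set(); // in memory only; use a durable store in practice
  return function handle(event) {
    if (seen.has(event.eventId)) {
      return false; // duplicate delivery: skip the side effect
    }
    // If this throws, the id is not recorded, so a redelivery runs it again.
    sideEffect(event);
    seen.add(event.eventId);
    return true;
  };
}

const sendWelcomeEmail = makeIdempotentHandler((event) => {
  console.log(`Sending welcome email to ${event.email}...`);
});
sendWelcomeEmail({ eventId: 'evt-1', email: 'test@example.com' }); // sends
sendWelcomeEmail({ eventId: 'evt-1', email: 'test@example.com' }); // redelivery: skipped
```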