The Kafka table engine in ClickHouse lets you treat Kafka topics as if they were regular ClickHouse tables, enabling real-time data ingestion and analysis.

Here’s an example. Imagine a Kafka topic named user_events whose messages are JSON objects, one per user action:

{"user_id": 123, "event_type": "click", "timestamp": 1678886400, "page": "/home"}
{"user_id": 456, "event_type": "purchase", "timestamp": 1678886415, "product_id": "A1B2C3D4", "amount": 99.99}

To ingest this into ClickHouse, you’d create a table using the Kafka engine:

CREATE TABLE user_events_kafka (
    user_id UInt64,
    event_type String,
    timestamp DateTime,
    page String,
    product_id Nullable(String),
    amount Nullable(Float64)
) ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'localhost:9092',
    kafka_topic_list = 'user_events',
    kafka_group_id = 'clickhouse_consumer_group',
    kafka_format = 'JSONEachRow';

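You can run a SELECT against user_events_kafka, but it behaves like a stream consumer rather than a stored table. Each SELECT fetches the next batch of unconsumed messages, deserializes them according to the specified format, and advances the consumer group’s offsets, so each message can be read only once. For this reason, direct SELECTs are mainly useful for debugging, and recent ClickHouse versions reject them unless the stream_like_engine_allow_direct_select setting is enabled.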
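On recent ClickHouse versions, a direct SELECT from a Kafka table fails by default. A minimal sketch for a one-off debugging session, assuming you accept that any rows you read this way are consumed, is to enable the setting for the current session only:

-- Session-level switch; affects only this connection
SET stream_like_engine_allow_direct_select = 1;

Leave it disabled in normal operation. Even with the setting enabled, ClickHouse rejects direct SELECTs once a materialized view is attached to the Kafka table.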

SELECT
    event_type,
    count()
FROM user_events_kafka
WHERE timestamp > now() - INTERVAL 1 HOUR
GROUP BY event_type
ORDER BY count() DESC;

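Against a Kafka table, this query does not return every event from the last hour in the topic. It counts event types only among the messages fetched for this particular SELECT, and because those messages are consumed, a second run will not see them again. For reliable answers, query a stored copy of the data, as described below.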

This setup is useful because it decouples your data producers from your data consumers. Kafka acts as a buffer: ClickHouse consumes messages at its own pace, and if ClickHouse is temporarily unavailable, messages stay in the topic (for as long as the topic’s retention settings allow) and consumption resumes from the last committed offset. The Kafka engine handles consumer group management, offset tracking, and message deserialization, and exposes the stream through a familiar SQL interface.
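To see the consumer-group and offset bookkeeping the engine does on your behalf, newer ClickHouse releases expose a system.kafka_consumers table. The query below is a sketch that assumes your version has this table (older releases do not):

-- Inspect the consumers behind the Kafka table created earlier
SELECT *
FROM system.kafka_consumers
WHERE database = currentDatabase()
  AND table = 'user_events_kafka'
FORMAT Vertical;

The output describes each consumer, including its partition assignments, offsets, and recent exceptions, which helps confirm that consumption is progressing.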

The core problem this solves is the "last mile" of real-time data pipelines. Data is often published to Kafka from many sources (applications, logs, IoT devices), and ClickHouse, with its columnar storage and analytical capabilities, is a natural destination for querying that streaming data. The Kafka engine connects the two without a separate ingestion service.

Internally, a Kafka table stores no data at all. It is a Kafka consumer: ClickHouse keeps connections to the brokers, fetches batches of messages from the specified topics, and hands each batch either to a SELECT or, once a materialized view is attached, to that view in the background. For persistence and analytical performance, the standard pattern is a materialized view that copies each batch from the Kafka table into a regular table.

For example, to store this data persistently for faster analytical queries, you’d set up a materialized view:

CREATE MATERIALIZED VIEW user_events_mv TO user_events_persistent
AS
SELECT * FROM user_events_kafka;

Here, user_events_persistent is a regular ClickHouse table (for example, one using the MergeTree engine), and it must exist before the view is created. Once the view is attached, ClickHouse consumes from Kafka in the background and inserts each batch of messages into user_events_persistent. This gives you real-time ingestion via Kafka together with fast analytical queries on stored data.
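A possible definition of that target table, mirroring the columns of user_events_kafka, is sketched below. The sorting key (event_type, timestamp) is an assumption chosen to suit the hourly count query; pick one that matches your own query patterns. Running that query against this table, rather than the Kafka table, counts every stored event in the window, and it can be repeated without consuming anything.

-- Target table for the materialized view (sorting key is an example choice)
CREATE TABLE user_events_persistent (
    user_id UInt64,
    event_type String,
    timestamp DateTime,
    page String,
    product_id Nullable(String),
    amount Nullable(Float64)
) ENGINE = MergeTree
ORDER BY (event_type, timestamp);

-- Same aggregation as before, now over stored data
SELECT
    event_type,
    count()
FROM user_events_persistent
WHERE timestamp > now() - INTERVAL 1 HOUR
GROUP BY event_type
ORDER BY count() DESC;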

The kafka_skip_broken_messages setting is important in production. With the default of 0, a single message that cannot be parsed makes the whole batch fail, and because that message is retried, consumption can stall. Setting it to N lets ClickHouse skip up to N unparsable messages per block and keep processing the rest of the stream. If you need to inspect bad messages rather than drop them, the kafka_handle_error_mode = 'stream' setting instead exposes the parse error and the raw message through the _error and _raw_message virtual columns. Either way, a data quality problem no longer stops the entire pipeline.
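Kafka engine settings are specified in the CREATE TABLE statement. A sketch of the earlier table with the option added, where the tolerance of 10 messages per block is an arbitrary example value:

CREATE TABLE user_events_kafka (
    user_id UInt64,
    event_type String,
    timestamp DateTime,
    page String,
    product_id Nullable(String),
    amount Nullable(Float64)
) ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'localhost:9092',
    kafka_topic_list = 'user_events',
    kafka_group_id = 'clickhouse_consumer_group',
    kafka_format = 'JSONEachRow',
    -- Tolerate up to 10 unparsable messages per block (example value)
    kafka_skip_broken_messages = 10;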

When JSON messages contain nested structures such as arrays or objects, your ClickHouse schema has to describe them. For instance, if a Kafka message contained {"user_id": 123, "addresses": [{"type": "home", "city": "London"}, {"type": "work", "city": "Paris"}]}, you could define the addresses column as Array(Tuple(type String, city String)), an array of named tuples. With JSONEachRow, recent ClickHouse versions parse each JSON object in the array into a named tuple, so the structure is read directly from the Kafka message.
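A sketch of a Kafka table for such messages follows. The table name user_addresses_kafka, the topic user_addresses, and the consumer group clickhouse_addresses_group are hypothetical; the broker address is reused from the earlier example.

-- Hypothetical table for messages with a nested "addresses" array
CREATE TABLE user_addresses_kafka (
    user_id UInt64,
    -- Each JSON object in the array becomes one named tuple
    addresses Array(Tuple(type String, city String))
) ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'localhost:9092',
    kafka_topic_list = 'user_addresses',
    kafka_group_id = 'clickhouse_addresses_group',
    kafka_format = 'JSONEachRow';

A MergeTree target table for the materialized view can use the same column type, so the nested structure is preserved in storage.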

The next step you’ll likely encounter is optimizing the ingestion rate and query performance for large volumes of streaming data.
