ClickHouse’s sharding and replication architecture is designed for extreme performance and availability, but understanding how it all fits together can feel like assembling a puzzle with pieces that move.
Let’s watch a query, specifically a SELECT on a sharded and replicated table, execute.
-- Assume a table 'events' sharded by 'event_date' and replicated across 3 replicas.
-- Querying for events in the last 7 days.
SELECT
count(*)
FROM events
WHERE event_date >= today() - INTERVAL 7 DAY;
When you execute this query, the ClickHouse client connects to one server in the cluster (let’s call it Replica A). Because events is a Distributed table, Replica A acts as the query initiator. It doesn’t hold all the data. Instead, it knows how the table is split across shards and replicas from the cluster definition in its remote_servers configuration, which you can inspect through the system.clusters table.
The initiator analyzes the query against the cluster topology. By default it sends the query to every shard. It can skip shards that cannot contain matching rows only when the optimize_skip_unused_shards setting is enabled and the WHERE clause constrains the sharding key, typically with equality or IN conditions. A range predicate such as event_date >= today() - INTERVAL 7 DAY will usually still reach every shard. For each shard it queries, the initiator picks one replica according to the load_balancing setting (random by default). It prefers replicas with the fewest recent errors and, if it hosts a replica of that shard itself, its own local copy.
It then sends the same query to that one replica on each shard. So, if the table is spread across 4 shards and the initiator queries shards 1, 2, and 3, it sends the query to one replica of shard 1, one replica of shard 2, and one replica of shard 3. By default it doesn’t send the query to every replica of a shard, just one.
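A minimal Python sketch of the per-shard replica choice described above. It assumes a made-up health flag for each replica and plain random selection among the healthy ones. This is a loose simplification of load_balancing = 'random', not ClickHouse’s implementation, which also ranks replicas by recent error counts and replication delay. TOPOLOGY and plan_query are hypothetical names.

```python
import random

# Hypothetical topology: shard number -> list of (host, is_healthy) pairs.
# Host names follow the example config; the health flags are invented.
TOPOLOGY = {
    1: [("clickhouse-shard01-replica01", True), ("clickhouse-shard01-replica02", True)],
    2: [("clickhouse-shard02-replica01", False), ("clickhouse-shard02-replica02", True)],
}


def plan_query(topology, rng):
    """Return {shard: host}, choosing exactly one healthy replica per shard."""
    plan = {}
    for shard, replicas in sorted(topology.items()):
        healthy = [host for host, ok in replicas if ok]
        if not healthy:
            # With no usable replica, this shard's part of the query cannot run.
            raise RuntimeError(f"no healthy replica for shard {shard}")
        # Random choice among healthy replicas, loosely like load_balancing = 'random'.
        plan[shard] = rng.choice(healthy)
    return plan


plan = plan_query(TOPOLOGY, random.Random(42))
for shard, host in plan.items():
    print(f"shard {shard} -> {host}")
```

Passing in a seeded random.Random keeps the sketch deterministic. Each query gets exactly one entry per shard, never one per replica.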
These "worker" replicas, each on their respective shard, execute their portion of the query against their local data. They perform the WHERE clause filtering and the COUNT(*) aggregation locally.
Once each worker replica finishes its part, it sends its result back to the initiator replica. The initiator then aggregates these partial results. In our COUNT(*) example, it sums up the counts received from each worker. The final, combined result is then returned to the client. The client never knows or cares which specific replicas did the work, only that it got a result.
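The scatter-gather step can be modeled in a few lines of Python. This sketch rests on invented assumptions: each shard’s rows are a hypothetical list of (days_ago, duration_ms) pairs, and the duration_ms column is not part of the original example. The point it shows is that workers return partial aggregation states rather than finished answers. count(*) merges by summing, but an average needs each shard’s (count, sum) pair.

```python
# Hypothetical local data on each shard: (days_ago, duration_ms) rows.
SHARD_ROWS = {
    1: [(0, 120), (2, 80), (9, 500)],
    2: [(3, 40), (8, 60)],
    3: [(5, 200), (6, 100), (40, 10)],
}


def worker(rows, window_days=7):
    """Runs on one replica per shard: apply the WHERE filter locally and
    return a partial aggregation state of (row_count, duration_sum)."""
    kept = [duration for days_ago, duration in rows if days_ago <= window_days]
    return len(kept), sum(kept)


def initiator(shard_rows):
    """Gather every shard's partial state and merge them into final results."""
    partials = [worker(rows) for rows in shard_rows.values()]
    total_count = sum(c for c, _ in partials)
    total_sum = sum(s for _, s in partials)
    average = total_sum / total_count if total_count else None
    return total_count, average


count, avg_duration = initiator(SHARD_ROWS)
print(count, avg_duration)  # 5 108.0
```

With this sample data the merged result is 5 rows with an average of 108.0 ms. Averaging the three per-shard averages (100, 40 and 150) would instead give about 96.7, which is wrong.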
This distributed execution is how ClickHouse scales a query beyond a single machine. Instead of one server scanning all the data, the work runs in parallel across many servers, and each server is responsible only for its subset of the data (its shard). Replication means that if one replica of a shard fails, another replica of the same shard can serve that shard’s part of the query.
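Failover can be sketched as trying a shard’s replicas in turn until one answers. The transport function, the ReplicaDown exception and the set of down hosts are all hypothetical. ClickHouse’s own retry behavior is governed by its settings (for example connections_with_failover_max_tries), not by code like this.

```python
class ReplicaDown(Exception):
    """Raised by the hypothetical transport when a replica cannot be reached."""


def run_on_shard(replicas, send):
    """Try each replica of one shard in order; return the first answer."""
    errors = []
    for host in replicas:
        try:
            return send(host)
        except ReplicaDown as exc:
            # Remember the failure and fall through to the next replica.
            errors.append(f"{host}: {exc}")
    raise RuntimeError("all replicas failed: " + "; ".join(errors))


DOWN_HOSTS = {"clickhouse-shard01-replica01"}  # hypothetical outage


def fake_send(host):
    """Stand-in for sending the query; fails for hosts in DOWN_HOSTS."""
    if host in DOWN_HOSTS:
        raise ReplicaDown("connection refused")
    return f"partial result from {host}"


result = run_on_shard(
    ["clickhouse-shard01-replica01", "clickhouse-shard01-replica02"], fake_send
)
print(result)  # partial result from clickhouse-shard01-replica02
```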
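The core components you configure are macros, which give each server its own shard and replica identity; remote_servers, which defines the cluster topology (which hosts form which shards); and a zookeeper section pointing at ZooKeeper or ClickHouse Keeper, which coordinates replication between replicas.
These belong in the server configuration (config.xml, or a file under config.d/), not in users.xml, which holds users, profiles and quotas. A typical setup looks something like this: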
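<clickhouse>
    <!-- Per-server identity: these values differ on every host. -->
    <macros>
        <shard>01</shard>
        <replica>replica_1</replica>
    </macros>
    <remote_servers>
        <my_cluster>
            <shard>
                <!-- Shard 01 -->
                <!-- Write to one replica and let ReplicatedMergeTree copy the data. -->
                <internal_replication>true</internal_replication>
                <replica>
                    <host>clickhouse-shard01-replica01</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>clickhouse-shard01-replica02</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <!-- Shard 02 -->
                <internal_replication>true</internal_replication>
                <replica>
                    <host>clickhouse-shard02-replica01</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>clickhouse-shard02-replica02</host>
                    <port>9000</port>
                </replica>
            </shard>
        </my_cluster>
    </remote_servers>
    <!-- ZooKeeper or ClickHouse Keeper used by ReplicatedMergeTree; the host name is a placeholder. -->
    <zookeeper>
        <node>
            <host>zookeeper-01</host>
            <port>2181</port>
        </node>
    </zookeeper>
    <!-- ... other settings ... -->
</clickhouse>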
Here, my_cluster defines two shards, each with two replicas. The macros are per-server settings. clickhouse-shard01-replica01 would have <shard>01</shard> and <replica>replica_1</replica>, while clickhouse-shard01-replica02 would have <shard>01</shard> and <replica>replica_2</replica>. ClickHouse substitutes these values into {shard} and {replica} placeholders, most importantly in ReplicatedMergeTree table definitions, and that is how each server knows which shard and replica it is. The remote_servers section, by contrast, is normally identical on every server so that all of them share the same view of the my_cluster topology.
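To make the macro substitution concrete, here is a small Python sketch of what happens when a table is created with ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/events_local', '{replica}'). The path layout and the events_local name are illustrative conventions, not requirements, and expand_macros is a hypothetical helper. Both replicas of shard 01 end up with the same ZooKeeper path, which makes them replicas of each other, but each has a different replica name.

```python
import re

# <macros> values on the two servers of shard 01, as described in the text.
MACROS_REPLICA_1 = {"shard": "01", "replica": "replica_1"}
MACROS_REPLICA_2 = {"shard": "01", "replica": "replica_2"}

# Arguments of a hypothetical ReplicatedMergeTree table definition.
ZK_PATH_TEMPLATE = "/clickhouse/tables/{shard}/events_local"
REPLICA_TEMPLATE = "{replica}"


def expand_macros(template, macros):
    """Replace each {name} placeholder with this server's macro value."""
    def substitute(match):
        name = match.group(1)
        if name not in macros:
            raise KeyError(f"macro {name!r} is not defined on this server")
        return macros[name]
    return re.sub(r"\{(\w+)\}", substitute, template)


for macros in (MACROS_REPLICA_1, MACROS_REPLICA_2):
    print(expand_macros(ZK_PATH_TEMPLATE, macros), expand_macros(REPLICA_TEMPLATE, macros))
```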
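When a table is created with ENGINE = Distributed('my_cluster', 'default', 'events_local', sharding_key), ClickHouse uses the my_cluster definition to find the shards. The arguments are the cluster name, the database, and the local table that actually holds the rows on each shard. Here that table is a hypothetical events_local, because the Distributed table itself is usually the one named events. The sharding_key is an integer-valued expression, such as rand() or a hash of a column, that decides which shard each inserted row goes to. ClickHouse divides the key by the total weight of the shards and maps the remainder onto one shard. The Distributed engine itself doesn’t store the table’s data; it’s a facade that routes queries and inserts to the underlying tables.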
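The shard-selection rule can be sketched in Python, assuming a non-negative integer key value. ClickHouse documents the rule as follows: take the remainder of the sharding expression divided by the total shard weight, then map it onto consecutive half-open ranges, one per shard in order. pick_shard is a hypothetical helper that mirrors that description.

```python
from bisect import bisect_right
from itertools import accumulate


def pick_shard(key_value, weights):
    """Return the 0-based index of the shard that receives a row.

    key_value: the row's sharding-key value (a non-negative integer here).
    weights:   the <weight> of each shard, in cluster order (default 1 each).
    """
    remainder = key_value % sum(weights)
    # Running totals give each shard's upper bound: shard i owns the
    # half-open range [bounds[i-1], bounds[i]) of remainders.
    bounds = list(accumulate(weights))
    return bisect_right(bounds, remainder)


print([pick_shard(k, [9, 10]) for k in (0, 8, 9, 18, 27)])  # [0, 0, 1, 1, 0]
```

With weights 9 and 10, remainders 0 to 8 go to the first shard and 9 to 18 go to the second. A key of 27 (remainder 8) therefore lands on the first shard. With the default weight of 1 for every shard, the rule reduces to key % number_of_shards.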
Replication itself is handled by the ReplicatedMergeTree family of engines, which coordinate through ZooKeeper (or ClickHouse Keeper). An INSERT into a replicated table is sent to one replica. That replica writes the new data part locally and records it in the replication log in ZooKeeper. The other replicas of the shard see the log entry and fetch the part from a replica that has it. By default the client is acknowledged as soon as that first replica has written the data. If you set insert_quorum = N (for example with SET insert_quorum = 2 before the INSERT), ClickHouse instead waits until N replicas hold the data. It fails the INSERT with an exception if that doesn’t happen within insert_quorum_timeout, trading write latency for durability.
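The quorum logic can be modeled as a toy, synchronous simulation. Real replicas fetch parts asynchronously through ZooKeeper or Keeper, and ClickHouse waits up to insert_quorum_timeout instead of failing instantly. The Replica class, the insert_with_quorum function and the part name all_1_1_0 are illustrative assumptions.

```python
from dataclasses import dataclass, field


@dataclass
class Replica:
    name: str
    alive: bool = True
    parts: set = field(default_factory=set)


def insert_with_quorum(part, receiving, peers, quorum):
    """Toy model of an INSERT with insert_quorum = quorum.

    The receiving replica writes the part locally; each live peer then
    "fetches" it (instantly, in this model). The write is acknowledged
    only if at least `quorum` replicas hold the part.
    """
    receiving.parts.add(part)
    holders = 1
    for peer in peers:
        if peer.alive:
            peer.parts.add(part)  # stands in for the peer fetching the part
            holders += 1
    if holders < quorum:
        raise TimeoutError(f"quorum not reached: {holders} of {quorum} replicas have {part}")
    return holders


r1, r2, r3 = Replica("replica_1"), Replica("replica_2"), Replica("replica_3", alive=False)
acks = insert_with_quorum("all_1_1_0", r1, [r2, r3], quorum=2)
print(acks)  # 2
```

With one of three replicas down, a quorum of 2 still succeeds, but a quorum of 3 cannot be met.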
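The system.replication_queue table is crucial for monitoring. On each server, it lists the tasks that server’s replicas of ReplicatedMergeTree tables still have to carry out, such as fetching parts written on other replicas, merging parts or applying mutations. Replication is pull-based: replicas fetch new parts from one another rather than having them pushed out, so the queue describes work still to be done on this replica. If the queue keeps growing, that replica is lagging. The system.replicas table summarizes the same picture with columns such as queue_size and absolute_delay.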
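A small monitoring sketch in Python. It assumes a ClickHouse server whose HTTP interface listens on localhost:8123 and accepts the default user without a password. It reads queue_size and absolute_delay from system.replicas and flags tables over thresholds that are purely hypothetical (100 queued tasks, 300 seconds of delay), so tune them to your workload. The sample rows and the sessions_local table are invented.

```python
import json
import urllib.parse
import urllib.request

QUERY = (
    "SELECT database, table, queue_size, absolute_delay "
    "FROM system.replicas FORMAT JSONEachRow"
)


def fetch_replica_status(base_url="http://localhost:8123"):
    """Run QUERY over the HTTP interface and return one dict per table."""
    url = base_url + "/?" + urllib.parse.urlencode({"query": QUERY})
    with urllib.request.urlopen(url, timeout=10) as response:
        body = response.read().decode("utf-8")
    return [json.loads(line) for line in body.splitlines() if line.strip()]


def lagging_tables(rows, max_queue=100, max_delay_s=300):
    """Return 'database.table' names whose queue or delay exceeds the thresholds."""
    flagged = []
    for row in rows:
        # JSON output may quote 64-bit integers as strings, so convert with int().
        queue = int(row["queue_size"])
        delay = int(row["absolute_delay"])
        if queue > max_queue or delay > max_delay_s:
            flagged.append(f"{row['database']}.{row['table']}")
    return flagged


# Hypothetical rows in the shape fetch_replica_status() returns.
sample = [
    {"database": "default", "table": "events_local", "queue_size": 3, "absolute_delay": "0"},
    {"database": "default", "table": "sessions_local", "queue_size": 250, "absolute_delay": "42"},
]
print(lagging_tables(sample))  # ['default.sessions_local']
```

Against a live server, lagging_tables(fetch_replica_status()) would give the same kind of list for real tables.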
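The Distributed engine’s sharding key is what determines data locality. If your sharding key doesn’t distribute data evenly, you can end up with "hot shards" that are overloaded. A key derived only from event_date, as in the example above, is a case in point. Every row for a given day lands on the same shard, so that one shard absorbs all of today’s inserts and most queries about today.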
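The hot-shard effect is easy to reproduce in a Python simulation. The assumptions are 4 shards, 1,000 hypothetical users inserting on the same day, and SHA-256 standing in for a ClickHouse hash function such as cityHash64. Under those assumptions, a date-derived key puts the whole day on one shard, while a hash of the user id spreads it across all four.

```python
import hashlib
from collections import Counter
from datetime import date

NUM_SHARDS = 4
TODAY = date(2024, 1, 15)  # hypothetical "today"


def shard_by_date(d):
    """Date-derived key: days since 1970-01-01 (how ClickHouse stores Date), mod shard count."""
    return (d - date(1970, 1, 1)).days % NUM_SHARDS


def shard_by_user(user_id):
    """Hash-derived key; SHA-256 stands in for a ClickHouse hash such as cityHash64."""
    digest = hashlib.sha256(str(user_id).encode()).digest()
    return int.from_bytes(digest[:8], "little") % NUM_SHARDS


# One day of hypothetical inserts: one row per user for 1,000 users.
rows = [(TODAY, user_id) for user_id in range(1000)]
rows_per_shard_by_date = Counter(shard_by_date(d) for d, _ in rows)
rows_per_shard_by_user = Counter(shard_by_user(u) for _, u in rows)

print("date key:", dict(rows_per_shard_by_date))
print("user hash key:", dict(rows_per_shard_by_user))
```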
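A critical, often overlooked, consequence of the Distributed table being only a router is where the data actually lives: in tables using MergeTree-family engines (ReplicatedMergeTree when replicated) on each physical shard. When you INSERT into a Distributed table, the Distributed engine evaluates the sharding key for each row, splits the batch into one block per shard, and sends each block to its shard (asynchronously by default, from a local queue on the server that received the INSERT). With internal_replication set to true for the shard, it writes the block to just one replica, and ZooKeeper-coordinated replication then copies it to the others. ZooKeeper plays no part in choosing the shard.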
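Insert routing can be sketched as splitting a batch into per-shard blocks and choosing where each block goes. The sketch assumes equal shard weights and, for simplicity, always picks the first replica when internal_replication is true, whereas ClickHouse chooses an available replica. route_insert and the host names are hypothetical. With internal_replication = false, the Distributed table writes to every replica itself instead of relying on ReplicatedMergeTree.

```python
from collections import defaultdict


def route_insert(rows, key, shards, internal_replication=True):
    """Describe where a Distributed table would send an INSERT batch.

    rows:   list of row dicts
    key:    function returning a row's integer sharding-key value
    shards: one list of replica host names per shard (equal weights assumed)
    Returns {host: rows_sent_to_that_host}.
    """
    blocks = defaultdict(list)
    for row in rows:
        blocks[key(row) % len(shards)].append(row)

    sends = {}
    for shard_index, block in blocks.items():
        replicas = shards[shard_index]
        # true: write one replica and let ReplicatedMergeTree copy the part.
        # false: the Distributed table writes to every replica itself.
        targets = replicas[:1] if internal_replication else replicas
        for host in targets:
            sends[host] = block
    return sends


SHARDS = [["s1-replica1", "s1-replica2"], ["s2-replica1", "s2-replica2"]]
batch = [{"id": i} for i in range(4)]
sends = route_insert(batch, lambda r: r["id"], SHARDS)
everywhere = route_insert(batch, lambda r: r["id"], SHARDS, internal_replication=False)
print(sends)
print(sorted(everywhere))
```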
The next hurdle you’ll likely face is optimizing query performance on sharded tables, especially when joins or aggregations span multiple shards.