Adding new shards and replicas to a ClickHouse cluster isn’t just about throwing more hardware at the problem; it’s a strategic dance of data redistribution and service discovery that can unlock massive query performance and availability improvements.
Let’s see it in action. Imagine a single-node ClickHouse instance, ch-node-1, humming along. We’ve got a table events partitioned by date.
    CREATE TABLE events (
        event_date Date,
        event_id UUID,
        user_id UInt64,
        payload String
    ) ENGINE = MergeTree()
    PARTITION BY toYYYYMM(event_date)
    ORDER BY (event_date, event_id);
Now, we want to scale. We’ll add a new shard and a replica for that shard.
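First, we need a new ClickHouse server, ch-node-2. Its config.xml must accept connections from the other nodes (the listen_host setting), and any IP restrictions for the connecting user live in users.xml. Like every node that will host the Distributed table, it also needs to know about the cluster.
In ch-node-1’s config.xml (or, for better organization, a separate file under config.d/; metrika.xml is the older convention), we add the cluster definition: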
    <clickhouse>
        <remote_servers>
            <my_cluster>
                <!-- This is shard 0 -->
                <shard>
                    <replica>
                        <host>ch-node-1</host>
                        <port>9000</port>
                    </replica>
                </shard>
                <!-- This is shard 1 -->
                <shard>
                    <replica>
                        <host>ch-node-2</host>
                        <port>9000</port>
                    </replica>
                </shard>
            </my_cluster>
        </remote_servers>
    </clickhouse>
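Apply the same cluster definition on ch-node-2. ClickHouse normally reloads remote_servers on the fly; if my_cluster does not show up in system.clusters on both nodes, restart them.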
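Before rolling a cluster definition out to several nodes, it can help to parse the file and confirm that the shard and replica layout is what you intended. The following is a minimal sketch using only Python's standard library; CONFIG is a copy of the definition above, and cluster_topology is a hypothetical helper written for this article, not part of ClickHouse.

```python
import xml.etree.ElementTree as ET

# Copy of the cluster definition shown above (assumed to be the full file).
CONFIG = """
<clickhouse>
    <remote_servers>
        <my_cluster>
            <shard>
                <replica><host>ch-node-1</host><port>9000</port></replica>
            </shard>
            <shard>
                <replica><host>ch-node-2</host><port>9000</port></replica>
            </shard>
        </my_cluster>
    </remote_servers>
</clickhouse>
"""


def cluster_topology(config_xml, cluster_name):
    """Return one list per shard, each holding (host, port) tuples for its replicas."""
    root = ET.fromstring(config_xml.strip())
    cluster = root.find(f"remote_servers/{cluster_name}")
    if cluster is None:
        raise KeyError(f"cluster {cluster_name!r} not found in remote_servers")
    topology = []
    for shard in cluster.findall("shard"):
        replicas = [
            (replica.findtext("host"), int(replica.findtext("port")))
            for replica in shard.findall("replica")
        ]
        topology.append(replicas)
    return topology


if __name__ == "__main__":
    # Print shard numbers (zero-based, matching the XML comments) and their replicas.
    for shard_index, replicas in enumerate(cluster_topology(CONFIG, "my_cluster")):
        print(shard_index, replicas)
```

Running the same check against the file from each node is a quick way to catch a node whose view of the cluster has drifted from the others.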
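Now, we need to tell ClickHouse how to distribute data across these shards. This is done via the sharding key, which is a parameter of the Distributed table engine rather than of the underlying MergeTree table. With more than one shard, inserts through a Distributed table that has no sharding key are rejected (unless insert_distributed_one_random_shard is enabled), so we have to choose one. Common choices are rand() for an even spread, or a hash of an entity ID such as cityHash64(user_id), which keeps all of a user’s rows on the same shard. Sharding by date alone is usually a poor fit for time-series data, because each day’s writes land on a single shard. Let’s assume we want to shard by user_id.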
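To make the routing rule concrete: ClickHouse evaluates the sharding expression for a row, takes the remainder of dividing it by the total weight of all shards, and sends the row to the shard whose weight interval contains that remainder. The sketch below models only that last step. It assumes the sharding expression has already been evaluated to an integer (it does not reimplement cityHash64), and pick_shard is a hypothetical name used for illustration.

```python
from typing import Sequence


def pick_shard(key_value: int, weights: Sequence[int]) -> int:
    """Return the zero-based index of the shard that receives a row.

    key_value stands in for the already-computed sharding expression,
    for example the result of cityHash64(user_id).
    weights holds one weight per shard, in cluster-definition order.
    """
    total_weight = sum(weights)
    if total_weight <= 0:
        raise ValueError("at least one shard must have a positive weight")
    remainder = key_value % total_weight
    upper_bound = 0
    for index, weight in enumerate(weights):
        upper_bound += weight
        # Shard `index` owns the half-open interval [upper_bound - weight, upper_bound).
        if remainder < upper_bound:
            return index
    raise AssertionError("unreachable: remainder is always below total_weight")


if __name__ == "__main__":
    # Two shards with the default weight of 1: even keys go to shard 0, odd keys to shard 1.
    print(pick_shard(10, [1, 1]))
    print(pick_shard(11, [1, 1]))
    # With weights 2 and 1, shard 0 owns remainders 0 and 1, shard 1 owns remainder 2.
    print(pick_shard(5, [2, 1]))
```

One consequence of this rule is that changing the number of shards or their weights changes where a given key lands, which is why rows already stored under the old layout do not automatically end up where the new layout would put them.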
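The existing MergeTree table has no sharding key to alter; it simply stays as the local storage on each shard. Instead, we create a Distributed table on top of it, then redistribute the existing data through that table.
Make sure an empty events table with the same schema exists on ch-node-2. Then, on ch-node-1, create the Distributed table that points to both shards: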
    CREATE TABLE events_distributed (
        event_date Date,
        event_id UUID,
        user_id UInt64,
        payload String
    ) ENGINE = Distributed(my_cluster, default, events, cityHash64(user_id));
This Distributed engine doesn’t store data itself. It’s a facade that routes queries to the appropriate shard(s) based on the cityHash64(user_id) sharding key. The default is the database, events is the local table on each shard, and cityHash64(user_id) is the expression ClickHouse uses to decide which shard a row belongs to.
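To get data into this distributed table, we can insert new data directly into events_distributed. Existing data needs more care. Running INSERT INTO events_distributed SELECT * FROM events; on ch-node-1 looks tempting, but events_distributed writes back into events: rows that hash to shard 0 would be appended to the very table they were read from, creating duplicates, and rows routed to ch-node-2 would still remain on ch-node-1 as well. A safer sequence is to move the original data aside first, for example with RENAME TABLE events TO events_old (a hypothetical name), recreate an empty events table on ch-node-1, and then run INSERT INTO events_distributed SELECT * FROM events_old;. ClickHouse reads each row from events_old, computes cityHash64(user_id), and routes it to the correct shard (ch-node-1 or ch-node-2). Once the row counts match, events_old can be dropped.
Note what is and isn’t automatic here. Rows inserted through events_distributed are written to the events table on the correct shard with no further work. Data that already sits in a local table is never moved between shards on its own. Consistency between the replicas of a shard is likewise not a property of plain MergeTree: it requires the Replicated table engines (for example ReplicatedMergeTree) backed by ClickHouse Keeper or ZooKeeper.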
To add a replica for shard 1 alongside ch-node-2, we’d add another <replica> block to shard 1’s <shard> section in the cluster definition, on ch-node-1 and on every other node that hosts the Distributed table:
    <shard>
        <replica>
            <host>ch-node-2</host>
            <port>9000</port>
        </replica>
        <replica>
            <host>ch-node-2-replica</host> <!-- New replica host -->
            <port>9000</port>
        </replica>
    </shard>
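Then, on ch-node-2-replica, add the same my_cluster definition and create the local events table. With the default internal_replication = false, every insert into events_distributed is written to both replicas of shard 1, but nothing verifies that the two copies stay identical. Existing data on ch-node-2 is not copied to ch-node-2-replica at all while events is a plain MergeTree table. For the new replica to backfill and stay in sync automatically, the local table has to use ReplicatedMergeTree (coordinated through ClickHouse Keeper or ZooKeeper), with internal_replication set to true for the shard; the new replica then fetches the existing data parts from its peer in the background.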
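How many replicas the Distributed table writes to for a given shard depends on that shard's internal_replication setting in the cluster definition. The sketch below is a simplified model of the documented behavior, not ClickHouse code: write_targets and is_healthy are hypothetical names, and real replica selection also takes load-balancing settings into account.

```python
def write_targets(replicas, internal_replication, is_healthy=lambda host: True):
    """Return the replica hosts that a Distributed insert writes to for one shard.

    internal_replication=False: the Distributed table writes to every replica itself.
    internal_replication=True: it writes to one healthy replica and leaves copying
    to the replicated table engine (for example ReplicatedMergeTree).
    """
    if not internal_replication:
        return list(replicas)
    for host in replicas:
        if is_healthy(host):
            return [host]
    raise RuntimeError("no healthy replica available for this shard")


if __name__ == "__main__":
    shard_1 = ["ch-node-2", "ch-node-2-replica"]
    print(write_targets(shard_1, internal_replication=False))
    print(write_targets(shard_1, internal_replication=True))
```

The first mode pairs with plain MergeTree tables and offers no consistency checks between the copies; the second is the one intended for Replicated tables, where writing to all replicas would store the same rows twice.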
The mental model is that the Distributed table is the entry point for queries; it can be created on any node, or on all of them. It knows about all shards and replicas via the remote_servers configuration. When you query events_distributed, the node that receives the query sends it to one replica of every shard by default, then merges the partial results; SELECT count() FROM events_distributed works this way. If the query filters on the sharding key (e.g., WHERE user_id = 123) and the optimize_skip_unused_shards setting is enabled, ClickHouse can skip the shards that cannot contain that user_id and query only the one that can. Replicas allow ClickHouse to distribute read load and provide fault tolerance; it can pick any available replica of a shard to execute a query.
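The fan-out and merge pattern can be illustrated with a small in-memory model. This is a sketch under simplifying assumptions: plain Python lists stand in for the per-shard events tables, user_id modulo the shard count stands in for cityHash64(user_id) with equal shard weights, and all function names are hypothetical. In ClickHouse itself, skipping shards based on a filter depends on the optimize_skip_unused_shards setting.

```python
def shard_for(user_id, shard_count):
    """Stand-in for the sharding expression modulo the number of equally weighted shards."""
    return user_id % shard_count


def insert_row(shards, row):
    """Route a row to its shard, as an insert through the Distributed table would."""
    shards[shard_for(row["user_id"], len(shards))].append(row)


def count_events(shards, user_id=None, skip_unused_shards=False):
    """Return (matching row count, number of shards queried)."""
    if user_id is not None and skip_unused_shards:
        # The filter pins down the sharding key, so only one shard can hold matches.
        targets = [shards[shard_for(user_id, len(shards))]]
    else:
        # Default behavior: fan out to every shard.
        targets = shards
    # Each shard computes a partial count; the initiating node sums them.
    partial_counts = [
        sum(1 for row in shard if user_id is None or row["user_id"] == user_id)
        for shard in targets
    ]
    return sum(partial_counts), len(targets)


if __name__ == "__main__":
    shards = [[], []]
    for uid in range(1, 7):
        insert_row(shards, {"user_id": uid})
    print(count_events(shards))                                       # (6, 2)
    print(count_events(shards, user_id=3))                            # (1, 2)
    print(count_events(shards, user_id=3, skip_unused_shards=True))   # (1, 1)
```

The result is the same with or without pruning; what changes is how many shards have to do work, which is where the latency saving comes from.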
It is easy to assume that adding a shard or replica makes ClickHouse rebalance everything for you. It does less than that: the Distributed engine routes new inserts to the right shard automatically, and Replicated tables bring a new replica up to date in the background, but data already stored on a shard stays there until you move it yourself. The key is choosing the sharding key to match your access patterns and ensuring the remote_servers configuration on every node accurately reflects your cluster topology.
The next hurdle you’ll likely face is optimizing queries that span many shards, since every additional shard a query has to touch adds latency.