Adding new shards and replicas to a ClickHouse cluster isn’t just about throwing more hardware at the problem; it’s a deliberate exercise in data distribution and cluster configuration that can improve both query throughput and availability.

Let’s see it in action. Imagine a single-node ClickHouse instance, ch-node-1, humming along. It holds a table, events, sorted by event date and event ID (note that it has no PARTITION BY clause).

CREATE TABLE events (
    event_date Date,
    event_id UUID,
    user_id UInt64,
    payload String
) ENGINE = MergeTree()
ORDER BY (event_date, event_id);

Now, we want to scale. We’ll add a new shard and a replica for that shard.

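First, we need a new ClickHouse server, ch-node-2. Its config.xml must let it accept remote connections (the listen_host setting), any network restrictions on the user in users.xml must admit the other node, and it needs to know about the cluster.

In config.xml on every node that will host the Distributed table, which in this walkthrough means both ch-node-1 and ch-node-2 (or in a separate file under config.d/ for better organization; older setups used metrika.xml), we add the cluster definition: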

<clickhouse>
    <remote_servers>
        <my_cluster>
            <!-- This is shard 0 -->
            <shard>
                <replica>
                    <host>ch-node-1</host>
                    <port>9000</port>
                </replica>
            </shard>
            <!-- This is shard 1 -->
            <shard>
                <replica>
                    <host>ch-node-2</host>
                    <port>9000</port>
                </replica>
            </shard>
        </my_cluster>
    </remote_servers>
</clickhouse>

ClickHouse normally picks up changes to remote_servers without a restart. Restart ch-node-1 and ch-node-2 only if the new cluster does not show up.
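Before going further, it helps to confirm that each node actually sees the cluster. A minimal check, assuming the cluster is named my_cluster as in the configuration above, reads the system.clusters table:

```sql
-- Run on each node. With the configuration above this should list
-- one row per replica, i.e. one for ch-node-1 and one for ch-node-2.
SELECT cluster, shard_num, replica_num, host_name, port, is_local
FROM system.clusters
WHERE cluster = 'my_cluster'
ORDER BY shard_num, replica_num;
```

If the query returns nothing, the cluster definition has not been loaded on that node.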

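Now, we need to tell ClickHouse how to distribute data across these shards. This is done via the sharding key, which is the last parameter of the Distributed table engine rather than part of the MergeTree table definition. There is no built-in default: with more than one shard and no sharding key, inserts through the Distributed table are rejected by default, and rand() is the usual choice when an even but arbitrary spread is enough. A common sharding key for events is cityHash64(user_id), which keeps each user’s rows on a single shard. Sharding by a hash of event_date is also possible, but it sends each day’s rows to one shard, which is rarely what you want for time-series inserts. Let’s assume we want to shard by user_id.

A MergeTree table has no sharding key to alter. Instead, we make sure the same local events table exists on every shard, put a Distributed table in front of it, then migrate data.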
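The Distributed table created next expects a local events table on every shard, so ch-node-2 needs one before any rows are routed to it. A minimal sketch, assuming you run it directly on ch-node-2 (ON CLUSTER would also work, but it relies on distributed DDL and a Keeper/ZooKeeper setup that this walkthrough has not configured):

```sql
-- Run on ch-node-2: same schema as the events table on ch-node-1.
CREATE TABLE IF NOT EXISTS events (
    event_date Date,
    event_id UUID,
    user_id UInt64,
    payload String
) ENGINE = MergeTree()
ORDER BY (event_date, event_id);
```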

On ch-node-1, create the new distributed table definition pointing to both shards:

CREATE TABLE events_distributed (
    event_date Date,
    event_id UUID,
    user_id UInt64,
    payload String
) ENGINE = Distributed(my_cluster, default, events, cityHash64(user_id));

This Distributed engine doesn’t store data itself. It’s a facade that routes queries to the appropriate shard(s) based on the cityHash64(user_id) sharding key. The default is the database, events is the local table on each shard, and cityHash64(user_id) is the expression ClickHouse uses to decide which shard a row belongs to.
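To get a feel for how the sharding expression maps rows to shards, the query below computes the remainder of the hash divided by the total shard weight. It is only an illustration: it assumes two shards with the default weight of 1 each, and it uses the numbers() table function to stand in for user_id values.

```sql
-- Remainder 0 falls in the first shard's interval, remainder 1 in the second's
-- (assuming both shards keep the default weight of 1).
SELECT
    number AS user_id,
    cityHash64(number) % 2 AS remainder
FROM numbers(10);
```

If the shards were given different weights in remote_servers, the divisor would be the sum of the weights and each shard would own a proportionally sized range of remainders.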

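To get data into this distributed table, we can insert new data directly into events_distributed. For existing data, we can use INSERT INTO events_distributed SELECT * FROM events;, run on ch-node-1. ClickHouse reads the local events table, computes cityHash64(user_id) for each row, and routes the row to the matching shard (ch-node-1 or ch-node-2). The events table must already exist on ch-node-2 for this to work. There is a catch: events_distributed writes into events, so rows that hash to shard 0 are appended to the very table they were read from and end up duplicated, while rows sent to shard 1 remain on ch-node-1 as well. A safer variant is to rename the original table first (say, to a hypothetical events_old), recreate an empty events on ch-node-1, copy from events_old through events_distributed, and drop events_old once the row counts match.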
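After a copy like this, it is worth checking where the rows actually landed. One way, sketched here with the table names used above, is to group by the host that served each part of the query:

```sql
-- hostName() is evaluated on each shard, so this shows rows per node.
SELECT hostName() AS host, count() AS row_count
FROM events_distributed
GROUP BY host
ORDER BY host;
```

Inserts through a Distributed table are sent to remote shards in the background by default, so the counts may take a moment to settle.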

For inserts, ClickHouse handles the routing: as data is inserted into events_distributed, each row is written to the events table on the shard its key maps to. Data that is already sitting in a local table is not moved. Consistency between the replicas of a shard is a separate matter: it comes from the Replicated* table engines (which need ClickHouse Keeper or ZooKeeper), not from the Distributed engine.

To add a replica of shard 1 alongside ch-node-2, we add another <replica> block to the <shard> section for shard 1 in the cluster definition, on every node that carries it:

<shard>
    <replica>
        <host>ch-node-2</host>
        <port>9000</port>
    </replica>
    <replica>
        <host>ch-node-2-replica</host> <!-- New replica host -->
        <port>9000</port>
    </replica>
</shard>

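Then, on ch-node-2-replica, add the same my_cluster definition and create the local events table. With the default internal_replication = false, the Distributed table itself writes each new row to both replicas of shard 1, without checking that the two copies stay identical. Existing data on ch-node-2 is not copied automatically while events is a plain MergeTree table. Backfill happens only with a ReplicatedMergeTree table, where a new replica fetches the existing parts from its peers through the replication queue (a fetch, not a merge); in that setup the shard should set internal_replication to true so each insert goes to one replica and replication does the rest.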
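For replicas that stay in sync and that backfill themselves when added, the local table would typically use ReplicatedMergeTree rather than plain MergeTree. The sketch below is not part of the setup above: it assumes ClickHouse Keeper (or ZooKeeper) is configured, that each node defines {shard} and {replica} in its <macros> section, and that events_replicated is a hypothetical table name.

```sql
-- Run on every replica of every shard. The Keeper path is a common
-- convention, not something prescribed by this walkthrough.
CREATE TABLE events_replicated (
    event_date Date,
    event_id UUID,
    user_id UInt64,
    payload String
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/events_replicated', '{replica}')
ORDER BY (event_date, event_id);

-- Check replication state on a replica; a non-empty queue on a new
-- replica usually means it is still fetching parts.
SELECT database, table, is_readonly, queue_size, absolute_delay
FROM system.replicas
WHERE table = 'events_replicated';
```

A Distributed table pointed at events_replicated instead of events would then leave copying between replicas to the replication mechanism.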

The mental model is that the Distributed table is the entry point. It knows about all shards and replicas via the remote_servers configuration, and it is usually created on every node so that any of them can accept queries. When you query events_distributed, ClickHouse analyzes the query. By default it sends the query to every shard and merges the partial results on the initiating node, which is what happens for SELECT count() FROM events_distributed. If the query filters on the sharding key (e.g., WHERE user_id = 123) and the optimize_skip_unused_shards setting is enabled, it can skip the shards that cannot hold matching rows. Replicas allow ClickHouse to distribute read load and provide fault tolerance; it can pick any available replica for a shard to execute a query.
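The two query shapes can be sketched as follows. Note that skipping shards based on the sharding key depends on the optimize_skip_unused_shards setting, which is off by default; here it is enabled for a single query.

```sql
-- Touches every shard; partial counts are merged on the initiating node.
SELECT count() FROM events_distributed;

-- Filters on the sharding key column, with shard pruning switched on,
-- so only the shard that can hold user_id = 123 needs to be queried.
SELECT count()
FROM events_distributed
WHERE user_id = 123
SETTINGS optimize_skip_unused_shards = 1;
```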

It is tempting to assume that adding a shard or replica makes ClickHouse rebalance data for you. It does not: the Distributed engine only routes new inserts, so rows written before the new shard existed stay where they are until you re-insert or otherwise move them, and only Replicated* tables backfill a new replica. The key is defining the sharding key correctly for your access patterns and ensuring your remote_servers configuration accurately reflects your cluster topology.

The next hurdle you’ll likely face is optimizing queries that span many shards, leading to increased latency.
