The ClickHouse Distributed table engine doesn’t store any data itself; it just tells one ClickHouse node how to reach the other ClickHouse nodes that do.
Let’s see it in action. Imagine you have two ClickHouse servers, ch1.example.com and ch2.example.com. On ch1, we’ll create a local table and insert some data:
-- On ch1.example.com
CREATE TABLE local_events (
    event_date Date,
    event_type String,
    value UInt64
) ENGINE = MergeTree()
ORDER BY (event_date, event_type);
INSERT INTO local_events VALUES ('2023-10-26', 'login', 100);
INSERT INTO local_events VALUES ('2023-10-26', 'logout', 50);
INSERT INTO local_events VALUES ('2023-10-27', 'login', 120);
Now, on ch2, we’ll create a similar local table, but with different data:
-- On ch2.example.com
CREATE TABLE local_events (
    event_date Date,
    event_type String,
    value UInt64
) ENGINE = MergeTree()
ORDER BY (event_date, event_type);
INSERT INTO local_events VALUES ('2023-10-26', 'purchase', 200);
INSERT INTO local_events VALUES ('2023-10-27', 'purchase', 250);
INSERT INTO local_events VALUES ('2023-10-27', 'logout', 75);
To query across both, we first define a "cluster" in ClickHouse’s configuration. This tells ClickHouse about the other nodes. On both ch1 and ch2, add this to /etc/clickhouse-server/metrika.xml (or your equivalent config file) within the <clickhouse> section:
<remote_servers>
    <my_cluster>
        <shard>
            <replica>
                <host>ch1.example.com</host>
                <port>9000</port>
            </replica>
        </shard>
        <shard>
            <replica>
                <host>ch2.example.com</host>
                <port>9000</port>
            </replica>
        </shard>
    </my_cluster>
</remote_servers>
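A quick way to confirm that a node has actually loaded the cluster definition is to look it up in the system.clusters table. This is a minimal check, assuming the cluster is named my_cluster as above; run it on each node once the configuration is in place.

```sql
-- List the shards and replicas this node knows about for my_cluster.
SELECT
    cluster,
    shard_num,
    shard_weight,
    replica_num,
    host_name,
    port,
    is_local
FROM system.clusters
WHERE cluster = 'my_cluster'
ORDER BY shard_num, replica_num;
```

With the configuration shown here you would expect two rows, one per shard. If the query returns nothing, the node has not picked up the remote_servers section yet.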
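Once both nodes have loaded the new configuration (ClickHouse normally picks up changes to remote_servers without a restart, though restarting works too), we can create the distributed table on one of them (say, ch1). This table doesn’t store data itself; it’s a logical layer.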
-- On ch1.example.com
CREATE TABLE distributed_events (
    event_date Date,
    event_type String,
    value UInt64
) ENGINE = Distributed(my_cluster, default, local_events, rand());
The Distributed engine takes:
- The cluster name (my_cluster).
- The database name on the remote nodes (default).
- The table name on the remote nodes (local_events).
- A sharding key expression (rand() in this case, meaning data is arbitrarily distributed).
Now, when you query distributed_events on ch1, ClickHouse consults the my_cluster definition. It sees two shards, one on ch1 and one on ch2, and for each shard it knows which replicas it can use. By default a SELECT is sent to every shard, whatever the sharding key is; the sharding key mainly decides where inserted rows go.
-- On ch1.example.com
SELECT * FROM distributed_events WHERE event_date = '2023-10-26';
This query will be executed on both ch1 (against its local_events) and ch2 (against its local_events). ClickHouse then merges the partial results from each node and returns them as a single result set. You’ll see the three rows dated 2023-10-26: login and logout from ch1, and purchase from ch2.
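To see which node served each row, you can add the _shard_num virtual column of the Distributed table and the hostName() function to the same query. This is only an illustrative variation on the query above; the values hostName() returns depend on how your servers are named, so they may not match ch1.example.com and ch2.example.com literally.

```sql
-- On ch1.example.com
-- _shard_num is the shard number from system.clusters;
-- hostName() is evaluated on the node that actually read the row.
SELECT
    _shard_num,
    hostName() AS served_by,
    event_date,
    event_type,
    value
FROM distributed_events
WHERE event_date = '2023-10-26'
ORDER BY event_type;
```

With the sample data, this should return three rows: login and logout tagged with shard 1, and purchase tagged with shard 2.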
The core problem this solves is scaling query performance and storage beyond a single machine. By distributing data and query processing across multiple nodes, you can handle larger datasets and higher query loads. The Distributed engine acts as a transparent query router and aggregator. You define the cluster topology, and ClickHouse handles sending queries to the correct nodes and merging the results.
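The sharding key in the Distributed engine definition is crucial, because it decides where inserted rows go. The sharding expression has to return an integer, so to shard by date you would wrap the column, for example Distributed(my_cluster, default, local_events, toYYYYMMDD(event_date)). When you insert through the Distributed table, ClickHouse evaluates the expression for each row, takes the remainder of dividing it by the total weight of the shards, and sends the row only to the shard that owns that remainder. This is how you partition data. With two shards of default weight 1, remainder 0 belongs to the first shard (ch1) and remainder 1 to the second (ch2), so a login event on 2023-10-26 (20231026 % 2 = 0) would only be inserted into ch1’s local_events table. Queries for that specific date can then be limited to ch1, but only if you enable the optimize_skip_unused_shards setting; by default ClickHouse still queries every shard.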
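As a sketch of date-based sharding, the statements below define a second Distributed table over the same local tables. The table name distributed_events_by_date and the choice of toYYYYMMDD(event_date) as an integer sharding expression are assumptions made for illustration. ClickHouse routes each inserted row by the remainder of the sharding expression divided by the total shard weight, so with two shards of weight 1 the remainder is 0 or 1.

```sql
-- On ch1.example.com
-- Hypothetical Distributed table sharded by an integer derived from the date.
CREATE TABLE distributed_events_by_date (
    event_date Date,
    event_type String,
    value UInt64
) ENGINE = Distributed(my_cluster, default, local_events, toYYYYMMDD(event_date));

-- Which remainder does a given date produce with two equally weighted shards?
-- 20231026 % 2 = 0, which maps to the first shard.
SELECT toYYYYMMDD(toDate('2023-10-26')) % 2 AS remainder;

-- Ask ClickHouse to skip shards that cannot hold matching rows.
SELECT event_type, sum(value) AS total
FROM distributed_events_by_date
WHERE event_date = '2023-10-26'
GROUP BY event_type
SETTINGS optimize_skip_unused_shards = 1;
```

Shard skipping is only safe when every row was inserted through a Distributed table with this sharding key. The sample rows in this article were written directly to each node’s local_events, so the last query would miss the purchase row that sits on ch2.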
The rand() sharding key is generally used when you simply want rows spread evenly across the shards and have no natural key to shard on. However, for analytical workloads where you often filter or join on a specific dimension, a deterministic sharding key based on that dimension keeps related rows on the same shard and lets ClickHouse skip shards that cannot match, which can be considerably faster.
What most people miss is that the Distributed table engine itself does not hold any data. It’s purely a configuration and query routing layer. All the actual data resides in the MergeTree (or other engine) tables on the individual nodes defined within the cluster. When you INSERT into a Distributed table, ClickHouse determines which shard(s) to send the data to based on the sharding key and then forwards the rows to the underlying local tables (local_events here) on those nodes; by default this forwarding happens asynchronously in the background. If you query a Distributed table, ClickHouse sends the query to the appropriate shard(s) and collects the results.
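The insert path can be observed directly. The sketch below writes one extra, made-up row through the Distributed table, forces any queued data to be sent, and then counts rows per shard. The inserted values are arbitrary; because the sharding key is rand(), the row may land on either shard.

```sql
-- On ch1.example.com
-- Insert through the Distributed table; ClickHouse picks the shard.
INSERT INTO distributed_events VALUES ('2023-10-28', 'login', 90);

-- Inserts are forwarded in the background by default; this waits until
-- everything queued for distributed_events has been sent to the shards.
SYSTEM FLUSH DISTRIBUTED distributed_events;

-- Count rows per shard to see where the new row ended up.
SELECT _shard_num, count() AS row_count
FROM distributed_events
GROUP BY _shard_num
ORDER BY _shard_num;
```

If the tables hold only the rows from this article, the two counts should add up to seven, with one shard holding four rows and the other three.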
The next step is usually setting up replication for fault tolerance and high availability of your distributed setup.