Clickhouse resharding example
Let's say we have large tables and have prepared a cluster (2 shards × 2 replicas). We need to migrate the data into it.
docker-compose.yml
version: '3.5'
services:
  zookeeper:
    image: zookeeper:3.7
    container_name: zookeeper
    hostname: zookeeper
    networks:
      clickhouse-network:
        ipv4_address: 172.20.2.0
  clickhouse01:
    image: clickhouse/clickhouse-server:23.4.2.11-alpine
    container_name: nostromo0
    hostname: nostromo0
    networks:
      clickhouse-network:
        ipv4_address: 172.20.2.1
    ports:
      - "127.0.0.1:8123:8123"
      - "127.0.0.1:9000:9000"
    volumes:
      - ${PWD}/nostromo0:/etc/clickhouse-server
    depends_on:
      - zookeeper
  clickhouse02:
    image: clickhouse/clickhouse-server:23.4.2.11-alpine
    container_name: nostromo1
    hostname: nostromo1
    networks:
      clickhouse-network:
        ipv4_address: 172.20.2.2
    volumes:
      - ${PWD}/nostromo1:/etc/clickhouse-server
    depends_on:
      - zookeeper
  clickhouse03:
    image: clickhouse/clickhouse-server:23.4.2.11-alpine
    container_name: jones0
    hostname: jones0
    networks:
      clickhouse-network:
        ipv4_address: 172.20.2.3
    volumes:
      - ${PWD}/jones0:/etc/clickhouse-server
    depends_on:
      - zookeeper
  clickhouse04:
    image: clickhouse/clickhouse-server:23.4.2.11-alpine
    container_name: jones1
    hostname: jones1
    networks:
      clickhouse-network:
        ipv4_address: 172.20.2.4
    volumes:
      - ${PWD}/jones1:/etc/clickhouse-server
    depends_on:
      - zookeeper
networks:
  clickhouse-network:
    name: clickhouse-network
    ipam:
      config:
        - subnet: 172.20.0.0/16
users.xml
<?xml version="1.0"?>
<company>
    <profiles>
        <default>
            <max_memory_usage>10000000000</max_memory_usage>
            <use_uncompressed_cache>0</use_uncompressed_cache>
            <load_balancing>in_order</load_balancing>
            <log_queries>1</log_queries>
        </default>
    </profiles>
    <users>
        <default>
            <password></password>
            <profile>default</profile>
            <networks>
                <ip>::/0</ip>
            </networks>
            <quota>default</quota>
        </default>
        <admin>
            <password>123</password>
            <profile>default</profile>
            <networks>
                <ip>::/0</ip>
            </networks>
            <quota>default</quota>
        </admin>
    </users>
    <quotas>
        <default>
            <interval>
                <duration>3600</duration>
                <queries>0</queries>
                <errors>0</errors>
                <result_rows>0</result_rows>
                <read_rows>0</read_rows>
                <execution_time>0</execution_time>
            </interval>
        </default>
    </quotas>
</company>
config.xml
<?xml version="1.0"?>
<company>
    <logger>
        <level>debug</level>
        <console>true</console>
        <log remove="remove"/>
        <errorlog remove="remove"/>
    </logger>
    <query_log>
        <database>system</database>
        <table>query_log</table>
    </query_log>
    <listen_host>0.0.0.0</listen_host>
    <http_port>8123</http_port>
    <tcp_port>9000</tcp_port>
    <interserver_http_host>${REPLICA}</interserver_http_host>
    <interserver_http_port>9009</interserver_http_port>
    <max_connections>4096</max_connections>
    <keep_alive_timeout>3</keep_alive_timeout>
    <max_concurrent_queries>100</max_concurrent_queries>
    <uncompressed_cache_size>8589934592</uncompressed_cache_size>
    <mark_cache_size>5368709120</mark_cache_size>
    <path>/var/lib/clickhouse/</path>
    <tmp_path>/var/lib/clickhouse/tmp/</tmp_path>
    <user_files_path>/var/lib/clickhouse/user_files/</user_files_path>
    <users_config>users.xml</users_config>
    <default_profile>default</default_profile>
    <default_database>default</default_database>
    <timezone>Europe/Warsaw</timezone>
    <mlock_executable>false</mlock_executable>
    <remote_servers>
        <ripley>
            <shard>
                <replica>
                    <host>nostromo0</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>nostromo1</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <replica>
                    <host>jones0</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>jones1</host>
                    <port>9000</port>
                </replica>
            </shard>
        </ripley>
    </remote_servers>
    <zookeeper>
        <node index="1">
            <host>zookeeper</host>
            <port>2181</port>
        </node>
    </zookeeper>
    <macros>
        <cluster>ripley</cluster>
        <shard>${SHARD}</shard>
        <replica>${REPLICA}</replica>
    </macros>
    <distributed_ddl>
        <path>/clickhouse/task_queue/ddl</path>
    </distributed_ddl>
    <format_schema_path>/var/lib/clickhouse/format_schemas/</format_schema_path>
</company>
Run cluster
$ mkdir -p nostromo0 nostromo1 jones0 jones1
$ REPLICA=nostromo0 SHARD=0 envsubst < config.xml > nostromo0/config.xml
$ REPLICA=nostromo1 SHARD=0 envsubst < config.xml > nostromo1/config.xml
$ REPLICA=jones0 SHARD=1 envsubst < config.xml > jones0/config.xml
$ REPLICA=jones1 SHARD=1 envsubst < config.xml > jones1/config.xml
$ cp users.xml nostromo0/
$ cp users.xml nostromo1/
$ cp users.xml jones0/
$ cp users.xml jones1/
$ docker-compose up -d
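Once the containers are up, it is worth checking that both shards and all four replicas registered under the ripley cluster. The sketch below is an optional sanity check, not part of the migration itself: it uses the same clickhouse client object as the main example (see the init step referenced next) and queries the standard system.clusters table.

# optional sanity check: both shards and all four replicas should show up here
for shard_num, replica_num, host_name in clickhouse.exec("""
    SELECT shard_num, replica_num, host_name
    FROM system.clusters
    WHERE cluster = 'ripley'
    ORDER BY shard_num, replica_num
"""):
    print(f'shard {shard_num}, replica {replica_num}: {host_name}')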
See how to init and run for setting up the clickhouse client used below, then:
# imports used by the snippet below; the clickhouse client itself
# comes from the init step referenced above
import logging
from time import sleep

# create some source tables with partitions by month / toYYYYMM(datetime)
clickhouse.exec("""CREATE OR REPLACE TABLE default.events (
    name String,
    value UInt64,
    datetime Datetime
)
ENGINE MergeTree() ORDER BY toYYYYMM(datetime) PARTITION BY toYYYYMM(datetime) AS (
    SELECT *
    FROM generateRandom('name String, value UInt64, datetime Datetime', 1, 1, 1)
    LIMIT 27)
""")

clickhouse.exec("""CREATE OR REPLACE TABLE default.logs (
    name String,
    datetime Datetime
)
ENGINE MergeTree() ORDER BY toYYYYMM(datetime) PARTITION BY toYYYYMM(datetime) AS (
    SELECT *
    FROM generateRandom('name String, datetime Datetime', 1, 1, 1)
    LIMIT 69)
""")
cluster_name = 'ripley'
clickhouse.set_on_cluster(cluster_name)
db_name = 'new_prod'
clickhouse.create_db(db_name)

# create replicated, distributed tables and move data by partition
for table_name in ['events', 'logs']:
    clickhouse.set_on_cluster(cluster_name)
    table = clickhouse.get_table_by_name(table_name)
    # replicated tables partitioned by day / toDate(datetime)
    replicated_name = f'{table_name}_replicated'
    clickhouse.create_table_as(
        from_table=table,
        table=replicated_name,
        db=db_name,
        order_by=['name'],
        partition_by=['toDate(datetime)'],
        engine="ReplicatedMergeTree('/clickhouse/tables/{cluster}/{shard}/{table}', '{replica}')",
    )
    distributed_name = f'{db_name}.{table_name}_distributed'
    distributed_table = clickhouse.create_distributed_table(
        create_table=distributed_name,
        table=replicated_name,
        database=db_name,
        sharding_key='rand()',
    )
    clickhouse.skip_on_cluster()
    maintenance_table = clickhouse.create_table_as(
        from_table=table,
        table=f'{table_name}_maintenance',
        db=db_name,
    )
    for partition in clickhouse.get_table_partitions(table=table.name, db=table.database):
        clickhouse.move_partition(
            from_table=table,
            to_table=maintenance_table,
            partition=partition.partition_id,
        )
        clickhouse.insert_from_table(maintenance_table, distributed_table)
        clickhouse.drop_partition(maintenance_table, partition.partition)
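
# optional: after every partition has been moved out and re-inserted through
# the distributed table, the original source tables should be left empty
for table_name in ['events', 'logs']:
    for (rows_left,) in clickhouse.exec(f'SELECT count(*) FROM default.{table_name}'):
        logging.info('rows left in default.%s: %s', table_name, rows_left)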
# data check
sleep(3)  # just in case, to allow for replication latency

for table_name in ['events', 'logs']:
    replicated_name = f'{table_name}_replicated'
    distributed_name = f'{db_name}.{table_name}_distributed'
    result = clickhouse.exec(f"""SELECT * FROM (
        SELECT hostName(), '{replicated_name}' AS name, count(*) AS records
        FROM remote('172.20.2.1', '{db_name}', '{replicated_name}', 'default', '')
        UNION ALL
        SELECT hostName(), '{replicated_name}', count(*)
        FROM remote('172.20.2.2', '{db_name}', '{replicated_name}', 'default', '')
        UNION ALL
        SELECT hostName(), '{replicated_name}', count(*)
        FROM remote('172.20.2.3', '{db_name}', '{replicated_name}', 'default', '')
        UNION ALL
        SELECT hostName(), '{replicated_name}', count(*)
        FROM remote('172.20.2.4', '{db_name}', '{replicated_name}', 'default', '')
        UNION ALL
        SELECT hostName(), '{distributed_name}', count(*)
        FROM {distributed_name}
    ) ORDER BY records
    """)
    for hostname, table, records in result:
        logging.info('hostname: %s, table: %s, records: %s', hostname, table, records)
logs
2024-11-06 16:14:27,178 [INFO] ripley: ON CLUSTER ripley mode enabled
2024-11-06 16:14:27,178 [INFO] ripley: CREATE DATABASE IF NOT EXISTS new_prod ON CLUSTER 'ripley'
2024-11-06 16:14:27,342 [INFO] ripley: ON CLUSTER ripley mode enabled
2024-11-06 16:14:27,350 [INFO] ripley: CREATE TABLE new_prod.events_replicated ON CLUSTER ripley
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{cluster}/{shard}/{table}', '{replica}')
ORDER BY name
PARTITION BY toDate(datetime)
AS default.events
2024-11-06 16:14:27,562 [INFO] ripley: CREATE TABLE new_prod.events_distributed ON CLUSTER ripley
ENGINE = Distributed('{cluster}', new_prod, events_replicated, rand())
2024-11-06 16:14:27,741 [INFO] ripley: ON CLUSTER mode disabled
2024-11-06 16:14:27,742 [INFO] ripley: CREATE TABLE new_prod.events_maintenance
ENGINE = MergeTree
ORDER BY toYYYYMM(datetime)
PARTITION BY toYYYYMM(datetime)
AS default.events
2024-11-06 16:14:27,795 [INFO] ripley: ALTER TABLE default.events MOVE PARTITION '197504' TO TABLE new_prod.events_maintenance
2024-11-06 16:14:27,801 [INFO] ripley: INSERT INTO new_prod.events_distributed SELECT * FROM new_prod.events_maintenance
2024-11-06 16:14:27,813 [INFO] ripley: ALTER TABLE new_prod.events_maintenance DROP PARTITION '197504'
...
2024-11-06 16:14:28,431 [INFO] ripley: ON CLUSTER ripley mode enabled
2024-11-06 16:14:28,441 [INFO] ripley: CREATE TABLE new_prod.logs_replicated ON CLUSTER ripley
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{cluster}/{shard}/{table}', '{replica}')
ORDER BY name
PARTITION BY toDate(datetime)
AS default.logs
2024-11-06 16:14:28,776 [INFO] ripley: CREATE TABLE new_prod.logs_distributed ON CLUSTER ripley
ENGINE = Distributed('{cluster}', new_prod, logs_replicated, rand())
2024-11-06 16:14:28,932 [INFO] ripley: ON CLUSTER mode disabled
2024-11-06 16:14:28,932 [INFO] ripley: CREATE TABLE new_prod.logs_maintenance
ENGINE = MergeTree
ORDER BY toYYYYMM(datetime)
PARTITION BY toYYYYMM(datetime)
AS default.logs
2024-11-06 16:14:28,980 [INFO] ripley: ALTER TABLE default.logs MOVE PARTITION '197010' TO TABLE new_prod.logs_maintenance
2024-11-06 16:14:28,986 [INFO] ripley: INSERT INTO new_prod.logs_distributed SELECT * FROM new_prod.logs_maintenance
2024-11-06 16:14:29,018 [INFO] ripley: ALTER TABLE new_prod.logs_maintenance DROP PARTITION '197010'
...
2024-11-06 16:14:33,647 [INFO] root: hostname: nostromo0, table: events_replicated, records: 8
2024-11-06 16:14:33,647 [INFO] root: hostname: nostromo1, table: events_replicated, records: 8
2024-11-06 16:14:33,647 [INFO] root: hostname: jones0, table: events_replicated, records: 19
2024-11-06 16:14:33,647 [INFO] root: hostname: jones1, table: events_replicated, records: 19
2024-11-06 16:14:33,647 [INFO] root: hostname: nostromo0, table: new_prod.events_distributed, records: 27
2024-11-06 16:14:33,663 [INFO] root: hostname: nostromo0, table: logs_replicated, records: 32
2024-11-06 16:14:33,663 [INFO] root: hostname: nostromo1, table: logs_replicated, records: 32
2024-11-06 16:14:33,663 [INFO] root: hostname: jones0, table: logs_replicated, records: 37
2024-11-06 16:14:33,663 [INFO] root: hostname: jones1, table: logs_replicated, records: 37
2024-11-06 16:14:33,663 [INFO] root: hostname: nostromo0, table: new_prod.logs_distributed, records: 69
Note that the per-shard counts add up to the totals reported through the distributed tables (8 + 19 = 27 for events, 32 + 37 = 69 for logs), so every row made it across. For more information see ClickhouseProtocol and Clickhouse Tests.