Skip to content

Clickhouse resharding example

Danila Ganchar edited this page Nov 6, 2024 · 1 revision

Let's say we have large tables on a single server and we have prepared a cluster (2 shards, each with 2 replicas). We need to migrate the data to that cluster.

docker-compose.yml
# Local demo cluster: one ZooKeeper node plus four ClickHouse servers
# (2 shards x 2 replicas). Container hostnames must match the <host> entries
# listed under <remote_servers> in config.xml.
version: '3.5'
services:
  # Coordination service; config.xml points every server at host "zookeeper".
  zookeeper:
    image: zookeeper:3.7
    container_name: zookeeper
    hostname: zookeeper
    networks:
      clickhouse-network:
        ipv4_address: 172.20.2.0
  # First shard, replica 0. The only node whose HTTP (8123) and native (9000)
  # ports are published, and only on the host's loopback interface.
  clickhouse01:
    image: clickhouse/clickhouse-server:23.4.2.11-alpine
    container_name: nostromo0
    hostname: nostromo0
    networks:
      clickhouse-network:
        ipv4_address: 172.20.2.1
    ports:
      - "127.0.0.1:8123:8123"
      - "127.0.0.1:9000:9000"
    volumes:
      # Per-node directory holding the generated config.xml and users.xml.
      - ${PWD}/nostromo0:/etc/clickhouse-server
    depends_on:
      - zookeeper
  # First shard, replica 1.
  clickhouse02:
    image: clickhouse/clickhouse-server:23.4.2.11-alpine
    container_name: nostromo1
    hostname: nostromo1
    networks:
      clickhouse-network:
        ipv4_address: 172.20.2.2
    volumes:
      - ${PWD}/nostromo1:/etc/clickhouse-server
    depends_on:
      - zookeeper
  # Second shard, replica 0.
  clickhouse03:
    image: clickhouse/clickhouse-server:23.4.2.11-alpine
    container_name: jones0
    hostname: jones0
    networks:
      clickhouse-network:
        ipv4_address: 172.20.2.3
    volumes:
      - ${PWD}/jones0:/etc/clickhouse-server
    depends_on:
      - zookeeper
  # Second shard, replica 1.
  clickhouse04:
    image: clickhouse/clickhouse-server:23.4.2.11-alpine
    container_name: jones1
    hostname: jones1
    networks:
      clickhouse-network:
        ipv4_address: 172.20.2.4
    volumes:
      - ${PWD}/jones1:/etc/clickhouse-server
    depends_on:
      - zookeeper

# Dedicated network with a fixed subnet so each container keeps the static
# address that the data-check query passes to remote().
networks:
  clickhouse-network:
    name: clickhouse-network
    ipam:
      config:
        - subnet: 172.20.0.0/16
users.xml
<?xml version="1.0"?>
<!--
    Users, settings profiles and quotas. The same file is copied unchanged into
    every node directory by the "Run cluster" steps.
    Demo only: passwords are empty or trivial and stored in plain text.
-->
<company>
    <profiles>
        <!-- The single settings profile; both users below reference it. -->
        <default>
            <max_memory_usage>10000000000</max_memory_usage>
            <use_uncompressed_cache>0</use_uncompressed_cache>
            <load_balancing>in_order</load_balancing>
            <log_queries>1</log_queries>
        </default>
    </profiles>

    <users>
        <!--
            Passwordless account. The data-check query connects to each node
            through remote() as user "default" with an empty password.
        -->
        <default>
            <password></password>
            <profile>default</profile>
            <networks>
                <!-- ::/0 matches every address, i.e. no source restriction. -->
                <ip>::/0</ip>
            </networks>
            <quota>default</quota>
        </default>
        <!-- Second account with a trivial plain-text password (demo only). -->
        <admin>
            <password>123</password>
            <profile>default</profile>
            <networks>
                <ip>::/0</ip>
            </networks>
            <quota>default</quota>
        </admin>
    </users>

    <quotas>
        <!--
            One-hour (3600 s) accounting interval with every limit set to 0.
            NOTE(review): 0 presumably means "track usage, do not limit";
            confirm against the ClickHouse quotas documentation.
        -->
        <default>
            <interval>
                <duration>3600</duration>
                <queries>0</queries>
                <errors>0</errors>
                <result_rows>0</result_rows>
                <read_rows>0</read_rows>
                <execution_time>0</execution_time>
            </interval>
        </default>
    </quotas>
</company>
config.xml
<?xml version="1.0"?>
<!--
    Server config template shared by all four nodes. The REPLICA and SHARD
    placeholders are filled in per node with envsubst (see "Run cluster"),
    producing one config.xml per node directory.
-->
<company>
    <logger>
        <level>debug</level>
        <console>true</console>
        <!-- NOTE(review): presumably drops the file log targets so that only
             console logging remains; confirm. -->
        <log remove="remove"/>
        <errorlog remove="remove"/>
    </logger>

    <query_log>
        <database>system</database>
        <table>query_log</table>
    </query_log>

    <listen_host>0.0.0.0</listen_host>
    <http_port>8123</http_port>
    <tcp_port>9000</tcp_port>
    <!-- Set to this node's own hostname (the REPLICA value). -->
    <interserver_http_host>${REPLICA}</interserver_http_host>
    <interserver_http_port>9009</interserver_http_port>

    <max_connections>4096</max_connections>
    <keep_alive_timeout>3</keep_alive_timeout>
    <max_concurrent_queries>100</max_concurrent_queries>
    <uncompressed_cache_size>8589934592</uncompressed_cache_size>
    <mark_cache_size>5368709120</mark_cache_size>

    <path>/var/lib/clickhouse/</path>
    <tmp_path>/var/lib/clickhouse/tmp/</tmp_path>
    <user_files_path>/var/lib/clickhouse/user_files/</user_files_path>

    <users_config>users.xml</users_config>
    <default_profile>default</default_profile>
    <default_database>default</default_database>
    <timezone>Europe/Warsaw</timezone>
    <mlock_executable>false</mlock_executable>

    <!--
        Cluster "ripley": two shards with two replicas each. Host names match
        the container hostnames declared in docker-compose.yml.
    -->
    <remote_servers>
        <ripley>
            <shard>
                <!--
                    The target tables are ReplicatedMergeTree, so a Distributed
                    table must write each block to one replica only and leave
                    the copying to replication. With the default (false) it
                    would write to every replica itself.
                -->
                <internal_replication>true</internal_replication>
                <replica>
                    <host>nostromo0</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>nostromo1</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>jones0</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>jones1</host>
                    <port>9000</port>
                </replica>
            </shard>
        </ripley>
    </remote_servers>

    <!-- The single ZooKeeper container from docker-compose.yml. -->
    <zookeeper>
        <node index="1">
            <host>zookeeper</host>
            <port>2181</port>
        </node>
    </zookeeper>

    <!--
        Per-node substitution values. The migration script uses {cluster},
        {shard} and {replica} in its ReplicatedMergeTree engine arguments and
        {cluster} in the Distributed engine arguments.
    -->
    <macros>
        <cluster>ripley</cluster>
        <shard>${SHARD}</shard>
        <replica>${REPLICA}</replica>
    </macros>

    <!-- ZooKeeper path of the queue used by ON CLUSTER (distributed DDL) queries. -->
    <distributed_ddl>
        <path>/clickhouse/task_queue/ddl</path>
    </distributed_ddl>

    <format_schema_path>/var/lib/clickhouse/format_schemas/</format_schema_path>
</company>
Run cluster
# One config directory per node; each is mounted at /etc/clickhouse-server by docker-compose.yml.
$ mkdir -p nostromo0 nostromo1 jones0 jones1
# Render the config.xml template for every node: REPLICA is the node's hostname,
# SHARD is the shard it belongs to (nostromo* -> 0, jones* -> 1).
$ REPLICA=nostromo0 SHARD=0 envsubst < config.xml > nostromo0/config.xml
$ REPLICA=nostromo1 SHARD=0 envsubst < config.xml > nostromo1/config.xml
$ REPLICA=jones0 SHARD=1 envsubst < config.xml > jones0/config.xml
$ REPLICA=jones1 SHARD=1 envsubst < config.xml > jones1/config.xml
# users.xml has no placeholders, so it is copied as is.
$ cp users.xml nostromo0/
$ cp users.xml nostromo1/
$ cp users.xml jones0/
$ cp users.xml jones1/
# Start ZooKeeper and the four ClickHouse servers in the background.
$ docker-compose up -d

See how to init and run:

# Source data to migrate: two plain (non-replicated) MergeTree tables,
# partitioned by month / toYYYYMM(datetime) and filled with generateRandom() rows.
# `clickhouse` is the client object created beforehand (not shown in this snippet).
# default.events receives 27 rows (LIMIT 27).
clickhouse.exec("""CREATE OR REPLACE TABLE default.events (
      name String,
      value UInt64,
      datetime Datetime
)
ENGINE MergeTree() ORDER BY toYYYYMM(datetime) PARTITION BY toYYYYMM(datetime) AS (
    SELECT *
      FROM generateRandom('name String, value UInt64, datetime Datetime', 1, 1, 1)
 LIMIT 27)
""")

# default.logs receives 69 rows (LIMIT 69).
clickhouse.exec("""CREATE OR REPLACE TABLE default.logs (
      name String,
      datetime Datetime
)
ENGINE MergeTree() ORDER BY toYYYYMM(datetime) PARTITION BY toYYYYMM(datetime) AS (
    SELECT *
      FROM generateRandom('name String, datetime Datetime', 1, 1, 1)
 LIMIT 69)
""")


# Cluster name as declared under <remote_servers> in config.xml.
cluster_name = 'ripley'
# From here on the client adds ON CLUSTER to the DDL it generates (see the logs below).
clickhouse.set_on_cluster(cluster_name)
# Target database; the logs show it is created on every node of the cluster.
db_name = 'new_prod'
clickhouse.create_db(db_name)

# Create replicated and distributed tables, then move the data partition by partition.
for table_name in ['events', 'logs']:
    # ON CLUSTER mode is switched off further down in each iteration,
    # so it has to be re-enabled before the cluster-wide DDL below.
    clickhouse.set_on_cluster(cluster_name)
    table = clickhouse.get_table_by_name(table_name)

    # Replicated target table, re-partitioned by day / toDate(datetime).
    # {cluster}, {shard} and {replica} are resolved per node from <macros> in config.xml.
    replicated_name = f'{table_name}_replicated'
    clickhouse.create_table_as(
        from_table=table,
        table=replicated_name,
        db=db_name,
        order_by=['name'],
        partition_by=['toDate(datetime)'],
        engine="ReplicatedMergeTree('/clickhouse/tables/{cluster}/{shard}/{table}', '{replica}')",
    )

    # Distributed table on top of the replicated one; rows are spread over shards by rand().
    distributed_name = f'{db_name}.{table_name}_distributed'
    distributed_table = clickhouse.create_distributed_table(
        create_table=distributed_name,
        table=replicated_name,
        database=db_name,
        sharding_key='rand()',
    )

    # The maintenance table is a local staging copy of the source schema,
    # so it is created without ON CLUSTER.
    clickhouse.skip_on_cluster()
    maintenance_table = clickhouse.create_table_as(
        from_table=table,
        table=f'{table_name}_maintenance',
        db=db_name,
    )

    # Move one source partition at a time into the staging table, push it through
    # the distributed table, then drop it from staging so the next INSERT ... SELECT
    # only sees the next partition.
    for partition in clickhouse.get_table_partitions(table=table.name, db=table.database):
        # Use the partition value for both MOVE and DROP: the generated SQL is
        # "PARTITION '<value>'" (not "PARTITION ID"), and value and id only
        # coincide for simple keys such as toYYYYMM.
        clickhouse.move_partition(
            from_table=table,
            to_table=maintenance_table,
            partition=partition.partition,
        )

        clickhouse.insert_from_table(maintenance_table, distributed_table)
        clickhouse.drop_partition(maintenance_table, partition.partition)

# Data check: count rows of every replicated table on each node and compare
# with the total seen through the distributed table.
sleep(3)  # just in case for data latency
for table_name in ['events', 'logs']:
    replicated_name = f'{table_name}_replicated'
    distributed_name = f'{db_name}.{table_name}_distributed'
    # 172.20.2.1 .. 172.20.2.4 are the static addresses assigned to the four
    # ClickHouse containers in docker-compose.yml. Column names of a UNION ALL
    # come from its first SELECT, so only that branch carries aliases.
    result = clickhouse.exec(f"""SELECT * FROM (
        SELECT hostName(), '{replicated_name}' AS name, count(*) AS records
          FROM remote('172.20.2.1', '{db_name}', '{replicated_name}', 'default', '')
         UNION ALL
        SELECT hostName(), '{replicated_name}', count(*)
          FROM remote('172.20.2.2', '{db_name}', '{replicated_name}', 'default', '')
         UNION ALL
        SELECT hostName(), '{replicated_name}', count(*)
          FROM remote('172.20.2.3', '{db_name}', '{replicated_name}', 'default', '')
         UNION ALL
        SELECT hostName(), '{replicated_name}', count(*)
          FROM remote('172.20.2.4', '{db_name}', '{replicated_name}', 'default', '')
         UNION ALL
        SELECT hostName(), '{distributed_name}', count(*)
          FROM {distributed_name}
        ) ORDER BY records
    """)

    # Replicas of one shard should report equal counts; the shard counts should
    # add up to the distributed total.
    for hostname, checked_table, records in result:
        logging.info('hostname: %s, table: %s, records: %s', hostname, checked_table, records)
logs
2024-11-06 16:14:27,178 [INFO] ripley: ON CLUSTER ripley mode enabled
2024-11-06 16:14:27,178 [INFO] ripley: CREATE DATABASE IF NOT EXISTS new_prod ON CLUSTER 'ripley'
2024-11-06 16:14:27,342 [INFO] ripley: ON CLUSTER ripley mode enabled
2024-11-06 16:14:27,350 [INFO] ripley: CREATE TABLE new_prod.events_replicated ON CLUSTER ripley
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{cluster}/{shard}/{table}', '{replica}')
ORDER BY name
PARTITION BY toDate(datetime)
AS default.events
2024-11-06 16:14:27,562 [INFO] ripley: CREATE TABLE new_prod.events_distributed ON CLUSTER ripley
ENGINE = Distributed('{cluster}', new_prod, events_replicated, rand())
2024-11-06 16:14:27,741 [INFO] ripley: ON CLUSTER mode disabled
2024-11-06 16:14:27,742 [INFO] ripley: CREATE TABLE new_prod.events_maintenance
ENGINE = MergeTree
ORDER BY toYYYYMM(datetime)
PARTITION BY toYYYYMM(datetime)
AS default.events
2024-11-06 16:14:27,795 [INFO] ripley: ALTER TABLE default.events MOVE PARTITION '197504' TO TABLE new_prod.events_maintenance
2024-11-06 16:14:27,801 [INFO] ripley: INSERT INTO new_prod.events_distributed SELECT * FROM new_prod.events_maintenance
2024-11-06 16:14:27,813 [INFO] ripley: ALTER TABLE new_prod.events_maintenance DROP PARTITION '197504'
...
2024-11-06 16:14:28,431 [INFO] ripley: ON CLUSTER ripley mode enabled
2024-11-06 16:14:28,441 [INFO] ripley: CREATE TABLE new_prod.logs_replicated ON CLUSTER ripley
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{cluster}/{shard}/{table}', '{replica}')
ORDER BY name
PARTITION BY toDate(datetime)
AS default.logs
2024-11-06 16:14:28,776 [INFO] ripley: CREATE TABLE new_prod.logs_distributed ON CLUSTER ripley
ENGINE = Distributed('{cluster}', new_prod, logs_replicated, rand())
2024-11-06 16:14:28,932 [INFO] ripley: ON CLUSTER mode disabled
2024-11-06 16:14:28,932 [INFO] ripley: CREATE TABLE new_prod.logs_maintenance
ENGINE = MergeTree
ORDER BY toYYYYMM(datetime)
PARTITION BY toYYYYMM(datetime)
AS default.logs
2024-11-06 16:14:28,980 [INFO] ripley: ALTER TABLE default.logs MOVE PARTITION '197010' TO TABLE new_prod.logs_maintenance
2024-11-06 16:14:28,986 [INFO] ripley: INSERT INTO new_prod.logs_distributed SELECT * FROM new_prod.logs_maintenance
2024-11-06 16:14:29,018 [INFO] ripley: ALTER TABLE new_prod.logs_maintenance DROP PARTITION '197010'
...
2024-11-06 16:14:33,647 [INFO] root: hostname: nostromo0, table: events_replicated, records: 8
2024-11-06 16:14:33,647 [INFO] root: hostname: nostromo1, table: events_replicated, records: 8
2024-11-06 16:14:33,647 [INFO] root: hostname: jones0, table: events_replicated, records: 19
2024-11-06 16:14:33,647 [INFO] root: hostname: jones1, table: events_replicated, records: 19
2024-11-06 16:14:33,647 [INFO] root: hostname: nostromo0, table: new_prod.events_distributed, records: 27
2024-11-06 16:14:33,663 [INFO] root: hostname: nostromo0, table: logs_replicated, records: 32
2024-11-06 16:14:33,663 [INFO] root: hostname: nostromo1, table: logs_replicated, records: 32
2024-11-06 16:14:33,663 [INFO] root: hostname: jones0, table: logs_replicated, records: 37
2024-11-06 16:14:33,663 [INFO] root: hostname: jones1, table: logs_replicated, records: 37
2024-11-06 16:14:33,663 [INFO] root: hostname: nostromo0, table: new_prod.logs_distributed, records: 69
Clone this wiki locally