diff --git a/ripley/_clickhouse/cmd_service.py b/ripley/_clickhouse/cmd_service.py index a1056e2..7bd5df5 100644 --- a/ripley/_clickhouse/cmd_service.py +++ b/ripley/_clickhouse/cmd_service.py @@ -3,7 +3,14 @@ from typing import Any, Type, List, Dict from .._log import log -from .._sql_cmd.clickhouse import AlterOnClusterCmd, CreateTableOnClusterCmd, TruncateOnClusterCmd, CreateDbOnCluster +from .._sql_cmd.clickhouse import ( + AlterOnClusterCmd, + CreateTableOnClusterCmd, + TruncateOnClusterCmd, + CreateDbOnCluster, + CreateDistributedTable, + DropPartitionOnClusterCmd, +) from .._sql_cmd.general import AbstractSql @@ -33,15 +40,19 @@ def get_full_table_name(self, table: str, db: str = '') -> str: def skip_on_cluster(self): self._on_cluster = '' + log.info('ON CLUSTER mode disabled') def set_on_cluster(self, name: str): self._on_cluster = name + log.info("ON CLUSTER %s mode enabled", self._on_cluster) def set_settings(self, settings: dict): self._settings = settings + log.info('query settings enabled. %s', self._settings) def skip_settings(self): self._settings = {} + log.info('query settings disabled. %s', self._settings) def exec(self, sql: str, params: dict = None, with_column_types: bool = False): return self._client.execute(sql, params=params, with_column_types=with_column_types, settings=self._settings) @@ -66,11 +77,12 @@ def run_cmd(self, model_class: Type[AbstractSql], model_params: Dict) -> Any: params = deepcopy(model_params) if issubclass( model_class, - (AlterOnClusterCmd, CreateTableOnClusterCmd, TruncateOnClusterCmd, CreateDbOnCluster) + (AlterOnClusterCmd, CreateTableOnClusterCmd, TruncateOnClusterCmd, CreateDbOnCluster, + CreateDistributedTable, DropPartitionOnClusterCmd) ): if self._on_cluster: params['on_cluster'] = self._on_cluster cmd = model_class(**params) - log.info('%s\nSETTINGS %s', cmd, self._settings) + log.info('%s', cmd) self.exec(cmd.to_sql()) diff --git a/ripley/_clickhouse/main_service.py b/ripley/_clickhouse/main_service.py index 2f2a8e4..06cb3ed 100644 --- a/ripley/_clickhouse/main_service.py +++ b/ripley/_clickhouse/main_service.py @@ -122,3 +122,7 @@ def rename_table(self, table: Table, new_name: str, db: str = '') -> None: def insert_from_remote(self, settings: RemoteSettings, table: str, db: str = '', create_table: bool = False) -> None: self._table.insert_from_remote(settings, table, db, create_table) + + def create_distributed_table(self, create_table: str, table: str, database: str, sharding_key: str = '', + cluster: str = "'{cluster}'") -> Table: + return self._table.create_distributed_table(create_table, table, database, sharding_key, cluster) diff --git a/ripley/_clickhouse/table_service.py b/ripley/_clickhouse/table_service.py index 31029cb..bf88226 100644 --- a/ripley/_clickhouse/table_service.py +++ b/ripley/_clickhouse/table_service.py @@ -11,6 +11,7 @@ InsertIntoS3Cmd, Remote, InsertFromRemote, + CreateDistributedTable, ) from .._sql_cmd.general import BaseInsertIntoTableFromTable from ..clickhouse_models.remote_settings import ClickhouseRemoteSettingsModel as RemoteSettings @@ -134,3 +135,19 @@ def insert_from_remote(self, settings: RemoteSettings, table: str, db: str = '', InsertFromRemote, model_params=dict(table_name=self._cmd.get_full_table_name(table, db), from_remote=remote), ) + + def create_distributed_table(self, create_table: str, table: str, database: str, sharding_key: str = '', + cluster: str = "'{cluster}'",) -> ClickhouseTableModel: + self._cmd.run_cmd( + CreateDistributedTable, + model_params=dict( + create_table=create_table, + table=table, + database=database, + sharding_key=sharding_key, + cluster=cluster, + ), + ) + + db_name, table_name = create_table.split('.') + return self._system.get_table_by_name(table_name, db_name) diff --git a/ripley/_protocols/clickhouse.py b/ripley/_protocols/clickhouse.py index cd581a9..4cd2821 100644 --- a/ripley/_protocols/clickhouse.py +++ b/ripley/_protocols/clickhouse.py @@ -225,3 +225,19 @@ def insert_from_remote( INSERT INTO {db}.{table} SELECT * FROM remote(...) https://clickhouse.com/docs/en/integrations/s3#s3-table-functions """ + + @abc.abstractmethod + def create_distributed_table( + self, + create_table: str, + table: str, + database: str, + sharding_key: str = '', + cluster: str = "'{cluster}'", + ) -> Table: + """ + https://clickhouse.com/docs/en/engines/table-engines/special/distributed + + CREATE TABLE {create_table} ON CLUSTER ripley + ENGINE = Distributed({cluster}, {table}, {table}, {sharding_key}) + """ diff --git a/ripley/_sql_cmd/clickhouse.py b/ripley/_sql_cmd/clickhouse.py index 188269d..f048fea 100644 --- a/ripley/_sql_cmd/clickhouse.py +++ b/ripley/_sql_cmd/clickhouse.py @@ -17,8 +17,8 @@ def __init__(self, name: str, on_cluster: str = '', engine: str = ''): def to_sql(self) -> str: cmd = super().to_sql() - on_cluster = f"ON CLUSTER '{self._on_cluster}'" if self._on_cluster else '' - engine = f"ENGINE {self._engine}" if self._engine else '' + on_cluster = f" ON CLUSTER '{self._on_cluster}'" if self._on_cluster else '' + engine = f" ENGINE {self._engine}" if self._engine else '' return f"{cmd}{on_cluster}{engine}" @@ -152,7 +152,7 @@ def __init__(self, table_name: str, on_cluster: str = ''): def to_sql(self) -> str: cmd = super().to_sql() - cmd = f"{cmd} ON CLUSTER '{self._on_cluster}'" if self._on_cluster else cmd + cmd = f"{cmd} ON CLUSTER {self._on_cluster}" if self._on_cluster else cmd return cmd @@ -173,13 +173,11 @@ def __init__( self._engine = engine def to_sql(self) -> str: - return f""" - {super().to_sql()} - engine = {self._engine} + return f"""{super().to_sql()} + ENGINE = {self._engine} {self._order_by} {self._partition_by} - AS {self._from_table} - """ + AS {self._from_table}""" class Remote(AbstractSql): @@ -235,3 +233,26 @@ def __repr__(self): def to_sql(self) -> str: return f'INSERT INTO {self._table_name} SELECT * FROM {self._from_remote.to_sql()}' + + +class CreateDistributedTable(AbstractSql): + def __init__( + self, + create_table: str, + table: str, + on_cluster: str, + database: str, + sharding_key: str = '', + cluster: str = "'{cluster}'", + ): + self._create_table = create_table + self._table = table + self._on_cluster = on_cluster + self._cluster = cluster + self._database = database + self._sharding_key = sharding_key + + def to_sql(self) -> str: + sharding_key = f', {self._sharding_key}' if self._sharding_key else '' + return f"""CREATE TABLE {self._create_table} ON CLUSTER {self._on_cluster} + ENGINE = Distributed({self._cluster}, {self._database}, {self._table}{sharding_key})"""