Skip to content

Commit

Permalink
CreateDistributedTable
Browse files Browse the repository at this point in the history
  • Loading branch information
d-ganchar committed Nov 6, 2024
1 parent e72c452 commit daa35c1
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 11 deletions.
18 changes: 15 additions & 3 deletions ripley/_clickhouse/cmd_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,14 @@
from typing import Any, Type, List, Dict

from .._log import log
from .._sql_cmd.clickhouse import AlterOnClusterCmd, CreateTableOnClusterCmd, TruncateOnClusterCmd, CreateDbOnCluster
from .._sql_cmd.clickhouse import (
AlterOnClusterCmd,
CreateTableOnClusterCmd,
TruncateOnClusterCmd,
CreateDbOnCluster,
CreateDistributedTable,
DropPartitionOnClusterCmd,
)
from .._sql_cmd.general import AbstractSql


Expand Down Expand Up @@ -33,15 +40,19 @@ def get_full_table_name(self, table: str, db: str = '') -> str:

def skip_on_cluster(self):
    """Disable ON CLUSTER mode by resetting the stored cluster name to an empty string."""
    disabled_message = 'ON CLUSTER mode disabled'
    self._on_cluster = ''
    log.info(disabled_message)

def set_on_cluster(self, name: str):
    """Enable ON CLUSTER mode by remembering ``name`` as the cluster to use.

    :param name: cluster name stored on the service and written to the log.
    """
    enabled_message = "ON CLUSTER %s mode enabled"
    self._on_cluster = name
    log.info(enabled_message, self._on_cluster)

def set_settings(self, settings: dict):
    """Store ``settings`` as the query settings passed to the client by ``exec``.

    :param settings: settings mapping; it is stored by reference, not copied.
    """
    enabled_message = 'query settings enabled. %s'
    self._settings = settings
    log.info(enabled_message, self._settings)

def skip_settings(self):
    """Drop any stored query settings by replacing them with an empty dict."""
    disabled_message = 'query settings disabled. %s'
    self._settings = {}
    log.info(disabled_message, self._settings)

def exec(self, sql: str, params: dict = None, with_column_types: bool = False):
    """Run ``sql`` through the underlying client using the stored query settings.

    :param sql: statement text handed to the client unchanged.
    :param params: optional query parameters forwarded to the client.
    :param with_column_types: forwarded to the client as-is.
    :return: whatever the client's ``execute`` call returns.
    """
    execute_kwargs = dict(
        params=params,
        with_column_types=with_column_types,
        settings=self._settings,
    )
    return self._client.execute(sql, **execute_kwargs)
Expand All @@ -66,11 +77,12 @@ def run_cmd(self, model_class: Type[AbstractSql], model_params: Dict) -> Any:
params = deepcopy(model_params)
if issubclass(
model_class,
(AlterOnClusterCmd, CreateTableOnClusterCmd, TruncateOnClusterCmd, CreateDbOnCluster)
(AlterOnClusterCmd, CreateTableOnClusterCmd, TruncateOnClusterCmd, CreateDbOnCluster,
CreateDistributedTable, DropPartitionOnClusterCmd)
):
if self._on_cluster:
params['on_cluster'] = self._on_cluster

cmd = model_class(**params)
log.info('%s\nSETTINGS %s', cmd, self._settings)
log.info('%s', cmd)
self.exec(cmd.to_sql())
4 changes: 4 additions & 0 deletions ripley/_clickhouse/main_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,7 @@ def rename_table(self, table: Table, new_name: str, db: str = '') -> None:
def insert_from_remote(self, settings: RemoteSettings, table: str, db: str = '',
create_table: bool = False) -> None:
self._table.insert_from_remote(settings, table, db, create_table)

def create_distributed_table(self, create_table: str, table: str, database: str, sharding_key: str = '',
                             cluster: str = "'{cluster}'") -> Table:
    """Create a Distributed table by delegating to the table service.

    All arguments are forwarded positionally and unchanged; the table
    service's result is returned as-is.
    """
    forwarded_args = (create_table, table, database, sharding_key, cluster)
    return self._table.create_distributed_table(*forwarded_args)
17 changes: 17 additions & 0 deletions ripley/_clickhouse/table_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
InsertIntoS3Cmd,
Remote,
InsertFromRemote,
CreateDistributedTable,
)
from .._sql_cmd.general import BaseInsertIntoTableFromTable
from ..clickhouse_models.remote_settings import ClickhouseRemoteSettingsModel as RemoteSettings
Expand Down Expand Up @@ -134,3 +135,19 @@ def insert_from_remote(self, settings: RemoteSettings, table: str, db: str = '',
InsertFromRemote,
model_params=dict(table_name=self._cmd.get_full_table_name(table, db), from_remote=remote),
)

def create_distributed_table(self, create_table: str, table: str, database: str, sharding_key: str = '',
                             cluster: str = "'{cluster}'",) -> ClickhouseTableModel:
    """Create a Distributed table and return the model of the created table.

    :param create_table: name of the table to create, normally ``db.table``.
    :param table: table name passed to the ``Distributed`` engine.
    :param database: database name passed to the ``Distributed`` engine.
    :param sharding_key: optional sharding key expression.
    :param cluster: cluster argument of the ``Distributed`` engine.
    :return: the result of looking the created table up by name.
    """
    self._cmd.run_cmd(
        CreateDistributedTable,
        model_params=dict(
            create_table=create_table,
            table=table,
            database=database,
            sharding_key=sharding_key,
            cluster=cluster,
        ),
    )

    # Split on the LAST dot only. Unpacking ``create_table.split('.')`` into two
    # names raised ValueError for an unqualified name (or one with extra dots),
    # and it did so after the CREATE TABLE statement had already been executed.
    # NOTE(review): for an unqualified name ``db_name`` is '' here; this assumes
    # get_table_by_name treats an empty db like the other ``db: str = ''``
    # parameters in this package do - confirm.
    db_name, _, table_name = create_table.rpartition('.')
    return self._system.get_table_by_name(table_name, db_name)
16 changes: 16 additions & 0 deletions ripley/_protocols/clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,3 +225,19 @@ def insert_from_remote(
INSERT INTO {db}.{table} SELECT * FROM remote(...)
https://clickhouse.com/docs/en/integrations/s3#s3-table-functions
"""

@abc.abstractmethod
def create_distributed_table(
    self,
    create_table: str,
    table: str,
    database: str,
    sharding_key: str = '',
    cluster: str = "'{cluster}'",
) -> Table:
    """
    https://clickhouse.com/docs/en/engines/table-engines/special/distributed
    CREATE TABLE {create_table} ON CLUSTER {on_cluster}
    ENGINE = Distributed({cluster}, {database}, {table}[, {sharding_key}])

    The sharding key argument is only emitted when ``sharding_key`` is not empty.
    """
37 changes: 29 additions & 8 deletions ripley/_sql_cmd/clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ def __init__(self, name: str, on_cluster: str = '', engine: str = ''):

def to_sql(self) -> str:
cmd = super().to_sql()
on_cluster = f"ON CLUSTER '{self._on_cluster}'" if self._on_cluster else ''
engine = f"ENGINE {self._engine}" if self._engine else ''
on_cluster = f" ON CLUSTER '{self._on_cluster}'" if self._on_cluster else ''
engine = f" ENGINE {self._engine}" if self._engine else ''
return f"{cmd}{on_cluster}{engine}"


Expand Down Expand Up @@ -152,7 +152,7 @@ def __init__(self, table_name: str, on_cluster: str = ''):

def to_sql(self) -> str:
cmd = super().to_sql()
cmd = f"{cmd} ON CLUSTER '{self._on_cluster}'" if self._on_cluster else cmd
cmd = f"{cmd} ON CLUSTER {self._on_cluster}" if self._on_cluster else cmd
return cmd


Expand All @@ -173,13 +173,11 @@ def __init__(
self._engine = engine

def to_sql(self) -> str:
return f"""
{super().to_sql()}
engine = {self._engine}
return f"""{super().to_sql()}
ENGINE = {self._engine}
{self._order_by}
{self._partition_by}
AS {self._from_table}
"""
AS {self._from_table}"""


class Remote(AbstractSql):
Expand Down Expand Up @@ -235,3 +233,26 @@ def __repr__(self):

def to_sql(self) -> str:
return f'INSERT INTO {self._table_name} SELECT * FROM {self._from_remote.to_sql()}'


class CreateDistributedTable(AbstractSql):
    """Build a ``CREATE TABLE ... ENGINE = Distributed(...)`` statement.

    https://clickhouse.com/docs/en/engines/table-engines/special/distributed

    :param create_table: name of the Distributed table to create.
    :param table: table name passed to the ``Distributed`` engine.
    :param on_cluster: cluster name for the ``ON CLUSTER`` clause; when empty
        the clause is omitted.
    :param database: database name passed to the ``Distributed`` engine; must
        not be empty.
    :param sharding_key: optional sharding key expression; omitted when empty.
    :param cluster: first ``Distributed`` argument, inserted verbatim (the
        default is the quoted ``{cluster}`` macro).
    :raises ValueError: if ``database`` is empty.
    """

    def __init__(
        self,
        create_table: str,
        table: str,
        on_cluster: str = '',
        database: str = '',
        sharding_key: str = '',
        cluster: str = "'{cluster}'",
    ):
        # ``on_cluster`` has to be optional: the command runner only supplies it
        # when ON CLUSTER mode is enabled. Keeping the positional order means
        # ``database`` needs a default too, so it is validated explicitly.
        if not database:
            raise ValueError('database is required to create a Distributed table')

        self._create_table = create_table
        self._table = table
        self._on_cluster = on_cluster
        self._cluster = cluster
        self._database = database
        self._sharding_key = sharding_key

    def to_sql(self) -> str:
        """Return the CREATE TABLE statement, without empty optional clauses."""
        # An empty cluster name would otherwise render a dangling "ON CLUSTER ".
        on_cluster = f' ON CLUSTER {self._on_cluster}' if self._on_cluster else ''
        sharding_key = f', {self._sharding_key}' if self._sharding_key else ''
        create_line = f'CREATE TABLE {self._create_table}{on_cluster}'
        engine_line = (
            f'ENGINE = Distributed({self._cluster}, {self._database}, {self._table}{sharding_key})'
        )
        return f'{create_line}\n{engine_line}'

0 comments on commit daa35c1

Please sign in to comment.