Merge pull request #232 from Aiven-Open/joelynch/clickhouse-24.3
clickhouse: flatten_nested no longer works for Array(Tuple) in 24.3
dmitry-potepalov authored Oct 10, 2024
2 parents a6ab248 + 50a2155 commit e816c83
Showing 2 changed files with 36 additions and 14 deletions.
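
Background for the change: according to the commit message, ClickHouse 24.3 stops applying flatten_nested=1 to Array(Tuple(...)) columns. Before 24.3 such a column was rewritten into per-field arrays (thedata.a, thedata.b), which is exactly what the restore test queried, so the diff gates the flattened fixture on the server version. A minimal sketch of that gate, not part of the commit; `client` is assumed to expose the same async execute() as the HttpClickHouseClient used in the tests:

async def create_array_tuple_flatten_fixture(client, clickhouse_version: tuple[int, ...]) -> None:
    # Sketch only: mirrors the gating pattern added in setup_cluster_content() below.
    if clickhouse_version < (24, 3):
        # Pre-24.3, flatten_nested=1 splits the Array(Tuple) column into
        # thedata.a Array(UInt32) and thedata.b Array(UInt32).
        await client.execute(
            b"CREATE TABLE default.array_tuple_flatten "
            b"(thekey UInt32, thedata Array(Tuple(a UInt32, b UInt32))) "
            b"ENGINE = ReplicatedMergeTree ORDER BY (thekey) "
            b"SETTINGS flatten_nested=1"
        )
        await client.execute(b"INSERT INTO default.array_tuple_flatten VALUES (123, [4], [5])")
    # On 24.3+ the column stays as Array(Tuple(...)), so the flattened fixture
    # and the SELECT thedata.a, thedata.b assertions are skipped.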
42 changes: 29 additions & 13 deletions tests/integration/coordinator/plugins/clickhouse/test_plugin.py
@@ -29,6 +29,7 @@
RestorableSource,
run_astacus_command,
)
+from tests.utils import get_clickhouse_version
from typing import Final
from unittest import mock

@@ -87,7 +88,11 @@ async def restorable_cluster_manager(
storage_path, zookeeper, clickhouse_cluster, ports, minio_bucket
) as astacus_cluster:
clients = [get_clickhouse_client(service) for service in clickhouse_cluster.services]
-await setup_cluster_content(clients, clickhouse_cluster.use_named_collections)
+await setup_cluster_content(
+    clients,
+    clickhouse_cluster.use_named_collections,
+    get_clickhouse_version(clickhouse_command),
+)
await setup_cluster_users(clients)
run_astacus_command(astacus_cluster, "backup")
# We have destroyed everything except the backup storage dir
@@ -183,7 +188,9 @@ async def sync_replicated_table(clients: Sequence[ClickHouseClient], table_name:
await client.execute(f"SYSTEM SYNC REPLICA default.{escape_sql_identifier(table_name.encode())} STRICT".encode())


-async def setup_cluster_content(clients: Sequence[HttpClickHouseClient], use_named_collections: bool) -> None:
+async def setup_cluster_content(
+    clients: Sequence[HttpClickHouseClient], use_named_collections: bool, clickhouse_version: tuple[int, ...]
+) -> None:
for client in clients:
await client.execute(b"DROP DATABASE default SYNC")
await client.execute(
@@ -232,12 +239,13 @@ async def setup_cluster_content(clients: Sequence[HttpClickHouseClient], use_nam
b"SETTINGS index_granularity=8192 "
b"SETTINGS flatten_nested=0"
)
-await clients[0].execute(
-    b"CREATE TABLE default.array_tuple_flatten (thekey UInt32, thedata Array(Tuple(a UInt32, b UInt32))) "
-    b"ENGINE = ReplicatedMergeTree ORDER BY (thekey) "
-    b"SETTINGS index_granularity=8192 "
-    b"SETTINGS flatten_nested=1"
-)
+if clickhouse_version < (24, 3):
+    await clients[0].execute(
+        b"CREATE TABLE default.array_tuple_flatten (thekey UInt32, thedata Array(Tuple(a UInt32, b UInt32))) "
+        b"ENGINE = ReplicatedMergeTree ORDER BY (thekey) "
+        b"SETTINGS index_granularity=8192 "
+        b"SETTINGS flatten_nested=1"
+    )
# integrations - note most of these never actually attempt to connect to the remote server.
if await is_engine_available(clients[0], TableEngine.PostgreSQL):
await clients[0].execute(
@@ -307,7 +315,8 @@ async def setup_cluster_content(clients: Sequence[HttpClickHouseClient], use_nam
await clients[0].execute(b"INSERT INTO default.nested_not_flatten VALUES (123, [(4, 5)])")
await clients[0].execute(b"INSERT INTO default.nested_flatten VALUES (123, [4], [5])")
await clients[0].execute(b"INSERT INTO default.array_tuple_not_flatten VALUES (123, [(4, 5)])")
await clients[0].execute(b"INSERT INTO default.array_tuple_flatten VALUES (123, [4], [5])")
if clickhouse_version < (24, 3):
await clients[0].execute(b"INSERT INTO default.array_tuple_flatten VALUES (123, [4], [5])")
# And some object storage data
await clients[0].execute(b"INSERT INTO default.in_object_storage VALUES (123, 'foo')")
await clients[1].execute(b"INSERT INTO default.in_object_storage VALUES (456, 'bar')")
@@ -409,16 +418,21 @@ async def test_restores_table_with_nullable_key(restored_cluster: Sequence[Click
assert response == []


-async def test_restores_table_with_nested_fields(restored_cluster: Sequence[ClickHouseClient]) -> None:
+async def test_restores_table_with_nested_fields(
+    restored_cluster: Sequence[ClickHouseClient], clickhouse_command: ClickHouseCommand
+) -> None:
client = restored_cluster[0]
response = await client.execute(b"SELECT thekey, thedata FROM default.nested_not_flatten ORDER BY thekey")
assert response == [[123, [{"a": 4, "b": 5}]]]
response = await client.execute(b"SELECT thekey, thedata.a, thedata.b FROM default.nested_flatten ORDER BY thekey")
assert response == [[123, [4], [5]]]
response = await client.execute(b"SELECT thekey, thedata FROM default.array_tuple_not_flatten ORDER BY thekey")
assert response == [[123, [{"a": 4, "b": 5}]]]
response = await client.execute(b"SELECT thekey, thedata.a, thedata.b FROM default.array_tuple_flatten ORDER BY thekey")
assert response == [[123, [4], [5]]]
if get_clickhouse_version(clickhouse_command) < (24, 3):
response = await client.execute(
b"SELECT thekey, thedata.a, thedata.b FROM default.array_tuple_flatten ORDER BY thekey"
)
assert response == [[123, [4], [5]]]


async def test_restores_function_table(restored_cluster: Sequence[ClickHouseClient]) -> None:
@@ -545,7 +559,9 @@ async def test_cleanup_does_not_break_object_storage_disk_files(
storage_path, zookeeper, clickhouse_cluster, ports, minio_bucket
) as astacus_cluster:
clients = [get_clickhouse_client(service) for service in clickhouse_cluster.services]
-await setup_cluster_content(clients, clickhouse_cluster.use_named_collections)
+await setup_cluster_content(
+    clients, clickhouse_cluster.use_named_collections, get_clickhouse_version(clickhouse_command)
+)
await setup_cluster_users(clients)
run_astacus_command(astacus_cluster, "backup")
run_astacus_command(astacus_cluster, "backup")
8 changes: 7 additions & 1 deletion tests/utils.py
@@ -5,6 +5,7 @@

from astacus.common.rohmustorage import RohmuCompressionType, RohmuConfig
from collections.abc import Sequence
+from functools import cache
from pathlib import Path
from typing import Final

@@ -86,11 +87,16 @@ def parse_clickhouse_version(command_output: bytes) -> tuple[int, ...]:
return version_tuple


-def get_clickhouse_version(command: Sequence[str | Path]) -> tuple[int, ...]:
+@cache
+def _get_clickhouse_version(command: tuple[str | Path, ...]) -> tuple[int, ...]:
version_command_output = subprocess.check_output([*command, "--version"])
return parse_clickhouse_version(version_command_output)


+def get_clickhouse_version(command: Sequence[str | Path]) -> tuple[int, ...]:
+    return _get_clickhouse_version(tuple(command))


def is_cassandra_driver_importable() -> bool:
return importlib.util.find_spec("cassandra") is not None
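
The tests/utils.py change above memoizes the version lookup so that a test run only shells out to the ClickHouse binary once: functools.cache requires hashable arguments, hence the thin get_clickhouse_version() wrapper that converts the command sequence to a tuple before calling the cached helper. A usage sketch, with a purely illustrative command and version:

from tests.utils import get_clickhouse_version

# Illustrative only: the real tests pass the clickhouse_command fixture here.
clickhouse_command = ["/usr/bin/clickhouse"]
version = get_clickhouse_version(clickhouse_command)  # subprocess runs once; later calls hit the cache
if version < (24, 3):
    pass  # create the flatten_nested=1 Array(Tuple) fixture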
