Skip to content

Commit

Permalink
clickhouse: Check engine availability before test
Browse files Browse the repository at this point in the history
Astacus can be tested against different builds and versions
of Clickhouse.
This commit skips tests with table engines which are likely to be
disabled while supported by Astacus.
  • Loading branch information
Khatskevich committed Mar 21, 2024
1 parent e6ca1a1 commit 391ba1c
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 17 deletions.
13 changes: 13 additions & 0 deletions astacus/coordinator/plugins/clickhouse/engines.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
"""
Copyright (c) 2024 Aiven Ltd
See LICENSE for details
"""

import enum


# This enum contains only used constants
class TableEngine(enum.Enum):
MySQL = "MySQL"
PostgreSQL = "PostgreSQL"
S3 = "S3"
55 changes: 38 additions & 17 deletions tests/integration/coordinator/plugins/clickhouse/test_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,13 @@
from _pytest.fixtures import SubRequest
from astacus.common.ipc import RestoreRequest
from astacus.coordinator.plugins.base import OperationContext
from astacus.coordinator.plugins.clickhouse.client import ClickHouseClient, ClickHouseClientQueryError, HttpClickHouseClient
from astacus.coordinator.plugins.clickhouse.client import (
ClickHouseClient,
ClickHouseClientQueryError,
escape_sql_string,
HttpClickHouseClient,
)
from astacus.coordinator.plugins.clickhouse.engines import TableEngine
from astacus.coordinator.plugins.clickhouse.plugin import ClickHousePlugin
from collections.abc import AsyncIterable, AsyncIterator, Sequence
from contextlib import asynccontextmanager
Expand Down Expand Up @@ -52,6 +58,11 @@ def get_restore_steps_names() -> Sequence[str]:
return [step.__class__.__name__ for step in steps]


async def is_engine_available(client: ClickHouseClient, engine: TableEngine) -> bool:
query = f"SELECT TRUE FROM system.table_engines WHERE name = {escape_sql_string(engine.value.encode())}".encode()
return len(await client.execute(query)) == 1


@asynccontextmanager
async def restorable_cluster_manager(
ports: Ports,
Expand Down Expand Up @@ -220,14 +231,18 @@ async def setup_cluster_content(clients: Sequence[HttpClickHouseClient], use_nam
b"SETTINGS flatten_nested=1"
)
# integrations - note most of these never actually attempt to connect to the remote server.
await clients[0].execute(
b"CREATE TABLE default.postgresql (a Int) "
b"ENGINE = PostgreSQL('https://host:1234', 'database', 'table', 'user', 'password')"
)
await clients[0].execute(
b"CREATE TABLE default.mysql (a Int) ENGINE = MySQL('https://host:1234', 'database', 'table', 'user', 'password')"
)
await clients[0].execute(b"CREATE TABLE default.s3 (a Int) ENGINE = S3('http://bucket.s3.amazonaws.com/key.json')")
if await is_engine_available(clients[0], TableEngine.PostgreSQL):
await clients[0].execute(
b"CREATE TABLE default.postgresql (a Int) "
b"ENGINE = PostgreSQL('https://host:1234', 'database', 'table', 'user', 'password')"
)
if await is_engine_available(clients[0], TableEngine.MySQL):
await clients[0].execute(
b"CREATE TABLE default.mysql (a Int) "
b"ENGINE = MySQL('https://host:1234', 'database', 'table', 'user', 'password')"
)
if await is_engine_available(clients[0], TableEngine.S3):
await clients[0].execute(b"CREATE TABLE default.s3 (a Int) ENGINE = S3('http://bucket.s3.amazonaws.com/key.json')")
# add a function table
await clients[0].execute(b"CREATE TABLE default.from_function_table AS numbers(3)")
# add a table with data in object storage
Expand Down Expand Up @@ -500,12 +515,18 @@ async def test_cleanup_does_not_break_object_storage_disk_files(
await check_object_storage_data(clients)


async def test_restores_integration_tables(restored_cluster: Sequence[ClickHouseClient]) -> None:
@pytest.mark.parametrize(
"table_name,table_engine",
[
("default.postgresql", TableEngine.PostgreSQL),
("default.mysql", TableEngine.MySQL),
("default.s3", TableEngine.S3),
],
)
async def test_restores_integration_tables(
restored_cluster: Sequence[ClickHouseClient], table_name: str, table_engine: TableEngine
) -> None:
for client in restored_cluster:
assert await table_exists(client, "default.postgresql")
assert await table_exists(client, "default.mysql")
assert await table_exists(client, "default.s3")


async def table_exists(client: ClickHouseClient, table_name: str) -> bool:
return bool(await client.execute(f"EXISTS TABLE {table_name}".encode()))
if not await is_engine_available(client, table_engine):
pytest.skip(f"Table engine {table_engine.value} not available")
assert bool(await client.execute(f"EXISTS TABLE {table_name}".encode()))

0 comments on commit 391ba1c

Please sign in to comment.