From 587ecf463dd12abe23a29ff8c1c400ae15b1bd52 Mon Sep 17 00:00:00 2001 From: Aliaksei Khatskevich Date: Thu, 21 Mar 2024 01:07:48 +0100 Subject: [PATCH] clickhouse: Fix insertion race condition in test This commit makes replicas in Clickhouse tests synchronize before checking if expected data is present. It eliminates situation when one replica did not pass a fresh data to another one. [DDB-906] --- .../coordinator/plugins/clickhouse/test_plugin.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/tests/integration/coordinator/plugins/clickhouse/test_plugin.py b/tests/integration/coordinator/plugins/clickhouse/test_plugin.py index 07d7ac5b..f7964dcf 100644 --- a/tests/integration/coordinator/plugins/clickhouse/test_plugin.py +++ b/tests/integration/coordinator/plugins/clickhouse/test_plugin.py @@ -9,6 +9,7 @@ from astacus.coordinator.plugins.clickhouse.client import ( ClickHouseClient, ClickHouseClientQueryError, + escape_sql_identifier, escape_sql_string, HttpClickHouseClient, ) @@ -158,6 +159,7 @@ async def fixture_restored_cluster( async with restored_cluster_manager( restorable_cluster, ports, stop_after_step, clickhouse_restore_command, minio_bucket ) as clients: + await sync_replicated_tables(clients) yield clients @@ -175,6 +177,18 @@ async def fixture_function_restored_cluster( yield clients +async def sync_replicated_tables(clients: Sequence[ClickHouseClient]) -> None: + # Get replicated tables to sync + rows = await clients[0].execute( + b"SELECT name FROM system.tables WHERE database = 'default' AND engine like 'Replicated%'" + ) + for client in clients: + for row in rows: + table_name = row[0] + assert isinstance(table_name, str) + await client.execute(f"SYSTEM SYNC REPLICA default.{escape_sql_identifier(table_name.encode())} STRICT".encode()) + + async def setup_cluster_content(clients: Sequence[HttpClickHouseClient], use_named_collections: bool) -> None: for client in clients: await client.execute(b"DROP DATABASE default SYNC")