From 7ec7c6d7061f67407cb1881da56f142a411d9364 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Wed, 27 Nov 2024 14:12:58 +0100 Subject: [PATCH] tests: added escaping table names in queries MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Some characters that may be used in topic names must be treated specially in SQL. Table names must be quoted when they contain special characters. The quotation mark is specific to the query engine. Added quotation for table names for query engines used in tests. Signed-off-by: Michał Maślanka (cherry picked from commit 8fe08cdf800d0e6b04e4be9aad76c97a07c5bc7a) --- tests/rptest/services/spark_service.py | 3 +++ tests/rptest/services/trino_service.py | 3 +++ tests/rptest/tests/datalake/datalake_services.py | 10 +++++----- tests/rptest/tests/datalake/query_engine_base.py | 12 ++++++++---- 4 files changed, 19 insertions(+), 9 deletions(-) diff --git a/tests/rptest/services/spark_service.py b/tests/rptest/services/spark_service.py index 2e23a3d6ed722..28a5c48267b4e 100644 --- a/tests/rptest/services/spark_service.py +++ b/tests/rptest/services/spark_service.py @@ -142,6 +142,9 @@ def clean_node(self, node, **_): self.stop_node(node, allow_fail=True) node.account.remove(SparkService.LOGS_DIR, allow_fail=True) + def escape_identifier(self, table: str) -> str: + return f"`{table}`" + @staticmethod def engine_name(): return QueryEngineType.SPARK diff --git a/tests/rptest/services/trino_service.py b/tests/rptest/services/trino_service.py index 144d52b1185eb..3db4b4f357b2d 100644 --- a/tests/rptest/services/trino_service.py +++ b/tests/rptest/services/trino_service.py @@ -136,6 +136,9 @@ def make_client(self): port=self.trino_port, catalog="redpanda") + def escape_identifier(self, table: str) -> str: + return f'"{table}"' + @staticmethod def dict_to_conf(d: dict[str, Optional[str | bool]]): """ diff --git a/tests/rptest/tests/datalake/datalake_services.py 
b/tests/rptest/tests/datalake/datalake_services.py index a5082d3f21d14..f5fcd40646592 100644 --- a/tests/rptest/tests/datalake/datalake_services.py +++ b/tests/rptest/tests/datalake/datalake_services.py @@ -150,13 +150,12 @@ def wait_for_translation_until_offset(self, timeout=30, backoff_sec=5): self.wait_for_iceberg_table("redpanda", topic, timeout, backoff_sec) - table_name = f"redpanda.{topic}" def translation_done(): offsets = dict( map( - lambda e: (e.engine_name(), - e.max_translated_offset(table_name, partition)), + lambda e: (e.engine_name( + ), e.max_translated_offset("redpanda", topic, partition)), self.query_engines)) self.redpanda.logger.debug( f"Current translated offsets: {offsets}") @@ -179,11 +178,12 @@ def wait_for_translation(self, timeout=30, backoff_sec=5): self.wait_for_iceberg_table("redpanda", topic, timeout, backoff_sec) - table_name = f"redpanda.{topic}" def translation_done(): counts = dict( - map(lambda e: (e.engine_name(), e.count_table(table_name)), + map( + lambda e: + (e.engine_name(), e.count_table("redpanda", topic)), self.query_engines)) self.redpanda.logger.debug(f"Current counts: {counts}") return all([c == msg_count for _, c in counts.items()]) diff --git a/tests/rptest/tests/datalake/query_engine_base.py b/tests/rptest/tests/datalake/query_engine_base.py index b3d701f245255..ff03c4ec44a28 100644 --- a/tests/rptest/tests/datalake/query_engine_base.py +++ b/tests/rptest/tests/datalake/query_engine_base.py @@ -42,16 +42,20 @@ def run_query(self, query): finally: client.close() + @abstractmethod + def escape_identifier(self, table: str) -> str: + raise NotImplementedError + def run_query_fetch_all(self, query): with self.run_query(query) as cursor: return cursor.fetchall() - def count_table(self, table) -> int: - query = f"select count(*) from {table}" + def count_table(self, namespace, table) -> int: + query = f"select count(*) from {namespace}.{self.escape_identifier(table)}" with self.run_query(query) as cursor: return 
cursor.fetchone()[0] - def max_translated_offset(self, table, partition) -> int: - query = f"select max(redpanda.offset) from {table} where redpanda.partition={partition}" + def max_translated_offset(self, namespace, table, partition) -> int: + query = f"select max(redpanda.offset) from {namespace}.{self.escape_identifier(table)} where redpanda.partition={partition}" with self.run_query(query) as cursor: return cursor.fetchone()[0]