diff --git a/tests/rptest/services/spark_service.py b/tests/rptest/services/spark_service.py index 2e23a3d6ed72..28a5c48267b4 100644 --- a/tests/rptest/services/spark_service.py +++ b/tests/rptest/services/spark_service.py @@ -142,6 +142,9 @@ def clean_node(self, node, **_): self.stop_node(node, allow_fail=True) node.account.remove(SparkService.LOGS_DIR, allow_fail=True) + def escape_identifier(self, table: str) -> str: + return f"`{table}`" + @staticmethod def engine_name(): return QueryEngineType.SPARK diff --git a/tests/rptest/services/trino_service.py b/tests/rptest/services/trino_service.py index 144d52b1185e..3db4b4f357b2 100644 --- a/tests/rptest/services/trino_service.py +++ b/tests/rptest/services/trino_service.py @@ -136,6 +136,9 @@ def make_client(self): port=self.trino_port, catalog="redpanda") + def escape_identifier(self, table: str) -> str: + return f'"{table}"' + @staticmethod def dict_to_conf(d: dict[str, Optional[str | bool]]): """ diff --git a/tests/rptest/tests/datalake/datalake_services.py b/tests/rptest/tests/datalake/datalake_services.py index a5082d3f21d1..f5fcd4064659 100644 --- a/tests/rptest/tests/datalake/datalake_services.py +++ b/tests/rptest/tests/datalake/datalake_services.py @@ -150,13 +150,12 @@ def wait_for_translation_until_offset(self, timeout=30, backoff_sec=5): self.wait_for_iceberg_table("redpanda", topic, timeout, backoff_sec) - table_name = f"redpanda.{topic}" def translation_done(): offsets = dict( map( - lambda e: (e.engine_name(), - e.max_translated_offset(table_name, partition)), + lambda e: (e.engine_name( + ), e.max_translated_offset("redpanda", topic, partition)), self.query_engines)) self.redpanda.logger.debug( f"Current translated offsets: {offsets}") @@ -179,11 +178,12 @@ def wait_for_translation(self, timeout=30, backoff_sec=5): self.wait_for_iceberg_table("redpanda", topic, timeout, backoff_sec) - table_name = f"redpanda.{topic}" def translation_done(): counts = dict( - map(lambda e: (e.engine_name(), e.count_table(table_name)), + map( + lambda e: + (e.engine_name(), e.count_table("redpanda", topic)), self.query_engines)) self.redpanda.logger.debug(f"Current counts: {counts}") return all([c == msg_count for _, c in counts.items()]) diff --git a/tests/rptest/tests/datalake/query_engine_base.py b/tests/rptest/tests/datalake/query_engine_base.py index b3d701f24525..ff03c4ec44a2 100644 --- a/tests/rptest/tests/datalake/query_engine_base.py +++ b/tests/rptest/tests/datalake/query_engine_base.py @@ -42,16 +42,20 @@ def run_query(self, query): finally: client.close() + @abstractmethod + def escape_identifier(self, table: str) -> str: + raise NotImplementedError + def run_query_fetch_all(self, query): with self.run_query(query) as cursor: return cursor.fetchall() - def count_table(self, table) -> int: - query = f"select count(*) from {table}" + def count_table(self, namespace, table) -> int: + query = f"select count(*) from {namespace}.{self.escape_identifier(table)}" with self.run_query(query) as cursor: return cursor.fetchone()[0] - def max_translated_offset(self, table, partition) -> int: - query = f"select max(redpanda.offset) from {table} where redpanda.partition={partition}" + def max_translated_offset(self, namespace, table, partition) -> int: + query = f"select max(redpanda.offset) from {namespace}.{self.escape_identifier(table)} where redpanda.partition={partition}" with self.run_query(query) as cursor: return cursor.fetchone()[0]