Skip to content

Commit

Permalink
tests: added escaping table names in queries
Browse files Browse the repository at this point in the history
Some characters that may be used in topic names must be treated specialy
in SQL. Table names must be quoted when they contains the special
character. Quotation mark is specific to query engine. Added quotation
for table names for query engines used in tests.

Signed-off-by: Michał Maślanka <[email protected]>
(cherry picked from commit 8fe08cd)
  • Loading branch information
mmaslankaprv authored and andrwng committed Dec 24, 2024
1 parent 214cd3a commit 7ec7c6d
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 9 deletions.
3 changes: 3 additions & 0 deletions tests/rptest/services/spark_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ def clean_node(self, node, **_):
self.stop_node(node, allow_fail=True)
node.account.remove(SparkService.LOGS_DIR, allow_fail=True)

def escape_identifier(self, table: str) -> str:
return f"`{table}`"

@staticmethod
def engine_name():
return QueryEngineType.SPARK
Expand Down
3 changes: 3 additions & 0 deletions tests/rptest/services/trino_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ def make_client(self):
port=self.trino_port,
catalog="redpanda")

def escape_identifier(self, table: str) -> str:
return f'"{table}"'

@staticmethod
def dict_to_conf(d: dict[str, Optional[str | bool]]):
"""
Expand Down
10 changes: 5 additions & 5 deletions tests/rptest/tests/datalake/datalake_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,12 @@ def wait_for_translation_until_offset(self,
timeout=30,
backoff_sec=5):
self.wait_for_iceberg_table("redpanda", topic, timeout, backoff_sec)
table_name = f"redpanda.{topic}"

def translation_done():
offsets = dict(
map(
lambda e: (e.engine_name(),
e.max_translated_offset(table_name, partition)),
lambda e: (e.engine_name(
), e.max_translated_offset("redpanda", topic, partition)),
self.query_engines))
self.redpanda.logger.debug(
f"Current translated offsets: {offsets}")
Expand All @@ -179,11 +178,12 @@ def wait_for_translation(self,
timeout=30,
backoff_sec=5):
self.wait_for_iceberg_table("redpanda", topic, timeout, backoff_sec)
table_name = f"redpanda.{topic}"

def translation_done():
counts = dict(
map(lambda e: (e.engine_name(), e.count_table(table_name)),
map(
lambda e:
(e.engine_name(), e.count_table("redpanda", topic)),
self.query_engines))
self.redpanda.logger.debug(f"Current counts: {counts}")
return all([c == msg_count for _, c in counts.items()])
Expand Down
12 changes: 8 additions & 4 deletions tests/rptest/tests/datalake/query_engine_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,20 @@ def run_query(self, query):
finally:
client.close()

@abstractmethod
def escape_identifier(self, table: str) -> str:
raise NotImplementedError

def run_query_fetch_all(self, query):
with self.run_query(query) as cursor:
return cursor.fetchall()

def count_table(self, table) -> int:
query = f"select count(*) from {table}"
def count_table(self, namespace, table) -> int:
query = f"select count(*) from {namespace}.{self.escape_identifier(table)}"
with self.run_query(query) as cursor:
return cursor.fetchone()[0]

def max_translated_offset(self, table, partition) -> int:
query = f"select max(redpanda.offset) from {table} where redpanda.partition={partition}"
def max_translated_offset(self, namespace, table, partition) -> int:
query = f"select max(redpanda.offset) from {namespace}.{self.escape_identifier(table)} where redpanda.partition={partition}"
with self.run_query(query) as cursor:
return cursor.fetchone()[0]

0 comments on commit 7ec7c6d

Please sign in to comment.