diff --git a/tests/rptest/services/spark_service.py b/tests/rptest/services/spark_service.py
index 2e23a3d6ed72..28a5c48267b4 100644
--- a/tests/rptest/services/spark_service.py
+++ b/tests/rptest/services/spark_service.py
@@ -142,6 +142,9 @@ def clean_node(self, node, **_):
         self.stop_node(node, allow_fail=True)
         node.account.remove(SparkService.LOGS_DIR, allow_fail=True)
 
+    def escape_identifier(self, table: str) -> str:
+        return f"`{table}`"
+
     @staticmethod
     def engine_name():
         return QueryEngineType.SPARK
diff --git a/tests/rptest/services/trino_service.py b/tests/rptest/services/trino_service.py
index 144d52b1185e..3db4b4f357b2 100644
--- a/tests/rptest/services/trino_service.py
+++ b/tests/rptest/services/trino_service.py
@@ -136,6 +136,9 @@ def make_client(self):
                                  port=self.trino_port,
                                  catalog="redpanda")
 
+    def escape_identifier(self, table: str) -> str:
+        return f'"{table}"'
+
     @staticmethod
     def dict_to_conf(d: dict[str, Optional[str | bool]]):
         """
diff --git a/tests/rptest/tests/datalake/datalake_services.py b/tests/rptest/tests/datalake/datalake_services.py
index 09a09b234fc7..618018174012 100644
--- a/tests/rptest/tests/datalake/datalake_services.py
+++ b/tests/rptest/tests/datalake/datalake_services.py
@@ -150,13 +150,12 @@ def wait_for_translation_until_offset(self,
                                          timeout=30,
                                          backoff_sec=5):
         self.wait_for_iceberg_table("redpanda", topic, timeout, backoff_sec)
-        table_name = f"redpanda.{topic}"
 
         def translation_done():
             offsets = dict(
                 map(
-                    lambda e: (e.engine_name(),
-                               e.max_translated_offset(table_name, partition)),
+                    lambda e: (e.engine_name(
+                    ), e.max_translated_offset("redpanda", topic, partition)),
                     self.query_engines))
             self.redpanda.logger.debug(
                 f"Current translated offsets: {offsets}")
@@ -177,11 +176,12 @@ def wait_for_translation(self,
                             timeout=30,
                             backoff_sec=5):
         self.wait_for_iceberg_table("redpanda", topic, timeout, backoff_sec)
-        table_name = f"redpanda.{topic}"
 
         def translation_done():
             counts = dict(
-                map(lambda e: (e.engine_name(), e.count_table(table_name)),
+                map(
+                    lambda e:
+                    (e.engine_name(), e.count_table("redpanda", topic)),
                     self.query_engines))
             self.redpanda.logger.debug(f"Current counts: {counts}")
             return all([c == msg_count for _, c in counts.items()])
diff --git a/tests/rptest/tests/datalake/datalake_verifier.py b/tests/rptest/tests/datalake/datalake_verifier.py
new file mode 100644
index 000000000000..00f1c13fadf2
--- /dev/null
+++ b/tests/rptest/tests/datalake/datalake_verifier.py
@@ -0,0 +1,241 @@
+# Copyright 2024 Redpanda Data, Inc.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.md
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0
+
+from collections import defaultdict
+from concurrent.futures import ThreadPoolExecutor
+import random
+import threading
+import time
+from rptest.clients.rpk import RpkTool
+from rptest.services.redpanda import RedpandaService
+from rptest.tests.datalake.query_engine_base import QueryEngineBase
+from rptest.util import wait_until
+
+from confluent_kafka import Consumer
+
+
+class DatalakeVerifier():
+    """
+    Verifier that validates the data in the Redpanda Iceberg table.
+    The verifier consumes offsets from the specified topic and verifies that
+    the data in the Iceberg table matches.
+
+    The verifier runs two threads:
+    - one of them consumes messages from the specified topic and buffers them in memory.
+      The semaphore is used to limit the number of messages buffered in memory.
+
+    - the second thread executes a per-partition query that fetches the messages
+      from the Iceberg table
+    """
+
+    #TODO: add an ability to pass lambda to verify the message content
+    #TODO: add tolerance for compacted topics
+    def __init__(self, redpanda: RedpandaService, topic: str,
+                 query_engine: QueryEngineBase):
+        self.redpanda = redpanda
+        self.topic = topic
+        self.logger = redpanda.logger
+        self._consumed_messages = defaultdict(list)
+
+        self._query: QueryEngineBase = query_engine
+        self._cg = f"verifier-group-{random.randint(0, 1000000)}"
+        self._lock = threading.Lock()
+        self._stop = threading.Event()
+        # number of messages buffered in memory
+        self._msg_semaphore = threading.Semaphore(5000)
+        self._total_msgs_cnt = 0
+        self._executor = ThreadPoolExecutor(max_workers=2)
+        self._rpk = RpkTool(self.redpanda)
+        # errors found during verification
+        self._errors = []
+        # map of last queried offset for each partition
+        self._max_queried_offsets = {}
+        self._last_checkpoint = {}
+
+    def create_consumer(self):
+        c = Consumer({
+            'bootstrap.servers': self.redpanda.brokers(),
+            'group.id': self._cg,
+            'auto.offset.reset': 'earliest'
+        })
+        c.subscribe([self.topic])
+
+        return c
+
+    def _consumer_thread(self):
+        self.logger.info("Starting consumer thread")
+        consumer = self.create_consumer()
+        while not self._stop.is_set():
+
+            self._msg_semaphore.acquire()
+            if self._stop.is_set():
+                break
+            msg = consumer.poll(1.0)
+            if msg is None:
+                continue
+            if msg.error():
+                self.logger.error(f"Consumer error: {msg.error()}")
+                continue
+
+            with self._lock:
+                self._total_msgs_cnt += 1
+                self.logger.debug(
+                    f"Consumed message partition: {msg.partition()} at offset {msg.offset()}"
+                )
+                self._consumed_messages[msg.partition()].append(msg)
+                if len(self._errors) > 0:
+                    return
+
+        consumer.close()
+
+    def _get_query(self, partition, last_queried_offset, max_consumed_offset):
+        return f"\
+        SELECT redpanda.offset FROM redpanda.{self._query.escape_identifier(self.topic)} \
+        WHERE redpanda.partition={partition} \
+        AND redpanda.offset>{last_queried_offset} \
+        AND redpanda.offset<={max_consumed_offset} \
+        ORDER BY redpanda.offset"
+
+    def _verify_next_message(self, partition, iceberg_offset):
+        if partition not in self._consumed_messages:
+            self._errors.append(
+                f"Partition {partition} returned from Iceberg query not found in consumed messages"
+            )
+
+        p_messages = self._consumed_messages[partition]
+
+        if len(p_messages) == 0:
+            return
+
+        consumer_offset = self._consumed_messages[partition][0].offset()
+        if iceberg_offset > consumer_offset:
+            self._errors.append(
+                f"Offset from Iceberg table {iceberg_offset} is greater than next consumed offset {consumer_offset} for partition {partition}, most likely there is a gap in the table"
+            )
+            return
+
+        if iceberg_offset <= self._max_queried_offsets.get(partition, -1):
+            self._errors.append(
+                f"Duplicate entry detected at offset {iceberg_offset} for partition {partition}"
+            )
+            return
+
+        self._max_queried_offsets[partition] = iceberg_offset
+
+        if consumer_offset != iceberg_offset:
+            self._errors.append(
+                f"Offset from Iceberg table {iceberg_offset} for partition {partition} does not match the next consumed offset {consumer_offset}"
+            )
+            return
+        self._consumed_messages[partition].pop(0)
+        self._msg_semaphore.release()
+
+    def _get_partition_offsets_list(self):
+        with self._lock:
+            return [(partition, messages[-1].offset())
+                    for partition, messages in self._consumed_messages.items()
+                    if len(messages) > 0]
+
+    def _query_thread(self):
+        self.logger.info("Starting query thread")
+        while not self._stop.is_set():
+            try:
+                partitions = self._get_partition_offsets_list()
+
+                for partition, max_consumed in partitions:
+                    last_queried_offset = self._max_queried_offsets[
+                        partition] if partition in self._max_queried_offsets else -1
+
+                    # no new messages consumed, skip query
+                    if max_consumed <= last_queried_offset:
+                        continue
+
+                    query = self._get_query(partition, last_queried_offset,
+                                            max_consumed)
+                    self.logger.debug(f"Executing query: {query}")
+
+                    with self._query.run_query(query) as cursor:
+                        with self._lock:
+                            for row in cursor:
+                                self._verify_next_message(partition, row[0])
+                                if len(self._errors) > 0:
+                                    self.logger.error(
+                                        f"violations detected: {self._errors}, stopping verifier"
+                                    )
+                                    return
+
+                if len(self._max_queried_offsets) > 0:
+                    self.logger.debug(
+                        f"Max queried offsets: {self._max_queried_offsets}"
+                    )
+
+            except Exception as e:
+                self.logger.error(f"Error querying iceberg table: {e}")
+            time.sleep(2)
+
+    def start(self):
+        self._executor.submit(self._consumer_thread)
+        self._executor.submit(self._query_thread)
+
+    def _all_offsets_translated(self):
+        partitions = self._rpk.describe_topic(self.topic)
+        with self._lock:
+            for p in partitions:
+                if p.id not in self._max_queried_offsets:
+                    self.logger.debug(
+                        f"partition {p.id} not found in max offsets: {self._max_queried_offsets}"
+                    )
+                    return False
+
+                if self._max_queried_offsets[p.id] < p.high_watermark - 1:
+                    self.logger.debug(
+                        f"partition {p.id} high watermark: {p.high_watermark}, max offset: {self._max_queried_offsets[p.id]}"
+                    )
+                    return False
+            return True
+
+    def _made_progress(self):
+        progress = False
+        with self._lock:
+            for partition, offset in self._max_queried_offsets.items():
+                if offset > self._last_checkpoint.get(partition, -1):
+                    progress = True
+                    break
+
+            self._last_checkpoint = self._max_queried_offsets.copy()
+        return progress
+
+    def wait(self, progress_timeout_sec=30):
+        try:
+            while not self._all_offsets_translated():
+                wait_until(
+                    lambda: self._made_progress(),
+                    progress_timeout_sec,
+                    backoff_sec=3,
+                    err_msg=
+                    f"Error waiting for the query to make progress for topic {self.topic}"
+                )
+            assert len(
+                self._errors
+            ) == 0, f"Topic {self.topic} validation errors: {self._errors}"
+
+        finally:
+            self.stop()
+
+    def stop(self):
+        self._stop.set()
+        self._msg_semaphore.release()
+        self._executor.shutdown(wait=False)
+        assert len(
+            self._errors
+        ) == 0, f"Topic {self.topic} validation errors: {self._errors}"
+
+        assert all(
+            len(messages) == 0 for messages in self._consumed_messages.values(
+            )), f"Partition left with consumed but not translated messages"
diff --git a/tests/rptest/tests/datalake/datalake_verifier_test.py b/tests/rptest/tests/datalake/datalake_verifier_test.py
new file mode 100644
index 000000000000..974a0736c8a6
--- /dev/null
+++ b/tests/rptest/tests/datalake/datalake_verifier_test.py
@@ -0,0 +1,128 @@
+# Copyright 2024 Redpanda Data, Inc.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.md
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0
+import json
+import os
+import tempfile
+import time
+from rptest.clients.rpk import RpkTool
+from rptest.services.cluster import cluster
+from rptest.services.redpanda import SISettings, SchemaRegistryConfig
+from rptest.services.redpanda_connect import RedpandaConnectService
+from rptest.tests.datalake.datalake_services import DatalakeServices
+from rptest.tests.datalake.datalake_verifier import DatalakeVerifier
+from rptest.tests.datalake.query_engine_base import QueryEngineType
+from rptest.tests.redpanda_test import RedpandaTest
+from rptest.tests.datalake.utils import supported_storage_types
+from ducktape.mark import matrix
+
+from rptest.utils.mode_checks import ignore
+
+
+# Test checking that the verifier can detect Iceberg table anomalies
+class DatalakeVerifierTest(RedpandaTest):
+    def __init__(self, test_context):
+        self._topic = None
+        super(DatalakeVerifierTest, self).__init__(
+            test_context=test_context,
+            num_brokers=1,
+            si_settings=SISettings(test_context,
+                                   cloud_storage_enable_remote_read=False,
+                                   cloud_storage_enable_remote_write=False),
+            extra_rp_conf={
+                "iceberg_enabled": True,
+                "iceberg_catalog_commit_interval_ms": 5000
+            },
+            schema_registry_config=SchemaRegistryConfig())
+
+    def simple_stream(self, topic, subject):
+        return {
+            "input": {
+                "generate": {
+                    "mapping": "root = counter()",
+                    "interval": "",
+                    "count": 100,
+                    "batch_size": 1
+                }
+            },
+            "pipeline": {
+                "processors": []
+            },
+            "output": {
+                "redpanda": {
+                    "seed_brokers": self.redpanda.brokers_list(),
+                    "topic": topic,
+                }
+            }
+        }
+
+    def setUp(self):
+        pass
+
+    def _prepare_test_data(self, topic_name: str, dl: DatalakeServices):
+
+        dl.create_iceberg_enabled_topic(topic_name,
+                                        partitions=1,
+                                        replicas=1,
+                                        iceberg_mode="key_value")
+
+        connect = RedpandaConnectService(self.test_context, self.redpanda)
+        connect.start()
+        # create a stream
+        connect.start_stream(name="ducky_stream",
+                             config=self.simple_stream(topic_name,
+                                                       "verifier_schema"))
+        dl.wait_for_translation(topic_name, 100)
+        connect.wait_for_stream_to_finish("ducky_stream")
+
+    @cluster(num_nodes=4)
+    @matrix(cloud_storage_type=supported_storage_types())
+    def test_detecting_gap_in_offset_sequence(self, cloud_storage_type):
+        topic_name = "ducky_topic"
+        with DatalakeServices(self.test_context,
+                              redpanda=self.redpanda,
+                              filesystem_catalog_mode=False,
+                              include_query_engines=[QueryEngineType.TRINO
+                                                     ]) as dl:
+            self._prepare_test_data(topic_name, dl)
+            dl.trino().run_query_fetch_all(
+                f"DELETE FROM redpanda.{topic_name} WHERE redpanda.offset=10")
+            verifier = DatalakeVerifier(self.redpanda, topic_name, dl.trino())
+
+            verifier.start()
+            try:
+                verifier.wait()
+                assert False, "Verifier should have failed"
+            except Exception as e:
+                assert "gap in the table" in str(
+                    e), f"Error: {e} should contain 'gap in the table'"
+
+    @cluster(num_nodes=4)
+    @matrix(cloud_storage_type=supported_storage_types())
+    def test_detecting_duplicates(self, cloud_storage_type):
+        topic_name = "ducky_topic"
+        with DatalakeServices(self.test_context,
+                              redpanda=self.redpanda,
+                              filesystem_catalog_mode=False,
+                              include_query_engines=[QueryEngineType.TRINO
+                                                     ]) as dl:
+            self._prepare_test_data(topic_name, dl)
+
+            # Insert duplicate
+            dl.trino().run_query_fetch_all(
+                f"INSERT INTO redpanda.{topic_name} VALUES ( ROW(0, 10, TIMESTAMP '2024-11-29 10:00:00' , ARRAY[], CAST('key' as VARBINARY )), CAST('value' AS VARBINARY))"
+            )
+
+            verifier = DatalakeVerifier(self.redpanda, topic_name, dl.trino())
+            verifier.start()
+            try:
+                verifier.wait()
+                assert False, "Verifier should have failed"
+            except Exception as e:
+                assert "Duplicate" in str(
+                    e), f"Error: {e} should contain 'Duplicate'"
f"INSERT INTO redpanda.{topic_name} VALUES ( ROW(0, 10, TIMESTAMP '2024-11-29 10:00:00' , ARRAY[], CAST('key' as VARBINARY )), CAST('value' AS VARBINARY))" + ) + + verifier = DatalakeVerifier(self.redpanda, topic_name, dl.trino()) + verifier.start() + try: + verifier.wait() + assert False, "Verifier should have failed" + except Exception as e: + assert "Duplicate" in str( + e), f"Error: {e} should contain 'duplicate'" diff --git a/tests/rptest/tests/datalake/query_engine_base.py b/tests/rptest/tests/datalake/query_engine_base.py index c97ad156b549..176181d527f1 100644 --- a/tests/rptest/tests/datalake/query_engine_base.py +++ b/tests/rptest/tests/datalake/query_engine_base.py @@ -42,16 +42,20 @@ def run_query(self, query): finally: client.close() + @abstractmethod + def escape_identifier(self, table: str) -> str: + raise NotImplementedError + def run_query_fetch_all(self, query): with self.run_query(query) as cursor: return cursor.fetchall() - def count_table(self, table) -> int: - query = f"select count(*) from {table}" + def count_table(self, namespace, table) -> int: + query = f"select count(*) from {namespace}.{self.escape_identifier(table)}" with self.run_query(query) as cursor: return int(cursor.fetchone()[0]) - def max_translated_offset(self, table, partition) -> int: - query = f"select max(redpanda.offset) from {table} where redpanda.partition={partition}" + def max_translated_offset(self, namespace, table, partition) -> int: + query = f"select max(redpanda.offset) from {namespace}.{self.escape_identifier(table)} where redpanda.partition={partition}" with self.run_query(query) as cursor: return int(cursor.fetchone()[0]) diff --git a/tests/rptest/tests/datalake/simple_connect_test.py b/tests/rptest/tests/datalake/simple_connect_test.py new file mode 100644 index 000000000000..0c117fdcaccc --- /dev/null +++ b/tests/rptest/tests/datalake/simple_connect_test.py @@ -0,0 +1,139 @@ +# Copyright 2024 Redpanda Data, Inc. 
diff --git a/tests/rptest/tests/datalake/simple_connect_test.py b/tests/rptest/tests/datalake/simple_connect_test.py
new file mode 100644
index 000000000000..0c117fdcaccc
--- /dev/null
+++ b/tests/rptest/tests/datalake/simple_connect_test.py
@@ -0,0 +1,139 @@
+# Copyright 2024 Redpanda Data, Inc.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.md
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0
+import json
+import os
+import tempfile
+import time
+from rptest.clients.rpk import RpkTool
+from rptest.services.cluster import cluster
+from rptest.services.redpanda import SISettings, SchemaRegistryConfig
+from rptest.services.redpanda_connect import RedpandaConnectService
+from rptest.tests.datalake.datalake_services import DatalakeServices
+from rptest.tests.datalake.datalake_verifier import DatalakeVerifier
+from rptest.tests.datalake.query_engine_base import QueryEngineType
+from rptest.tests.redpanda_test import RedpandaTest
+from rptest.tests.datalake.utils import supported_storage_types
+from ducktape.mark import matrix
+
+
+class RedpandaConnectIcebergTest(RedpandaTest):
+
+    verifier_schema_avro = """
+{
+    "type": "record",
+    "name": "VerifierRecord",
+    "fields": [
+        {
+            "name": "verifier_string",
+            "type": "string"
+        },
+        {
+            "name": "ordinal",
+            "type": "long"
+        },
+        {
+            "name": "timestamp",
+            "type": {
+                "type": "long",
+                "logicalType": "timestamp-millis"
+            }
+        }
+    ]
+}
+    """
+
+    def __init__(self, test_context):
+        self._topic = None
+        super(RedpandaConnectIcebergTest, self).__init__(
+            test_context=test_context,
+            num_brokers=3,
+            si_settings=SISettings(test_context,
+                                   cloud_storage_enable_remote_read=False,
+                                   cloud_storage_enable_remote_write=False),
+            extra_rp_conf={
+                "iceberg_enabled": True,
+                "iceberg_catalog_commit_interval_ms": 5000
+            },
+            schema_registry_config=SchemaRegistryConfig())
+
+    def avro_stream_config(self, topic, subject):
+        return {
+            "input": {
+                "generate": {
+                    "mapping": "root = counter()",
+                    "interval": "",
+                    "count": 3000,
+                    "batch_size": 1
+                }
+            },
+            "pipeline": {
+                "processors": [{
+                    "mapping":
+                    """
+                    root.ordinal = this
+                    root.timestamp = timestamp_unix_milli()
+                    root.verifier_string = uuid_v4()
+                    """
+                }, {
+                    "schema_registry_encode": {
+                        "url": self.redpanda.schema_reg().split(",")[0],
+                        "subject": subject,
+                        "refresh_period": "10s"
+                    }
+                }]
+            },
+            "output": {
+                "redpanda": {
+                    "seed_brokers": self.redpanda.brokers_list(),
+                    "topic": topic,
+                }
+            }
+        }
+
+    def setUp(self):
+        pass
+
+    def _create_schema(self, subject: str, schema: str, schema_type="avro"):
+        rpk = RpkTool(self.redpanda)
+        with tempfile.NamedTemporaryFile(suffix=f".{schema_type}") as tf:
+            tf.write(bytes(schema, 'UTF-8'))
+            tf.flush()
+            rpk.create_schema(subject, tf.name)
+
+    @cluster(num_nodes=6)
+    @matrix(cloud_storage_type=supported_storage_types())
+    def test_translating_avro_serialized_records(self, cloud_storage_type):
+        topic_name = "ducky-topic"
+        with DatalakeServices(self.test_context,
+                              redpanda=self.redpanda,
+                              filesystem_catalog_mode=False,
+                              include_query_engines=[
+                                  QueryEngineType.SPARK,
+                              ]) as dl:
+
+            dl.create_iceberg_enabled_topic(
+                topic_name,
+                partitions=5,
+                replicas=3,
+                iceberg_mode="value_schema_id_prefix")
+
+            self._create_schema("verifier_schema", self.verifier_schema_avro)
+            connect = RedpandaConnectService(self.test_context, self.redpanda)
+            connect.start()
+
+            # create verifier
+            verifier = DatalakeVerifier(self.redpanda, topic_name, dl.spark())
+            # create a stream
+            connect.start_stream(name="ducky_stream",
+                                 config=self.avro_stream_config(
+                                     topic_name, "verifier_schema"))
+
+            verifier.start()
+            connect.wait_for_stream_to_finish("ducky_stream")
+            verifier.wait()
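Taken together, the usage pattern the new tests follow boils down to the sketch below. This is an illustrative condensation, not part of the patch: the test class name and the produce step are placeholders, and the cluster configuration from the tests above (iceberg_enabled, SI settings, schema registry) is assumed to be set in the RedpandaTest constructor.

from rptest.services.cluster import cluster
from rptest.tests.datalake.datalake_services import DatalakeServices
from rptest.tests.datalake.datalake_verifier import DatalakeVerifier
from rptest.tests.datalake.query_engine_base import QueryEngineType
from rptest.tests.redpanda_test import RedpandaTest


class ExampleIcebergVerifierTest(RedpandaTest):  # hypothetical test class
    @cluster(num_nodes=4)
    def test_example(self):
        topic_name = "ducky_topic"
        with DatalakeServices(self.test_context,
                              redpanda=self.redpanda,
                              filesystem_catalog_mode=False,
                              include_query_engines=[QueryEngineType.TRINO
                                                     ]) as dl:
            dl.create_iceberg_enabled_topic(topic_name,
                                            partitions=1,
                                            replicas=1,
                                            iceberg_mode="key_value")
            # ... produce records to topic_name (e.g. via Redpanda Connect) ...
            verifier = DatalakeVerifier(self.redpanda, topic_name, dl.trino())
            verifier.start()
            # wait() asserts that the Iceberg table shows no gaps, duplicates,
            # or offset mismatches relative to what was consumed from the topic
            verifier.wait()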