From 6f1f1480ffbc37e97df83f969c47eed569c611ec Mon Sep 17 00:00:00 2001
From: Nicolae Vartolomei
Date: Fri, 20 Dec 2024 14:41:50 +0000
Subject: [PATCH] rptest/datalake: test metadata interoperability with 3rd
 party system

---
 .../rptest/tests/datalake/3rdparty_rewrite.py | 122 ++++++++++++++++++
 1 file changed, 122 insertions(+)
 create mode 100644 tests/rptest/tests/datalake/3rdparty_rewrite.py

diff --git a/tests/rptest/tests/datalake/3rdparty_rewrite.py b/tests/rptest/tests/datalake/3rdparty_rewrite.py
new file mode 100644
index 000000000000..0e1c07e441ff
--- /dev/null
+++ b/tests/rptest/tests/datalake/3rdparty_rewrite.py
@@ -0,0 +1,122 @@
+from ducktape.mark import matrix
+
+from rptest.services.cluster import cluster
+from rptest.services.redpanda import SISettings
+from rptest.tests.datalake.datalake_services import DatalakeServices
+from rptest.tests.datalake.datalake_verifier import DatalakeVerifier
+from rptest.tests.datalake.query_engine_base import QueryEngineType
+from rptest.tests.datalake.utils import supported_storage_types
+from rptest.tests.redpanda_test import RedpandaTest
+
+
+class Datalake3rdPartyRewriteTest(RedpandaTest):
+    def __init__(self, test_ctx, *args, **kwargs):
+        super().__init__(test_ctx,
+                         num_brokers=1,
+                         si_settings=SISettings(test_ctx),
+                         extra_rp_conf={
+                             "iceberg_enabled": "true",
+                             "iceberg_catalog_commit_interval_ms": 5000
+                         },
+                         *args,
+                         **kwargs)
+
+        self.test_ctx = test_ctx
+        self.topic_name = "test"
+        self.num_partitions = 10
+
+        self.produced_messages = 0
+
+    def setUp(self):
+        # Redpanda will be started by DatalakeServices.
+        pass
+
+    @cluster(num_nodes=4)
+    @matrix(cloud_storage_type=supported_storage_types(),
+            query_engine=[QueryEngineType.SPARK, QueryEngineType.TRINO],
+            filesystem_catalog_mode=[True, False])
+    def test_e2e_basic(self, cloud_storage_type, query_engine,
+                       filesystem_catalog_mode):
+        """
+        Verify that Redpanda can continue to work with Iceberg metadata
+        written by third-party query engines. An optimize operation run by a
+        third-party query engine triggers a rewrite of the data files and
+        metadata.
+        """
+        with DatalakeServices(self.test_ctx,
+                              redpanda=self.redpanda,
+                              filesystem_catalog_mode=filesystem_catalog_mode,
+                              include_query_engines=[query_engine]) as dl:
+            dl.create_iceberg_enabled_topic(self.topic_name,
+                                            partitions=self.num_partitions)
+            # Write some data to the topic.
+            self._translate_sample_data(dl)
+
+            # Run maintenance to rewrite the data files.
+            self._run_maintenance(dl, query_engine)
+
+            # Verify consistency post rewrite.
+            self._verify_consistency(dl, query_engine)
+
+            # Produce additional messages to the topic to make sure we
+            # correctly interoperate with the metadata written by the
+            # third-party engine.
+            self._translate_sample_data(dl)
+
+            # Verify consistency with the additional messages.
+            self._verify_consistency(dl, query_engine)
+
+    def _translate_sample_data(self, dl):
+        NUM_MSG_PER_SAMPLE = 100
+        self.produced_messages += NUM_MSG_PER_SAMPLE
+
+        dl.produce_to_topic(self.topic_name, 1024, NUM_MSG_PER_SAMPLE)
+        # Wait for all messages (including the ones we just wrote) to be
+        # translated.
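+        # Translation runs asynchronously: with
+        # iceberg_catalog_commit_interval_ms=5000 (set above), a new Iceberg
+        # snapshot lands roughly every 5s, so the table briefly lags the log.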
+        dl.wait_for_translation(self.topic_name,
+                                msg_count=self.produced_messages)
+
+    def _run_maintenance(self, dl, query_engine):
+        if query_engine == QueryEngineType.TRINO:
+            trino = dl.trino()
+
+            # See the Trino Iceberg metadata tables documentation:
+            # https://trino.io/docs/current/connector/iceberg.html#alter-table-execute
+            initial_parquet_files = trino.count_table(
+                "redpanda", f"{self.topic_name}$files")
+
+            assert initial_parquet_files >= self.num_partitions, f"Expecting at least {self.num_partitions} files, got {initial_parquet_files}"
+
+            # Optimize the table to rewrite the data files.
+            # https://trino.io/docs/current/connector/iceberg.html#alter-table-execute
+            trino.run_query_fetch_one(
+                f"ALTER TABLE redpanda.{self.topic_name} EXECUTE optimize")
+
+            optimized_parquet_files = trino.count_table(
+                "redpanda", f"{self.topic_name}$files")
+            assert optimized_parquet_files == 1, f"Expecting 1 file after optimize, got {optimized_parquet_files}"
+        elif query_engine == QueryEngineType.SPARK:
+            spark = dl.spark()
+
+            # Metadata query:
+            # https://iceberg.apache.org/docs/1.6.1/spark-queries/#files
+            initial_parquet_files = spark.run_query_fetch_one(
+                f"SELECT count(*) FROM redpanda.{self.topic_name}.files")[0]
+            assert initial_parquet_files >= self.num_partitions, f"Expecting at least {self.num_partitions} files, got {initial_parquet_files}"
+
+            # Spark procedure provided by the Iceberg SQL extensions:
+            # https://iceberg.apache.org/docs/1.6.1/spark-procedures/#rewrite_data_files
+            spark.run_query_fetch_one(
+                f"CALL `redpanda-iceberg-catalog`.system.rewrite_data_files(\"redpanda.{self.topic_name}\")"
+            )
+
+            optimized_parquet_files = spark.run_query_fetch_one(
+                f"SELECT count(*) FROM redpanda.{self.topic_name}.files")[0]
+            assert optimized_parquet_files == 1, f"Expecting 1 file after optimize, got {optimized_parquet_files}"
+        else:
+            raise NotImplementedError(
+                f"Unsupported query engine {query_engine}")
+
+    def _verify_consistency(self, dl: DatalakeServices, query_engine):
+        verifier = DatalakeVerifier(self.redpanda, self.topic_name,
+                                    dl.any_query_engine())
+        verifier.start()
+        verifier.wait()
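
Note (illustration only, not part of the patch): the optimize round-trip that
_run_maintenance drives through the test harness can also be reproduced
interactively against a running cluster. Below is a minimal PySpark sketch of
the Spark branch. The `redpanda-iceberg-catalog` catalog name and the Iceberg
`files` metadata table come from the test above; the Spark session setup and
the `redpanda.test` table identifier are assumptions about the environment,
not harness APIs.

from pyspark.sql import SparkSession

# Assumes the session is already configured with an Iceberg catalog named
# `redpanda-iceberg-catalog` pointing at the warehouse Redpanda writes to.
spark = SparkSession.builder.getOrCreate()

# Hypothetical table identifier: namespace "redpanda", topic/table "test".
table = "`redpanda-iceberg-catalog`.redpanda.test"

# The Iceberg `files` metadata table has one row per live data file.
before = spark.sql(f"SELECT count(*) FROM {table}.files").collect()[0][0]

# Compact small per-partition files into fewer, larger ones. This commits a
# new snapshot whose metadata Redpanda must continue to build on.
spark.sql(
    "CALL `redpanda-iceberg-catalog`.system.rewrite_data_files('redpanda.test')")

after = spark.sql(f"SELECT count(*) FROM {table}.files").collect()[0][0]
print(f"data files before/after rewrite: {before}/{after}")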