rptest/datalake: test metadata interoperability with 3rd party system
nvartolomei committed Dec 23, 2024
1 parent c612c19 commit 6f1f148
Showing 1 changed file with 122 additions and 0 deletions.
122 changes: 122 additions & 0 deletions tests/rptest/tests/datalake/3rdparty_rewrite.py
@@ -0,0 +1,122 @@
from ducktape.mark import matrix

from rptest.services.cluster import cluster
from rptest.services.redpanda import SISettings
from rptest.tests.datalake.datalake_services import DatalakeServices
from rptest.tests.datalake.datalake_verifier import DatalakeVerifier
from rptest.tests.datalake.query_engine_base import QueryEngineType
from rptest.tests.datalake.utils import supported_storage_types
from rptest.tests.redpanda_test import RedpandaTest


class Datalake3rdPartyRewriteTest(RedpandaTest):
def __init__(self, test_ctx, *args, **kwargs):
super().__init__(test_ctx,
num_brokers=1,
si_settings=SISettings(test_ctx),
extra_rp_conf={
"iceberg_enabled": "true",
"iceberg_catalog_commit_interval_ms": 5000
},
*args,
**kwargs)

self.test_ctx = test_ctx
self.topic_name = "test"
self.num_partitions = 10
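        # Translation writes at least one data file per Kafka partition,
        # which the pre-compaction assertions below rely on.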

self.produced_messages = 0

def setUp(self):
        # Redpanda will be started by DatalakeServices
pass

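    # @matrix runs this test once for every combination of the parameters
    # below (storage type x query engine x catalog mode).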
@cluster(num_nodes=4)
@matrix(cloud_storage_type=supported_storage_types(),
query_engine=[QueryEngineType.SPARK, QueryEngineType.TRINO],
filesystem_catalog_mode=[True, False])
def test_e2e_basic(self, cloud_storage_type, query_engine,
filesystem_catalog_mode):
"""
This test verifies that Redpanda can continue to work with Iceberg
metadata written by third-party query engines. We use an optimize operation
with a third-party query engine to trigger a rewrite of the data files
and metadata.
"""
with DatalakeServices(self.test_ctx,
redpanda=self.redpanda,
filesystem_catalog_mode=filesystem_catalog_mode,
include_query_engines=[query_engine]) as dl:
dl.create_iceberg_enabled_topic(self.topic_name,
partitions=self.num_partitions)
# Write some data to the topic.
self._translate_sample_data(dl)

# Run maintenance to rewrite the data.
self._run_maintenance(dl, query_engine)

# Verify consistency post rewrite.
self._verify_consistency(dl, query_engine)

            # Produce additional messages to the topic to make sure we correctly
            # interoperate with the metadata written by the third-party engine.
self._translate_sample_data(dl)

# Verify consistency with the additional messages.
self._verify_consistency(dl, query_engine)

def _translate_sample_data(self, dl):
NUM_MSG_PER_SAMPLE = 100
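        # Keep a running total: translation is verified against the
        # cumulative message count, not just this batch.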
self.produced_messages += NUM_MSG_PER_SAMPLE

dl.produce_to_topic(self.topic_name, 1024, NUM_MSG_PER_SAMPLE)
# Wait for all messages (including the ones we just wrote) to be translated.
dl.wait_for_translation(self.topic_name,
msg_count=self.produced_messages)

def _run_maintenance(self, dl, query_engine):
if query_engine == QueryEngineType.TRINO:
trino = dl.trino()

            # See Trino metadata tables documentation
            # https://trino.io/docs/current/connector/iceberg.html#metadata-tables
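            # The $files metadata table exposes one row per data file in
            # the table's current snapshot.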
initial_parquet_files = trino.count_table(
"redpanda", f"{self.topic_name}$files")

assert initial_parquet_files >= self.num_partitions, f"Expecting at least {self.num_partitions} files, got {initial_parquet_files}"

# Optimize the table to rewrite the data.
# https://trino.io/docs/current/connector/iceberg.html#alter-table-execute
trino.run_query_fetch_one(
f"ALTER TABLE redpanda.{self.topic_name} EXECUTE optimize")

optimized_parquet_files = trino.count_table(
"redpanda", f"{self.topic_name}$files")
assert optimized_parquet_files == 1, f"Expecting 1 file after optimize, got {optimized_parquet_files}"
elif query_engine == QueryEngineType.SPARK:
spark = dl.spark()

# Metadata query
# https://iceberg.apache.org/docs/1.6.1/spark-queries/#files
initial_parquet_files = spark.run_query_fetch_one(
f"SELECT count(*) FROM redpanda.{self.topic_name}.files")[0]
assert initial_parquet_files >= self.num_partitions, f"Expecting at least {self.num_partitions} files, got {initial_parquet_files}"

# Spark Procedures provided by Iceberg SQL Extensions
# https://iceberg.apache.org/docs/1.6.1/spark-procedures/#rewrite_data_files
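            # The catalog name contains hyphens, so it has to be
            # backtick-quoted in Spark SQL.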
spark.run_query_fetch_one(
f"CALL `redpanda-iceberg-catalog`.system.rewrite_data_files(\"redpanda.{self.topic_name}\")"
)

optimized_parquet_files = spark.run_query_fetch_one(
f"SELECT count(*) FROM redpanda.{self.topic_name}.files")[0]
assert optimized_parquet_files == 1, f"Expecting 1 file after optimize, got {optimized_parquet_files}"
else:
raise NotImplementedError(
f"Unsupported query engine {query_engine}")

def _verify_consistency(self, dl: DatalakeServices, query_engine):
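        # Only one query engine is provisioned per matrix combination, so
        # any_query_engine() returns the engine under test.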
verifier = DatalakeVerifier(self.redpanda, self.topic_name,
dl.any_query_engine())
verifier.start()
verifier.wait()
