Skip to content

Commit

Permalink
tests: add datalake topic lifecycle test
Browse files Browse the repository at this point in the history
  • Loading branch information
ztlpn committed Nov 25, 2024
1 parent 1e88454 commit 123ccdd
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 5 deletions.
53 changes: 53 additions & 0 deletions tests/rptest/tests/datalake/datalake_e2e_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from typing import Optional
from rptest.clients.serde_client_utils import SchemaType, SerdeClientType
from rptest.clients.types import TopicSpec
from rptest.clients.rpk import RpkTool
from rptest.services.cluster import cluster

from rptest.services.redpanda import PandaproxyConfig, SchemaRegistryConfig, SISettings
Expand All @@ -19,6 +20,7 @@
from rptest.tests.datalake.query_engine_base import QueryEngineType
from rptest.tests.datalake.utils import supported_storage_types
from ducktape.mark import matrix
from ducktape.utils.util import wait_until


class DatalakeE2ETests(RedpandaTest):
Expand Down Expand Up @@ -129,3 +131,54 @@ def test_avro_schema(self, cloud_storage_type, query_engine):
f"describe {table_name}")
assert spark_describe_out == spark_expected_out, str(
spark_describe_out)

@cluster(num_nodes=4)
@matrix(cloud_storage_type=supported_storage_types(),
        filesystem_catalog_mode=[False])
def test_topic_lifecycle(self, cloud_storage_type, filesystem_catalog_mode):
    """Delete and recreate an Iceberg-enabled topic, checking its table.

    All phases reuse ``self.topic_name``:

    1. Create the topic, produce one batch, wait for translation, set
       ``redpanda.iceberg.delete`` to ``"false"`` and delete the topic.
       The table is expected to survive this deletion.
    2. Recreate the topic, produce another batch and wait for translation
       to report two batches' worth of messages, then delete the topic
       again and wait for the table to disappear from the catalog.
    3. Create and immediately delete the (empty) topic three times.
    4. Recreate the topic, produce one batch and wait for translation,
       which is expected to bring the table back.
    """
    batch = 100

    with DatalakeServices(self.test_ctx,
                          redpanda=self.redpanda,
                          filesystem_catalog_mode=filesystem_catalog_mode,
                          include_query_engines=[QueryEngineType.SPARK
                                                 ]) as dl:
        rpk = RpkTool(self.redpanda)

        def fill_and_translate(partitions, expected_total):
            # Create the topic with the given partition count, produce one
            # batch and wait for translation with msg_count=expected_total.
            dl.create_iceberg_enabled_topic(self.topic_name,
                                            partitions=partitions)
            dl.produce_to_topic(self.topic_name, 1024, batch)
            dl.wait_for_translation(self.topic_name,
                                    msg_count=expected_total)

        # Phase 1: first topic instance; its table must outlive the topic.
        fill_and_translate(partitions=10, expected_total=batch)
        rpk.alter_topic_config(self.topic_name, "redpanda.iceberg.delete",
                               "false")
        rpk.delete_topic(self.topic_name)

        # Phase 2: the surviving table is expected to accumulate the
        # messages of both topic instances.
        fill_and_translate(partitions=15, expected_total=2 * batch)

        # This deletion is expected to remove the table as well.
        rpk.delete_topic(self.topic_name)

        catalog_client = dl.catalog_client()
        wait_until(lambda: not dl.table_exists(self.topic_name,
                                               client=catalog_client),
                   timeout_sec=30,
                   backoff_sec=5,
                   err_msg="table was not deleted")

        # Phase 3: churn an empty topic a few times.
        for _ in range(3):
            dl.create_iceberg_enabled_topic(self.topic_name, partitions=10)
            rpk.delete_topic(self.topic_name)

        # Phase 4: producing again should make the table reappear.
        fill_and_translate(partitions=5, expected_total=batch)
19 changes: 14 additions & 5 deletions tests/rptest/tests/datalake/datalake_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,23 @@ def set_iceberg_mode_on_topic(self, topic: str, mode: str):
rpk = RpkTool(self.redpanda)
rpk.alter_topic_config(topic, "redpanda.iceberg.mode", mode)

def catalog_client(self):
    """Return a client for the ``redpanda-iceberg-catalog`` catalog.

    A client is requested from ``self.catalog_service`` on every call.
    """
    catalog_name = "redpanda-iceberg-catalog"
    return self.catalog_service.client(catalog_name)

def table_exists(self, table, namespace="redpanda", client=None):
    """Report whether ``table`` is listed under ``namespace`` in the catalog.

    Args:
        table: Name of the table to look for.
        namespace: Namespace expected to contain the table.
        client: Catalog client to query. When ``None``, one is obtained
            from ``self.catalog_client()``.

    Returns:
        ``True`` when ``(namespace,)`` is among ``client.list_namespaces()``
        and ``(namespace, table)`` is among
        ``client.list_tables(namespace)``; ``False`` otherwise.
    """
    catalog = self.catalog_client() if client is None else client

    known_namespaces = catalog.list_namespaces()
    self.redpanda.logger.debug(f"namespaces: {known_namespaces}")

    if (namespace, ) not in known_namespaces:
        # Tables are only listed once the namespace is known to exist.
        return False
    return (namespace, table) in catalog.list_tables(namespace)

def wait_for_iceberg_table(self, namespace, table, timeout, backoff_sec):
client = self.catalog_service.client("redpanda-iceberg-catalog")
client = self.catalog_client()

def table_created():
namespaces = client.list_namespaces()
self.redpanda.logger.debug(f"namespaces: {namespaces}")
return (namespace, ) in namespaces and (
namespace, table) in client.list_tables(namespace)
return self.table_exists(table, namespace=namespace, client=client)

wait_until(
table_created,
Expand Down

0 comments on commit 123ccdd

Please sign in to comment.