diff --git a/tests/rptest/tests/datalake/datalake_e2e_test.py b/tests/rptest/tests/datalake/datalake_e2e_test.py index f41f78092e92..6ef294829a99 100644 --- a/tests/rptest/tests/datalake/datalake_e2e_test.py +++ b/tests/rptest/tests/datalake/datalake_e2e_test.py @@ -9,6 +9,7 @@ from typing import Optional from rptest.clients.serde_client_utils import SchemaType, SerdeClientType from rptest.clients.types import TopicSpec +from rptest.clients.rpk import RpkTool from rptest.services.cluster import cluster from rptest.services.redpanda import PandaproxyConfig, SchemaRegistryConfig, SISettings @@ -19,6 +20,7 @@ from rptest.tests.datalake.query_engine_base import QueryEngineType from rptest.tests.datalake.utils import supported_storage_types from ducktape.mark import matrix +from ducktape.utils.util import wait_until class DatalakeE2ETests(RedpandaTest): @@ -129,3 +131,54 @@ def test_avro_schema(self, cloud_storage_type, query_engine): f"describe {table_name}") assert spark_describe_out == spark_expected_out, str( spark_describe_out) + + @cluster(num_nodes=4) + @matrix(cloud_storage_type=supported_storage_types(), + filesystem_catalog_mode=[False]) + def test_topic_lifecycle(self, cloud_storage_type, + filesystem_catalog_mode): + count = 100 + with DatalakeServices(self.test_ctx, + redpanda=self.redpanda, + filesystem_catalog_mode=filesystem_catalog_mode, + include_query_engines=[QueryEngineType.SPARK + ]) as dl: + rpk = RpkTool(self.redpanda) + + # produce some data then delete the topic + dl.create_iceberg_enabled_topic(self.topic_name, partitions=10) + dl.produce_to_topic(self.topic_name, 1024, count) + dl.wait_for_translation(self.topic_name, msg_count=count) + + rpk.alter_topic_config(self.topic_name, "redpanda.iceberg.delete", + "false") + rpk.delete_topic(self.topic_name) + + # table is not deleted, it will contain messages from both topic instances + dl.create_iceberg_enabled_topic(self.topic_name, partitions=15) + dl.produce_to_topic(self.topic_name, 1024, count) + dl.wait_for_translation(self.topic_name, msg_count=2 * count) + + # now table should be deleted + rpk.delete_topic(self.topic_name) + + catalog_client = dl.catalog_client() + + def table_deleted(): + return not dl.table_exists(self.topic_name, + client=catalog_client) + + wait_until(table_deleted, + timeout_sec=30, + backoff_sec=5, + err_msg="table was not deleted") + + # recreate an empty topic a few times + for _ in range(3): + dl.create_iceberg_enabled_topic(self.topic_name, partitions=10) + rpk.delete_topic(self.topic_name) + + # check that the table is recreated after we start producing again + dl.create_iceberg_enabled_topic(self.topic_name, partitions=5) + dl.produce_to_topic(self.topic_name, 1024, count) + dl.wait_for_translation(self.topic_name, msg_count=count) diff --git a/tests/rptest/tests/datalake/datalake_services.py b/tests/rptest/tests/datalake/datalake_services.py index e2376d452cdf..09a09b234fc7 100644 --- a/tests/rptest/tests/datalake/datalake_services.py +++ b/tests/rptest/tests/datalake/datalake_services.py @@ -117,14 +117,23 @@ def set_iceberg_mode_on_topic(self, topic: str, mode: str): rpk = RpkTool(self.redpanda) rpk.alter_topic_config(topic, "redpanda.iceberg.mode", mode) + def catalog_client(self): + return self.catalog_service.client("redpanda-iceberg-catalog") + + def table_exists(self, table, namespace="redpanda", client=None): + if client is None: + client = self.catalog_client() + + namespaces = client.list_namespaces() + self.redpanda.logger.debug(f"namespaces: {namespaces}") + return (namespace, ) in namespaces and ( + namespace, table) in client.list_tables(namespace) + def wait_for_iceberg_table(self, namespace, table, timeout, backoff_sec): - client = self.catalog_service.client("redpanda-iceberg-catalog") + client = self.catalog_client() def table_created(): - namespaces = client.list_namespaces() - self.redpanda.logger.debug(f"namespaces: {namespaces}") - return (namespace, ) in namespaces and ( - namespace, table) in client.list_tables(namespace) + return self.table_exists(table, namespace=namespace, client=client) wait_until( table_created,