diff --git a/src/v/iceberg/avroschemas/manifest_file.schema.json b/src/v/iceberg/avroschemas/manifest_file.schema.json
index 6886935d3e5f6..15c21eeda50e2 100644
--- a/src/v/iceberg/avroschemas/manifest_file.schema.json
+++ b/src/v/iceberg/avroschemas/manifest_file.schema.json
@@ -45,19 +45,22 @@
         "field-id": 503
     },
     {
-        "name": "added_data_files_count",
+        "name": "added_files_count",
+        "aliases": ["added_data_files_count"],
         "type": "int",
         "doc": "Added entry count",
         "field-id": 504
     },
     {
-        "name": "existing_data_files_count",
+        "name": "existing_files_count",
+        "aliases": ["existing_data_files_count"],
         "type": "int",
         "doc": "Existing entry count",
         "field-id": 505
     },
     {
-        "name": "deleted_data_files_count",
+        "name": "deleted_files_count",
+        "aliases": ["deleted_data_files_count"],
         "type": "int",
         "doc": "Deleted entry count",
         "field-id": 506
diff --git a/src/v/iceberg/manifest_list_avro.cc b/src/v/iceberg/manifest_list_avro.cc
index 8696f857bb0a2..e12917efaf920 100644
--- a/src/v/iceberg/manifest_list_avro.cc
+++ b/src/v/iceberg/manifest_list_avro.cc
@@ -96,10 +96,9 @@ avrogen::manifest_file file_to_avro(const manifest_file& f) {
     ret.min_sequence_number = f.min_seq_number();
     ret.added_snapshot_id = f.added_snapshot_id();

-    ret.added_data_files_count = static_cast<int32_t>(f.added_files_count);
-    ret.existing_data_files_count = static_cast<int32_t>(
-      f.existing_files_count);
-    ret.deleted_data_files_count = static_cast<int32_t>(f.deleted_files_count);
+    ret.added_files_count = static_cast<int32_t>(f.added_files_count);
+    ret.existing_files_count = static_cast<int32_t>(f.existing_files_count);
+    ret.deleted_files_count = static_cast<int32_t>(f.deleted_files_count);

     ret.added_rows_count = static_cast<int64_t>(f.added_rows_count);
     ret.existing_rows_count = static_cast<int64_t>(f.existing_rows_count);
@@ -128,9 +127,9 @@ manifest_file file_from_avro(const avrogen::manifest_file& f) {
     ret.min_seq_number = sequence_number{f.min_sequence_number};
     ret.added_snapshot_id = snapshot_id{f.added_snapshot_id};

-    ret.added_files_count = f.added_data_files_count;
-    ret.existing_files_count = f.existing_data_files_count;
-    ret.deleted_files_count = f.deleted_data_files_count;
+    ret.added_files_count = f.added_files_count;
+    ret.existing_files_count = f.existing_files_count;
+    ret.deleted_files_count = f.deleted_files_count;

     ret.added_rows_count = f.added_rows_count;
     ret.existing_rows_count = f.existing_rows_count;
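Note on compatibility: the renamed fields keep their Iceberg field-ids, and the "aliases" entries let Avro schema resolution map the old on-disk names onto the new ones, so manifest lists written before this change remain readable. A minimal sketch of that resolution behavior, using fastavro purely for illustration (fastavro is not part of this patch, and this assumes its reader honors field aliases):

import io

import fastavro

# Writer schema: the old field name, as found in manifest lists written by
# older versions.
old = fastavro.parse_schema({
    "type": "record",
    "name": "manifest_file",
    "fields": [{"name": "added_data_files_count", "type": "int"}],
})
# Reader schema: the renamed field, with the old name listed as an alias,
# mirroring the schema change above.
new = fastavro.parse_schema({
    "type": "record",
    "name": "manifest_file",
    "fields": [{
        "name": "added_files_count",
        "aliases": ["added_data_files_count"],
        "type": "int",
    }],
})

buf = io.BytesIO()
fastavro.schemaless_writer(buf, old, {"added_data_files_count": 6})
buf.seek(0)
# Schema resolution maps the old on-disk name onto the renamed field.
assert fastavro.schemaless_reader(buf, old, new) == {"added_files_count": 6}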
diff --git a/src/v/iceberg/tests/manifest_serialization_test.cc b/src/v/iceberg/tests/manifest_serialization_test.cc
index 026403a81bcec..417381b4cc506 100644
--- a/src/v/iceberg/tests/manifest_serialization_test.cc
+++ b/src/v/iceberg/tests/manifest_serialization_test.cc
@@ -163,9 +163,9 @@ TEST(ManifestSerializationTest, TestManifestFile) {
     manifest.sequence_number = 3;
     manifest.min_sequence_number = 4;
     manifest.added_snapshot_id = 5;
-    manifest.added_data_files_count = 6;
-    manifest.existing_data_files_count = 7;
-    manifest.deleted_data_files_count = 8;
+    manifest.added_files_count = 6;
+    manifest.existing_files_count = 7;
+    manifest.deleted_files_count = 8;
     manifest.added_rows_count = 9;
     manifest.existing_rows_count = 10;
     manifest.deleted_rows_count = 11;
@@ -198,12 +198,9 @@
     EXPECT_EQ(manifest.sequence_number, dmanifest.sequence_number);
     EXPECT_EQ(manifest.min_sequence_number, dmanifest.min_sequence_number);
     EXPECT_EQ(manifest.added_snapshot_id, dmanifest.added_snapshot_id);
-    EXPECT_EQ(
-      manifest.added_data_files_count, dmanifest.added_data_files_count);
-    EXPECT_EQ(
-      manifest.existing_data_files_count, dmanifest.existing_data_files_count);
-    EXPECT_EQ(
-      manifest.deleted_data_files_count, dmanifest.deleted_data_files_count);
+    EXPECT_EQ(manifest.added_files_count, dmanifest.added_files_count);
+    EXPECT_EQ(manifest.existing_files_count, dmanifest.existing_files_count);
+    EXPECT_EQ(manifest.deleted_files_count, dmanifest.deleted_files_count);
     EXPECT_EQ(manifest.added_rows_count, dmanifest.added_rows_count);
     EXPECT_EQ(manifest.existing_rows_count, dmanifest.existing_rows_count);
     EXPECT_EQ(manifest.deleted_rows_count, dmanifest.deleted_rows_count);
@@ -218,9 +215,9 @@ TEST(ManifestSerializationTest, TestManifestAvroReaderWriter) {
     manifest.sequence_number = 3;
     manifest.min_sequence_number = 4;
     manifest.added_snapshot_id = 5;
-    manifest.added_data_files_count = 6;
-    manifest.existing_data_files_count = 7;
-    manifest.deleted_data_files_count = 8;
+    manifest.added_files_count = 6;
+    manifest.existing_files_count = 7;
+    manifest.deleted_files_count = 8;
     manifest.added_rows_count = 9;
     manifest.existing_rows_count = 10;
     manifest.deleted_rows_count = 11;
@@ -264,12 +261,9 @@
     EXPECT_EQ(manifest.sequence_number, dmanifest.sequence_number);
     EXPECT_EQ(manifest.min_sequence_number, dmanifest.min_sequence_number);
     EXPECT_EQ(manifest.added_snapshot_id, dmanifest.added_snapshot_id);
-    EXPECT_EQ(
-      manifest.added_data_files_count, dmanifest.added_data_files_count);
-    EXPECT_EQ(
-      manifest.existing_data_files_count, dmanifest.existing_data_files_count);
-    EXPECT_EQ(
-      manifest.deleted_data_files_count, dmanifest.deleted_data_files_count);
+    EXPECT_EQ(manifest.added_files_count, dmanifest.added_files_count);
+    EXPECT_EQ(manifest.existing_files_count, dmanifest.existing_files_count);
+    EXPECT_EQ(manifest.deleted_files_count, dmanifest.deleted_files_count);
     EXPECT_EQ(manifest.added_rows_count, dmanifest.added_rows_count);
     EXPECT_EQ(manifest.existing_rows_count, dmanifest.existing_rows_count);
     EXPECT_EQ(manifest.deleted_rows_count, dmanifest.deleted_rows_count);
diff --git a/tests/rptest/tests/datalake/datalake_e2e_test.py b/tests/rptest/tests/datalake/datalake_e2e_test.py
index 0664e44ee0c56..e946ebd95105a 100644
--- a/tests/rptest/tests/datalake/datalake_e2e_test.py
+++ b/tests/rptest/tests/datalake/datalake_e2e_test.py
@@ -134,6 +134,31 @@ def test_avro_schema(self, cloud_storage_type, query_engine):
         assert spark_describe_out == spark_expected_out, str(
             spark_describe_out)

+    @cluster(num_nodes=4)
+    @matrix(cloud_storage_type=supported_storage_types())
+    def test_upload_after_external_update(self, cloud_storage_type):
+        table_name = f"redpanda.{self.topic_name}"
+        with DatalakeServices(self.test_ctx,
+                              redpanda=self.redpanda,
+                              filesystem_catalog_mode=True,
+                              include_query_engines=[QueryEngineType.SPARK
+                                                     ]) as dl:
+            count = 100
+            dl.create_iceberg_enabled_topic(self.topic_name, partitions=1)
+            dl.produce_to_topic(self.topic_name, 1024, count)
+            dl.wait_for_translation(self.topic_name, count)
+            spark = dl.spark()
+            spark.make_client().cursor().execute(f"delete from {table_name}")
+            count_after_del = spark.count_table("redpanda", self.topic_name)
+            assert count_after_del == 0, f"{count_after_del} rows, expected 0"
+
+            dl.produce_to_topic(self.topic_name, 1024, count)
+            dl.wait_for_translation_until_offset(self.topic_name,
+                                                 2 * count - 1)
+            count_after_produce = spark.count_table("redpanda",
+                                                    self.topic_name)
+            assert count_after_produce == count, f"{count_after_produce} rows, expected {count}"
+
     @cluster(num_nodes=4)
     @matrix(cloud_storage_type=supported_storage_types(),
             filesystem_catalog_mode=[True, False])
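The new e2e test waits on actual translation progress rather than sleeping: wait_for_translation_until_offset lives in datalake_services.py and is not shown in this diff. It presumably follows the usual ducktape polling pattern, roughly like the hypothetical sketch below (max_translated_offset is an invented stand-in for however the service reads translation progress; only ducktape's wait_until is real API):

from ducktape.utils.util import wait_until


def wait_for_translation_until_offset_sketch(dl, topic, target, timeout_sec=60):
    # Poll until the highest Kafka offset translated into the Iceberg table
    # reaches the target. Produce offsets are zero-based, which is why the
    # test waits for 2 * count - 1 after producing `count` messages twice.
    wait_until(lambda: dl.max_translated_offset(topic) >= target,
               timeout_sec=timeout_sec,
               backoff_sec=2,
               err_msg=f"translation did not reach offset {target}")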
diff --git a/tests/rptest/tests/datalake/datalake_upgrade_test.py b/tests/rptest/tests/datalake/datalake_upgrade_test.py
new file mode 100644
index 0000000000000..eb0b7ac4ccd53
--- /dev/null
+++ b/tests/rptest/tests/datalake/datalake_upgrade_test.py
@@ -0,0 +1,69 @@
+# Copyright 2024 Redpanda Data, Inc.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.md
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0
+from rptest.services.cluster import cluster
+
+from rptest.services.redpanda import SISettings
+from rptest.utils.mode_checks import skip_debug_mode
+from rptest.tests.redpanda_test import RedpandaTest
+from rptest.tests.datalake.datalake_services import DatalakeServices
+from rptest.tests.datalake.query_engine_base import QueryEngineType
+from rptest.tests.datalake.utils import supported_storage_types
+from ducktape.mark import matrix
+
+
+class DatalakeUpgradeTest(RedpandaTest):
+    def __init__(self, test_context):
+        super(DatalakeUpgradeTest,
+              self).__init__(test_context,
+                             num_brokers=3,
+                             si_settings=SISettings(test_context=test_context),
+                             extra_rp_conf={
+                                 "iceberg_enabled": "true",
+                                 "iceberg_catalog_commit_interval_ms": 5000
+                             })
+        self.test_ctx = test_context
+        self.topic_name = "upgrade_topic"
+
+        # Initial version that supported Iceberg.
+        self.initial_version = (24, 3)
+
+    def setUp(self):
+        self.redpanda._installer.install(self.redpanda.nodes,
+                                         self.initial_version)
+        super().setUp()
+
+    @cluster(num_nodes=6)
+    @skip_debug_mode
+    @matrix(cloud_storage_type=supported_storage_types(),
+            query_engine=[QueryEngineType.SPARK])
+    def test_upload_through_upgrade(self, cloud_storage_type, query_engine):
+        """
+        Test that Iceberg translation can progress through different versions
+        of Redpanda (e.g. ensuring that data format changes or additional
+        Iceberg fields don't block progress).
+        """
+        total_count = 0
+        with DatalakeServices(self.test_ctx,
+                              redpanda=self.redpanda,
+                              filesystem_catalog_mode=True,
+                              include_query_engines=[query_engine]) as dl:
+            dl.create_iceberg_enabled_topic(self.topic_name, partitions=10)
+
+            def run_workload():
+                nonlocal total_count
+                count = 100
+                dl.produce_to_topic(self.topic_name, 1024, msg_count=count)
+                total_count += count
+                dl.wait_for_translation(self.topic_name,
+                                        msg_count=total_count)
+
+            versions = self.load_version_range(self.initial_version)
+            for v in self.upgrade_through_versions(versions_in=versions,
+                                                   already_running=True):
+                self.logger.info(f"Updated to {v}")
+                run_workload()
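For orientation, upgrade_through_versions steps the cluster through each version in order and yields after every step so the caller can interleave a workload at that version. A simplified, hypothetical sketch of that loop (the real helper in rptest handles more than this; only installer.install and restart_nodes are known rptest API):

def upgrade_through_versions_sketch(redpanda, versions, already_running=True):
    # Yield each version once the whole cluster is running it.
    for i, version in enumerate(versions):
        if already_running and i == 0:
            # Cluster was already started on the first version (see setUp).
            yield version
            continue
        redpanda._installer.install(redpanda.nodes, version)  # swap binaries
        redpanda.restart_nodes(redpanda.nodes)  # restart onto the new bits
        yield version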