Skip to content

Commit

Permalink
iceberg: fix inconsistency in manifest list files_count
Browse files Browse the repository at this point in the history
The schema we are using was pulled some time ago and appears to be
outdated. The Apache Iceberg Java implementation has since renamed
added_data_files_count and friends to added_files_count, to match the
documented spec.

This meant that after updating the table with an external non-Redpanda
writer, Redpanda wouldn't be able to download the current manifest list
when appending and get stuck, complaining about an EOF (presumably the
Avro C++ library throws this when there is an unknown field).

I suspect that this may have also been the cause of an EOF seen when
trying to read a manifest list with BigQuery:

Error while reading data, error message: The Apache Avro failed to read data with the following error: EOF reached File: [...]/metadata/snap-[...]-0.avro

The old names are added as an alias to ensure Redpanda can still
download Iceberg manifest lists from 24.3.

(cherry picked from commit a3e6880)
  • Loading branch information
andrwng committed Dec 24, 2024
1 parent 7ec7c6d commit d9c34cb
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 28 deletions.
9 changes: 6 additions & 3 deletions src/v/iceberg/avroschemas/manifest_file.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,22 @@
"field-id": 503
},
{
"name": "added_data_files_count",
"name": "added_files_count",
"aliases": ["added_data_files_count"],
"type": "int",
"doc": "Added entry count",
"field-id": 504
},
{
"name": "existing_data_files_count",
"name": "existing_files_count",
"aliases": ["existing_data_files_count"],
"type": "int",
"doc": "Existing entry count",
"field-id": 505
},
{
"name": "deleted_data_files_count",
"name": "deleted_files_count",
"aliases": ["deleted_data_files_count"],
"type": "int",
"doc": "Deleted entry count",
"field-id": 506
Expand Down
13 changes: 6 additions & 7 deletions src/v/iceberg/manifest_list_avro.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,9 @@ avrogen::manifest_file file_to_avro(const manifest_file& f) {
ret.min_sequence_number = f.min_seq_number();
ret.added_snapshot_id = f.added_snapshot_id();

ret.added_data_files_count = static_cast<int32_t>(f.added_files_count);
ret.existing_data_files_count = static_cast<int32_t>(
f.existing_files_count);
ret.deleted_data_files_count = static_cast<int32_t>(f.deleted_files_count);
ret.added_files_count = static_cast<int32_t>(f.added_files_count);
ret.existing_files_count = static_cast<int32_t>(f.existing_files_count);
ret.deleted_files_count = static_cast<int32_t>(f.deleted_files_count);

ret.added_rows_count = static_cast<int32_t>(f.added_rows_count);
ret.existing_rows_count = static_cast<int32_t>(f.existing_rows_count);
Expand Down Expand Up @@ -128,9 +127,9 @@ manifest_file file_from_avro(const avrogen::manifest_file& f) {
ret.min_seq_number = sequence_number{f.min_sequence_number};
ret.added_snapshot_id = snapshot_id{f.added_snapshot_id};

ret.added_files_count = f.added_data_files_count;
ret.existing_files_count = f.existing_data_files_count;
ret.deleted_files_count = f.deleted_data_files_count;
ret.added_files_count = f.added_files_count;
ret.existing_files_count = f.existing_files_count;
ret.deleted_files_count = f.deleted_files_count;

ret.added_rows_count = f.added_rows_count;
ret.existing_rows_count = f.existing_rows_count;
Expand Down
30 changes: 12 additions & 18 deletions src/v/iceberg/tests/manifest_serialization_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,9 @@ TEST(ManifestSerializationTest, TestManifestFile) {
manifest.sequence_number = 3;
manifest.min_sequence_number = 4;
manifest.added_snapshot_id = 5;
manifest.added_data_files_count = 6;
manifest.existing_data_files_count = 7;
manifest.deleted_data_files_count = 8;
manifest.added_files_count = 6;
manifest.existing_files_count = 7;
manifest.deleted_files_count = 8;
manifest.added_rows_count = 9;
manifest.existing_rows_count = 10;
manifest.deleted_rows_count = 11;
Expand Down Expand Up @@ -198,12 +198,9 @@ TEST(ManifestSerializationTest, TestManifestFile) {
EXPECT_EQ(manifest.sequence_number, dmanifest.sequence_number);
EXPECT_EQ(manifest.min_sequence_number, dmanifest.min_sequence_number);
EXPECT_EQ(manifest.added_snapshot_id, dmanifest.added_snapshot_id);
EXPECT_EQ(
manifest.added_data_files_count, dmanifest.added_data_files_count);
EXPECT_EQ(
manifest.existing_data_files_count, dmanifest.existing_data_files_count);
EXPECT_EQ(
manifest.deleted_data_files_count, dmanifest.deleted_data_files_count);
EXPECT_EQ(manifest.added_files_count, dmanifest.added_files_count);
EXPECT_EQ(manifest.existing_files_count, dmanifest.existing_files_count);
EXPECT_EQ(manifest.deleted_files_count, dmanifest.deleted_files_count);
EXPECT_EQ(manifest.added_rows_count, dmanifest.added_rows_count);
EXPECT_EQ(manifest.existing_rows_count, dmanifest.existing_rows_count);
EXPECT_EQ(manifest.deleted_rows_count, dmanifest.deleted_rows_count);
Expand All @@ -218,9 +215,9 @@ TEST(ManifestSerializationTest, TestManifestAvroReaderWriter) {
manifest.sequence_number = 3;
manifest.min_sequence_number = 4;
manifest.added_snapshot_id = 5;
manifest.added_data_files_count = 6;
manifest.existing_data_files_count = 7;
manifest.deleted_data_files_count = 8;
manifest.added_files_count = 6;
manifest.existing_files_count = 7;
manifest.deleted_files_count = 8;
manifest.added_rows_count = 9;
manifest.existing_rows_count = 10;
manifest.deleted_rows_count = 11;
Expand Down Expand Up @@ -264,12 +261,9 @@ TEST(ManifestSerializationTest, TestManifestAvroReaderWriter) {
EXPECT_EQ(manifest.sequence_number, dmanifest.sequence_number);
EXPECT_EQ(manifest.min_sequence_number, dmanifest.min_sequence_number);
EXPECT_EQ(manifest.added_snapshot_id, dmanifest.added_snapshot_id);
EXPECT_EQ(
manifest.added_data_files_count, dmanifest.added_data_files_count);
EXPECT_EQ(
manifest.existing_data_files_count, dmanifest.existing_data_files_count);
EXPECT_EQ(
manifest.deleted_data_files_count, dmanifest.deleted_data_files_count);
EXPECT_EQ(manifest.added_files_count, dmanifest.added_files_count);
EXPECT_EQ(manifest.existing_files_count, dmanifest.existing_files_count);
EXPECT_EQ(manifest.deleted_files_count, dmanifest.deleted_files_count);
EXPECT_EQ(manifest.added_rows_count, dmanifest.added_rows_count);
EXPECT_EQ(manifest.existing_rows_count, dmanifest.existing_rows_count);
EXPECT_EQ(manifest.deleted_rows_count, dmanifest.deleted_rows_count);
Expand Down
25 changes: 25 additions & 0 deletions tests/rptest/tests/datalake/datalake_e2e_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,31 @@ def test_avro_schema(self, cloud_storage_type, query_engine,
assert spark_describe_out == spark_expected_out, str(
spark_describe_out)

@cluster(num_nodes=4)
@matrix(cloud_storage_type=supported_storage_types())
def test_upload_after_external_update(self, cloud_storage_type):
table_name = f"redpanda.{self.topic_name}"
with DatalakeServices(self.test_ctx,
redpanda=self.redpanda,
filesystem_catalog_mode=True,
include_query_engines=[QueryEngineType.SPARK
]) as dl:
count = 100
dl.create_iceberg_enabled_topic(self.topic_name, partitions=1)
dl.produce_to_topic(self.topic_name, 1024, count)
dl.wait_for_translation(self.topic_name, count)
spark = dl.spark()
spark.make_client().cursor().execute(f"delete from {table_name}")
count_after_del = spark.count_table("redpanda", self.topic_name)
assert count_after_del == 0, f"{count_after_del} rows, expected 0"

dl.produce_to_topic(self.topic_name, 1024, count)
dl.wait_for_translation_until_offset(self.topic_name,
2 * count - 1)
count_after_produce = spark.count_table("redpanda",
self.topic_name)
assert count_after_produce == count, f"{count_after_produce} rows, expected {count}"

@cluster(num_nodes=4)
@matrix(cloud_storage_type=supported_storage_types(),
filesystem_catalog_mode=[True, False])
Expand Down

0 comments on commit d9c34cb

Please sign in to comment.