diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs index da63815d6..c390bee04 100644 --- a/crates/iceberg/src/spec/manifest_list.rs +++ b/crates/iceberg/src/spec/manifest_list.rs @@ -68,7 +68,7 @@ impl ManifestList { from_value::<_serde::ManifestListV1>(&values)?.try_into(partition_type_provider) } FormatVersion::V2 => { - let reader = Reader::with_schema(&MANIFEST_LIST_AVRO_SCHEMA_V2, bs)?; + let reader = Reader::new(bs)?; let values = Value::Array(reader.collect::<Result<Vec<_>, _>>()?); from_value::<_serde::ManifestListV2>(&values)?.try_into(partition_type_provider) } @@ -802,6 +802,9 @@ pub(super) mod _serde { pub key_metadata: Option, } + // Aliases were added to fields that were renamed in Iceberg 1.5.0 (https://github.com/apache/iceberg/pull/5338), in order to support both conventions/versions. + // In the current implementation deserialization is done using field names, and therefore these fields may appear as either. + // see issue that raised this here: https://github.com/apache/iceberg-rust/issues/338 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] pub(super) struct ManifestFileV2 { pub manifest_path: String, @@ -811,8 +814,11 @@ pub(super) mod _serde { pub sequence_number: i64, pub min_sequence_number: i64, pub added_snapshot_id: i64, + #[serde(alias = "added_data_files_count", alias = "added_files_count")] pub added_data_files_count: i32, + #[serde(alias = "existing_data_files_count", alias = "existing_files_count")] pub existing_data_files_count: i32, + #[serde(alias = "deleted_data_files_count", alias = "deleted_files_count")] pub deleted_data_files_count: i32, pub added_rows_count: i64, pub existing_rows_count: i64, @@ -1089,16 +1095,16 @@ pub(super) mod _serde { #[cfg(test)] mod test { + use apache_avro::{Reader, Schema}; use std::{collections::HashMap, fs, sync::Arc}; - use tempfile::TempDir; use crate::{ io::FileIOBuilder, spec::{ - manifest_list::{_serde::ManifestListV1, UNASSIGNED_SEQUENCE_NUMBER}, - 
FieldSummary, Literal, ManifestContentType, ManifestFile, ManifestList, - ManifestListWriter, NestedField, PrimitiveType, StructType, Type, + manifest_list::_serde::ManifestListV1, FieldSummary, Literal, ManifestContentType, + ManifestFile, ManifestList, ManifestListWriter, NestedField, PrimitiveType, StructType, + Type, UNASSIGNED_SEQUENCE_NUMBER, }, }; @@ -1462,4 +1468,50 @@ mod test { temp_dir.close().unwrap(); } + + #[tokio::test] + async fn test_manifest_list_v2_deserializer_aliases() { + // reading avro manifest file generated by iceberg 1.4.0 + let avro_1_path = "testdata/manifests_lists/manifest-list-v2-1.avro"; + let bs_1 = fs::read(avro_1_path).unwrap(); + let avro_1_fields = read_avro_schema_fields_as_str(bs_1.clone()).await; + assert_eq!( + avro_1_fields, + "manifest_path, manifest_length, partition_spec_id, content, sequence_number, min_sequence_number, added_snapshot_id, added_data_files_count, existing_data_files_count, deleted_data_files_count, added_rows_count, existing_rows_count, deleted_rows_count, partitions" + ); + // reading avro manifest file generated by iceberg 1.5.0 + let avro_2_path = "testdata/manifests_lists/manifest-list-v2-2.avro"; + let bs_2 = fs::read(avro_2_path).unwrap(); + let avro_2_fields = read_avro_schema_fields_as_str(bs_2.clone()).await; + assert_eq!( + avro_2_fields, + "manifest_path, manifest_length, partition_spec_id, content, sequence_number, min_sequence_number, added_snapshot_id, added_files_count, existing_files_count, deleted_files_count, added_rows_count, existing_rows_count, deleted_rows_count, partitions" + ); + // deserializing both files to ManifestList struct + let _manifest_list_1 = + ManifestList::parse_with_version(&bs_1, crate::spec::FormatVersion::V2, move |_id| { + Ok(Some(StructType::new(vec![]))) + }) + .unwrap(); + let _manifest_list_2 = + ManifestList::parse_with_version(&bs_2, crate::spec::FormatVersion::V2, move |_id| { + Ok(Some(StructType::new(vec![]))) + }) + .unwrap(); + } + + async fn 
read_avro_schema_fields_as_str(bs: Vec<u8>) -> String { + let reader = Reader::new(&bs[..]).unwrap(); + let schema = reader.writer_schema(); + let fields: String = match schema { + Schema::Record(record) => record + .fields + .iter() + .map(|field| field.name.clone()) + .collect::<Vec<String>>() + .join(", "), + _ => "".to_string(), + }; + fields + } } diff --git a/crates/iceberg/testdata/manifests_lists/manifest-list-v2-1.avro b/crates/iceberg/testdata/manifests_lists/manifest-list-v2-1.avro new file mode 100644 index 000000000..5c5cdb1ad Binary files /dev/null and b/crates/iceberg/testdata/manifests_lists/manifest-list-v2-1.avro differ diff --git a/crates/iceberg/testdata/manifests_lists/manifest-list-v2-2.avro b/crates/iceberg/testdata/manifests_lists/manifest-list-v2-2.avro new file mode 100644 index 000000000..00784ff1d Binary files /dev/null and b/crates/iceberg/testdata/manifests_lists/manifest-list-v2-2.avro differ