Skip to content

Commit

Permalink
feat: Add integration test and support append DataFile (apache#349)
Browse files Browse the repository at this point in the history
* support append data file and add e2e test

* fix typos

* refine append action

* fix cargo sort

* add consistent check for partition value

* generate unique snapshot id

* avoid to set snapshot id for v2

* refine test

* fix unit test

* export ports

* fix None case for parant_snapshot_id

* fix parquect schema check

* refactor append action of transaction

* refine

* refine e2e test

* refine commit uuid

* fix file format field to uppercase in manifest

* refine SnapshotProduceAction

* rename e2e_test to integration_tests

* fix

* use docker-compose.yaml from rest catalog

* fix check

---------

Co-authored-by: ZENOTME <[email protected]>
  • Loading branch information
2 people authored and shaeqahmed committed Dec 9, 2024
1 parent bf802dc commit 7d1d1ac
Show file tree
Hide file tree
Showing 17 changed files with 1,130 additions and 53 deletions.
11 changes: 6 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
[workspace]
resolver = "2"
members = [
"crates/catalog/*",
"crates/examples",
"crates/iceberg",
"crates/integrations/*",
"crates/test_utils",
"crates/catalog/*",
"crates/examples",
"crates/iceberg",
"crates/integration_tests",
"crates/integrations/*",
"crates/test_utils",
]
exclude = ["bindings/python"]

Expand Down
2 changes: 1 addition & 1 deletion crates/catalog/rest/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1376,7 +1376,7 @@ mod tests {
.with_schema_id(0)
.with_summary(Summary {
operation: Operation::Append,
other: HashMap::from_iter([
additional_properties: HashMap::from_iter([
("spark.app.id", "local-1646787004168"),
("added-data-files", "1"),
("added-records", "1"),
Expand Down
4 changes: 2 additions & 2 deletions crates/iceberg/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1423,7 +1423,7 @@ mod tests {
.with_schema_id(1)
.with_summary(Summary {
operation: Operation::Append,
other: HashMap::default(),
additional_properties: HashMap::default(),
})
.build(),
};
Expand Down Expand Up @@ -1457,7 +1457,7 @@ mod tests {
.with_manifest_list("s3://a/b/2.avro")
.with_summary(Summary {
operation: Operation::Append,
other: HashMap::default(),
additional_properties: HashMap::default(),
})
.build(),
};
Expand Down
6 changes: 2 additions & 4 deletions crates/iceberg/src/io/object_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ mod tests {
use crate::spec::{
DataContentType, DataFileBuilder, DataFileFormat, FormatVersion, Literal, Manifest,
ManifestContentType, ManifestEntry, ManifestListWriter, ManifestMetadata, ManifestStatus,
ManifestWriter, Struct, TableMetadata, EMPTY_SNAPSHOT_ID,
ManifestWriter, Struct, TableMetadata,
};
use crate::table::Table;
use crate::TableIdent;
Expand Down Expand Up @@ -293,9 +293,7 @@ mod tests {
.new_output(current_snapshot.manifest_list())
.unwrap(),
current_snapshot.snapshot_id(),
current_snapshot
.parent_snapshot_id()
.unwrap_or(EMPTY_SNAPSHOT_ID),
current_snapshot.parent_snapshot_id(),
current_snapshot.sequence_number(),
);
manifest_list_write
Expand Down
5 changes: 1 addition & 4 deletions crates/iceberg/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -977,7 +977,6 @@ mod tests {
DataContentType, DataFileBuilder, DataFileFormat, Datum, FormatVersion, Literal, Manifest,
ManifestContentType, ManifestEntry, ManifestListWriter, ManifestMetadata, ManifestStatus,
ManifestWriter, NestedField, PrimitiveType, Schema, Struct, TableMetadata, Type,
EMPTY_SNAPSHOT_ID,
};
use crate::table::Table;
use crate::TableIdent;
Expand Down Expand Up @@ -1124,9 +1123,7 @@ mod tests {
.new_output(current_snapshot.manifest_list())
.unwrap(),
current_snapshot.snapshot_id(),
current_snapshot
.parent_snapshot_id()
.unwrap_or(EMPTY_SNAPSHOT_ID),
current_snapshot.parent_snapshot_id(),
current_snapshot.sequence_number(),
);
manifest_list_write
Expand Down
8 changes: 7 additions & 1 deletion crates/iceberg/src/spec/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -900,6 +900,12 @@ impl ManifestEntry {
}
}

/// Snapshot id
#[inline]
pub fn snapshot_id(&self) -> Option<i64> {
self.snapshot_id
}

/// Data sequence number.
#[inline]
pub fn sequence_number(&self) -> Option<i64> {
Expand Down Expand Up @@ -1328,7 +1334,7 @@ mod _serde {
Ok(Self {
content: value.content as i32,
file_path: value.file_path,
file_format: value.file_format.to_string(),
file_format: value.file_format.to_string().to_ascii_uppercase(),
partition: RawLiteral::try_from(
Literal::Struct(value.partition),
&Type::Struct(partition_type.clone()),
Expand Down
50 changes: 33 additions & 17 deletions crates/iceberg/src/spec/manifest_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,34 +106,38 @@ impl std::fmt::Debug for ManifestListWriter {

impl ManifestListWriter {
/// Construct a v1 [`ManifestListWriter`] that writes to a provided [`OutputFile`].
pub fn v1(output_file: OutputFile, snapshot_id: i64, parent_snapshot_id: i64) -> Self {
let metadata = HashMap::from_iter([
pub fn v1(output_file: OutputFile, snapshot_id: i64, parent_snapshot_id: Option<i64>) -> Self {
let mut metadata = HashMap::from_iter([
("snapshot-id".to_string(), snapshot_id.to_string()),
(
"parent-snapshot-id".to_string(),
parent_snapshot_id.to_string(),
),
("format-version".to_string(), "1".to_string()),
]);
if let Some(parent_snapshot_id) = parent_snapshot_id {
metadata.insert(
"parent-snapshot-id".to_string(),
parent_snapshot_id.to_string(),
);
}
Self::new(FormatVersion::V1, output_file, metadata, 0, snapshot_id)
}

/// Construct a v2 [`ManifestListWriter`] that writes to a provided [`OutputFile`].
pub fn v2(
output_file: OutputFile,
snapshot_id: i64,
parent_snapshot_id: i64,
parent_snapshot_id: Option<i64>,
sequence_number: i64,
) -> Self {
let metadata = HashMap::from_iter([
let mut metadata = HashMap::from_iter([
("snapshot-id".to_string(), snapshot_id.to_string()),
(
"parent-snapshot-id".to_string(),
parent_snapshot_id.to_string(),
),
("sequence-number".to_string(), sequence_number.to_string()),
("format-version".to_string(), "2".to_string()),
]);
metadata.insert(
"parent-snapshot-id".to_string(),
parent_snapshot_id
.map(|v| v.to_string())
.unwrap_or("null".to_string()),
);
Self::new(
FormatVersion::V2,
output_file,
Expand Down Expand Up @@ -580,6 +584,18 @@ pub struct ManifestFile {
pub key_metadata: Vec<u8>,
}

impl ManifestFile {
/// Checks if the manifest file has any added files.
pub fn has_added_files(&self) -> bool {
self.added_files_count.is_none() || self.added_files_count.unwrap() > 0
}

/// Checks if the manifest file has any existed files.
pub fn has_existing_files(&self) -> bool {
self.existing_files_count.is_none() || self.existing_files_count.unwrap() > 0
}
}

/// The type of files tracked by the manifest, either data or delete files; Data(0) for all v1 manifests
#[derive(Debug, PartialEq, Clone, Eq)]
pub enum ManifestContentType {
Expand Down Expand Up @@ -1146,7 +1162,7 @@ mod test {
let mut writer = ManifestListWriter::v1(
file_io.new_output(full_path.clone()).unwrap(),
1646658105718557341,
1646658105718557341,
Some(1646658105718557341),
);

writer
Expand Down Expand Up @@ -1213,7 +1229,7 @@ mod test {
let mut writer = ManifestListWriter::v2(
file_io.new_output(full_path.clone()).unwrap(),
1646658105718557341,
1646658105718557341,
Some(1646658105718557341),
1,
);

Expand Down Expand Up @@ -1335,7 +1351,7 @@ mod test {
let io = FileIOBuilder::new_fs_io().build().unwrap();
let output_file = io.new_output(path.to_str().unwrap()).unwrap();

let mut writer = ManifestListWriter::v1(output_file, 1646658105718557341, 0);
let mut writer = ManifestListWriter::v1(output_file, 1646658105718557341, Some(0));
writer
.add_manifests(expected_manifest_list.entries.clone().into_iter())
.unwrap();
Expand Down Expand Up @@ -1391,7 +1407,7 @@ mod test {
let io = FileIOBuilder::new_fs_io().build().unwrap();
let output_file = io.new_output(path.to_str().unwrap()).unwrap();

let mut writer = ManifestListWriter::v2(output_file, snapshot_id, 0, seq_num);
let mut writer = ManifestListWriter::v2(output_file, snapshot_id, Some(0), seq_num);
writer
.add_manifests(expected_manifest_list.entries.clone().into_iter())
.unwrap();
Expand Down Expand Up @@ -1445,7 +1461,7 @@ mod test {
let io = FileIOBuilder::new_fs_io().build().unwrap();
let output_file = io.new_output(path.to_str().unwrap()).unwrap();

let mut writer = ManifestListWriter::v2(output_file, 1646658105718557341, 0, 1);
let mut writer = ManifestListWriter::v2(output_file, 1646658105718557341, Some(0), 1);
writer
.add_manifests(expected_manifest_list.entries.clone().into_iter())
.unwrap();
Expand Down
21 changes: 18 additions & 3 deletions crates/iceberg/src/spec/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub struct Summary {
pub operation: Operation,
/// Other summary data.
#[serde(flatten)]
pub other: HashMap<String, String>,
pub additional_properties: HashMap<String, String>,
}

impl Default for Operation {
Expand Down Expand Up @@ -291,7 +291,7 @@ pub(super) mod _serde {
},
summary: v1.summary.unwrap_or(Summary {
operation: Operation::default(),
other: HashMap::new(),
additional_properties: HashMap::new(),
}),
schema_id: v1.schema_id,
})
Expand Down Expand Up @@ -372,6 +372,21 @@ pub enum SnapshotRetention {
},
}

impl SnapshotRetention {
/// Create a new branch retention policy
pub fn branch(
min_snapshots_to_keep: Option<i32>,
max_snapshot_age_ms: Option<i64>,
max_ref_age_ms: Option<i64>,
) -> Self {
SnapshotRetention::Branch {
min_snapshots_to_keep,
max_snapshot_age_ms,
max_ref_age_ms,
}
}
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;
Expand Down Expand Up @@ -408,7 +423,7 @@ mod tests {
assert_eq!(
Summary {
operation: Operation::Append,
other: HashMap::new()
additional_properties: HashMap::new()
},
*result.summary()
);
Expand Down
18 changes: 15 additions & 3 deletions crates/iceberg/src/spec/table_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,18 @@ impl TableMetadata {
self.last_sequence_number
}

/// Returns the next sequence number for the table.
///
/// For format version 1, it always returns the initial sequence number.
/// For other versions, it returns the last sequence number incremented by 1.
#[inline]
pub fn next_sequence_number(&self) -> i64 {
match self.format_version {
FormatVersion::V1 => INITIAL_SEQUENCE_NUMBER,
_ => self.last_sequence_number + 1,
}
}

/// Returns last updated time.
#[inline]
pub fn last_updated_timestamp(&self) -> Result<DateTime<Utc>> {
Expand Down Expand Up @@ -1476,7 +1488,7 @@ mod tests {
.with_sequence_number(0)
.with_schema_id(0)
.with_manifest_list("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro")
.with_summary(Summary { operation: Operation::Append, other: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) })
.with_summary(Summary { operation: Operation::Append, additional_properties: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) })
.build();

let expected = TableMetadata {
Expand Down Expand Up @@ -1895,7 +1907,7 @@ mod tests {
.with_manifest_list("s3://a/b/1.avro")
.with_summary(Summary {
operation: Operation::Append,
other: HashMap::new(),
additional_properties: HashMap::new(),
})
.build();

Expand All @@ -1908,7 +1920,7 @@ mod tests {
.with_manifest_list("s3://a/b/2.avro")
.with_summary(Summary {
operation: Operation::Append,
other: HashMap::new(),
additional_properties: HashMap::new(),
})
.build();

Expand Down
14 changes: 7 additions & 7 deletions crates/iceberg/src/spec/table_metadata_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1818,7 +1818,7 @@ mod tests {
.with_manifest_list("/snap-1.avro")
.with_summary(Summary {
operation: Operation::Append,
other: HashMap::from_iter(vec![
additional_properties: HashMap::from_iter(vec![
(
"spark.app.id".to_string(),
"local-1662532784305".to_string(),
Expand Down Expand Up @@ -1881,7 +1881,7 @@ mod tests {
.with_manifest_list("/snap-1.avro")
.with_summary(Summary {
operation: Operation::Append,
other: HashMap::from_iter(vec![
additional_properties: HashMap::from_iter(vec![
(
"spark.app.id".to_string(),
"local-1662532784305".to_string(),
Expand All @@ -1901,7 +1901,7 @@ mod tests {
.with_manifest_list("/snap-1.avro")
.with_summary(Summary {
operation: Operation::Append,
other: HashMap::from_iter(vec![
additional_properties: HashMap::from_iter(vec![
(
"spark.app.id".to_string(),
"local-1662532784305".to_string(),
Expand Down Expand Up @@ -1949,7 +1949,7 @@ mod tests {
.with_manifest_list("/snap-1.avro")
.with_summary(Summary {
operation: Operation::Append,
other: HashMap::new(),
additional_properties: HashMap::new(),
})
.build();

Expand Down Expand Up @@ -1994,7 +1994,7 @@ mod tests {
.with_manifest_list("/snap-1.avro")
.with_summary(Summary {
operation: Operation::Append,
other: HashMap::from_iter(vec![
additional_properties: HashMap::from_iter(vec![
(
"spark.app.id".to_string(),
"local-1662532784305".to_string(),
Expand Down Expand Up @@ -2114,7 +2114,7 @@ mod tests {
.with_manifest_list("/snap-1")
.with_summary(Summary {
operation: Operation::Append,
other: HashMap::new(),
additional_properties: HashMap::new(),
})
.build();

Expand All @@ -2140,7 +2140,7 @@ mod tests {
.with_parent_snapshot_id(Some(1))
.with_summary(Summary {
operation: Operation::Append,
other: HashMap::new(),
additional_properties: HashMap::new(),
})
.build();

Expand Down
Loading

0 comments on commit 7d1d1ac

Please sign in to comment.