Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
ZENOTME committed Nov 27, 2024
1 parent 6415d40 commit 60a7b8b
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 25 deletions.
12 changes: 6 additions & 6 deletions crates/iceberg/src/spec/manifest_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,12 +132,12 @@ impl ManifestListWriter {
("sequence-number".to_string(), sequence_number.to_string()),
("format-version".to_string(), "2".to_string()),
]);
if let Some(parent_snapshot_id) = parent_snapshot_id {
metadata.insert(
"parent-snapshot-id".to_string(),
parent_snapshot_id.to_string(),
);
}
metadata.insert(
"parent-snapshot-id".to_string(),
parent_snapshot_id
.map(|v| v.to_string())
.unwrap_or("null".to_string()),
);
Self::new(
FormatVersion::V2,
output_file,
Expand Down
51 changes: 32 additions & 19 deletions crates/iceberg/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl<'a> Transaction<'a> {
FastAppendAction::new(
self,
snapshot_id,
commit_uuid.unwrap_or_else(Uuid::new_v4),
commit_uuid.unwrap_or_else(Uuid::now_v7),
key_metadata,
HashMap::new(),
)
Expand Down Expand Up @@ -320,22 +320,23 @@ impl<'a> SnapshotProduceAction<'a> {
"Partition value is not compatitable with partition type",
));
}
if partition_value
.fields()
.iter()
.zip(partition_type.fields())
.any(|(value, field)| {
!field
.field_type
.as_primitive_type()
.unwrap()
.compatible(&value.as_primitive_literal().unwrap())
})
{
return Err(Error::new(
ErrorKind::DataInvalid,
"Partition value is not compatitable partition type",
));
for (value, field) in partition_value.fields().iter().zip(partition_type.fields()) {
if !field
.field_type
.as_primitive_type()
.ok_or_else(|| {
Error::new(
ErrorKind::Unexpected,
"Partition field should only be primitve type.",
)
})?
.compatible(&value.as_primitive_literal().unwrap())
{
return Err(Error::new(
ErrorKind::DataInvalid,
"Partition value is not compatitable partition type",
));
}
}
Ok(())
}
Expand Down Expand Up @@ -387,7 +388,7 @@ impl<'a> SnapshotProduceAction<'a> {
let builder = ManifestEntry::builder()
.status(crate::spec::ManifestStatus::Added)
.data_file(data_file);
if self.tx.table.metadata().format_version() as u8 == 1u8 {
if self.tx.table.metadata().format_version() == FormatVersion::V1 {
builder.snapshot_id(self.snapshot_id).build()
} else {
// For format version > 1, we set the snapshot id at the inherited time to avoid rewrite the manifest file when
Expand Down Expand Up @@ -781,6 +782,19 @@ mod tests {
async fn test_fast_append_action() {
let table = make_v2_minimal_table();
let tx = Transaction::new(&table);
let mut action = tx.fast_append(None, vec![]).unwrap();

// check add data file with uncompatitable partition value
let data_file = DataFileBuilder::default()
.content(DataContentType::Data)
.file_path("test/3.parquet".to_string())
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::string("test"))]))
.build()
.unwrap();
assert!(action.add_data_files(vec![data_file.clone()]).is_err());

let data_file = DataFileBuilder::default()
.content(DataContentType::Data)
Expand All @@ -791,7 +805,6 @@ mod tests {
.partition(Struct::from_iter([Some(Literal::long(300))]))
.build()
.unwrap();
let mut action = tx.fast_append(None, vec![]).unwrap();
action.add_data_files(vec![data_file.clone()]).unwrap();
let tx = action.apply().await.unwrap();

Expand Down

0 comments on commit 60a7b8b

Please sign in to comment.