Skip to content

Commit

Permalink
fix(iceberg): Load the latest table to avoid concurrent modification …
Browse files Browse the repository at this point in the history
…with the best effort (#17975)
  • Loading branch information
chenzl25 authored Aug 8, 2024
1 parent f5f5701 commit 38927e8
Showing 1 changed file with 9 additions and 0 deletions.
9 changes: 9 additions & 0 deletions src/connector/src/sink/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -764,10 +764,12 @@ impl Sink for IcebergSink {
}

async fn new_coordinator(&self) -> Result<Self::Coordinator> {
let catalog = self.config.create_catalog().await?;
let table = self.create_and_validate_table().await?;
let partition_type = table.current_partition_type()?;

Ok(IcebergSinkCommitter {
catalog,
table,
partition_type,
})
Expand Down Expand Up @@ -1139,6 +1141,7 @@ impl<'a> TryFrom<&'a WriteResult> for SinkMetadata {
}

pub struct IcebergSinkCommitter {
catalog: CatalogRef,
table: Table,
partition_type: Any,
}
Expand All @@ -1165,6 +1168,12 @@ impl SinkCommitCoordinator for IcebergSinkCommitter {
tracing::debug!(?epoch, "no data to commit");
return Ok(());
}
// Load the latest table to avoid concurrent modification with the best effort.
self.table = self
.catalog
.clone()
.load_table(self.table.table_name())
.await?;
let mut txn = Transaction::new(&mut self.table);
write_results.into_iter().for_each(|s| {
txn.append_data_file(s.data_files);
Expand Down

0 comments on commit 38927e8

Please sign in to comment.