From 38927e8beaa0604172f64d40732ef8b3c2a1f814 Mon Sep 17 00:00:00 2001 From: Dylan Date: Thu, 8 Aug 2024 20:33:06 +0800 Subject: [PATCH] fix(iceberg): Load the latest table to avoid concurrent modification with the best effort (#17975) --- src/connector/src/sink/iceberg/mod.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index 1b35c67830546..a17c98985de0c 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -764,10 +764,12 @@ impl Sink for IcebergSink { } async fn new_coordinator(&self) -> Result { + let catalog = self.config.create_catalog().await?; let table = self.create_and_validate_table().await?; let partition_type = table.current_partition_type()?; Ok(IcebergSinkCommitter { + catalog, table, partition_type, }) @@ -1139,6 +1141,7 @@ impl<'a> TryFrom<&'a WriteResult> for SinkMetadata { } pub struct IcebergSinkCommitter { + catalog: CatalogRef, table: Table, partition_type: Any, } @@ -1165,6 +1168,12 @@ impl SinkCommitCoordinator for IcebergSinkCommitter { tracing::debug!(?epoch, "no data to commit"); return Ok(()); } + // Load the latest table to avoid concurrent modification with the best effort. + self.table = self + .catalog + .clone() + .load_table(self.table.table_name()) + .await?; let mut txn = Transaction::new(&mut self.table); write_results.into_iter().for_each(|s| { txn.append_data_file(s.data_files);