From fff5de4cfae170680d20aa9a0abf8c78ad65e926 Mon Sep 17 00:00:00 2001 From: ZENOTME <43447882+ZENOTME@users.noreply.github.com> Date: Fri, 3 Nov 2023 14:51:23 +0800 Subject: [PATCH] fix: update icelake upsert query (#13232) Co-authored-by: ZENOTME --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/connector/src/sink/iceberg.rs | 5 ++++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8455e0df58117..ebb558bc375cf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3979,7 +3979,7 @@ dependencies = [ [[package]] name = "icelake" version = "0.0.10" -source = "git+https://github.com/icelake-io/icelake?rev=186fde7663545d1d6a5856ce9fbbc541224eadfb#186fde7663545d1d6a5856ce9fbbc541224eadfb" +source = "git+https://github.com/icelake-io/icelake?rev=5a5202d1f3502f0cf82041044b0427434da59adc#5a5202d1f3502f0cf82041044b0427434da59adc" dependencies = [ "anyhow", "apache-avro 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index 919c990deb7d4..6f3554a7e2496 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -112,7 +112,7 @@ criterion = { version = "0.5", features = ["async_futures"] } tonic = { package = "madsim-tonic", version = "0.4.0" } tonic-build = { package = "madsim-tonic-build", version = "0.4.2" } prost = { version = "0.12" } -icelake = { git = "https://github.com/icelake-io/icelake", rev = "186fde7663545d1d6a5856ce9fbbc541224eadfb" } +icelake = { git = "https://github.com/icelake-io/icelake", rev = "5a5202d1f3502f0cf82041044b0427434da59adc" } arrow-array = "48" arrow-cast = "48" arrow-schema = "48" diff --git a/src/connector/src/sink/iceberg.rs b/src/connector/src/sink/iceberg.rs index 1031c5181d81e..7fdbcc26e2693 100644 --- a/src/connector/src/sink/iceberg.rs +++ b/src/connector/src/sink/iceberg.rs @@ -849,7 +849,10 @@ impl SinkCommitCoordinator for IcebergSinkCommitter { .iter() .map(|meta| WriteResult::try_from(meta, &self.partition_type)) .collect::>>()?; - + if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) { + tracing::debug!(?epoch, "no data to commit"); + return Ok(()); + } let mut txn = Transaction::new(&mut self.table); write_results.into_iter().for_each(|s| { txn.append_data_file(s.data_files);