From 88f9efa58c1366dbce901749c53febf583f0c0b1 Mon Sep 17 00:00:00 2001 From: William Wen <44139337+wenym1@users.noreply.github.com> Date: Mon, 18 Nov 2024 14:47:40 +0800 Subject: [PATCH] fix(starrocks): fix starrocks coordinator incorrect try_join_all on join handle (#19412) --- src/connector/src/sink/starrocks.rs | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/src/connector/src/sink/starrocks.rs b/src/connector/src/sink/starrocks.rs index 35d1033d8ee1..b5b6bf90e025 100644 --- a/src/connector/src/sink/starrocks.rs +++ b/src/connector/src/sink/starrocks.rs @@ -33,7 +33,6 @@ use serde_derive::Serialize; use serde_json::Value; use serde_with::{serde_as, DisplayFromStr}; use thiserror_ext::AsReport; -use tokio::task::JoinHandle; use url::form_urlencoded; use with_options::WithOptions; @@ -898,16 +897,12 @@ impl SinkCommitCoordinator for StarrocksSinkCommitter { tracing::debug!(?epoch, ?txn_labels, "commit transaction"); if !txn_labels.is_empty() { - let join_handles = txn_labels - .into_iter() - .map(|txn_label| { - let client = self.client.clone(); - tokio::spawn(async move { client.commit(txn_label).await }) - }) - .collect::>>>(); - futures::future::try_join_all(join_handles) - .await - .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))?; + futures::future::try_join_all( + txn_labels + .into_iter() + .map(|txn_label| self.client.commit(txn_label)), + ) + .await?; } Ok(()) }