From 5ffc8144bbd457485927b5266a16bdfa141fcc30 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Mon, 18 Nov 2024 15:44:47 +0800 Subject: [PATCH] fix(starrocks): fix starrocks coordinator incorrect try_join_all on join handle (#19412) (#19427) Co-authored-by: William Wen <44139337+wenym1@users.noreply.github.com> --- src/connector/src/sink/starrocks.rs | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/src/connector/src/sink/starrocks.rs b/src/connector/src/sink/starrocks.rs index 456415a62e0d6..662fa0fdbbfc8 100644 --- a/src/connector/src/sink/starrocks.rs +++ b/src/connector/src/sink/starrocks.rs @@ -33,7 +33,6 @@ use serde_derive::Serialize; use serde_json::Value; use serde_with::{serde_as, DisplayFromStr}; use thiserror_ext::AsReport; -use tokio::task::JoinHandle; use url::form_urlencoded; use with_options::WithOptions; @@ -881,16 +880,12 @@ impl SinkCommitCoordinator for StarrocksSinkCommitter { tracing::debug!(?epoch, ?txn_labels, "commit transaction"); if !txn_labels.is_empty() { - let join_handles = txn_labels - .into_iter() - .map(|txn_label| { - let client = self.client.clone(); - tokio::spawn(async move { client.commit(txn_label).await }) - }) - .collect::>>>(); - futures::future::try_join_all(join_handles) - .await - .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))?; + futures::future::try_join_all( + txn_labels + .into_iter() + .map(|txn_label| self.client.commit(txn_label)), + ) + .await?; } Ok(()) }