diff --git a/src/connector/src/sink/starrocks.rs b/src/connector/src/sink/starrocks.rs index 6ae317e53c89d..4a0bee5e306f2 100644 --- a/src/connector/src/sink/starrocks.rs +++ b/src/connector/src/sink/starrocks.rs @@ -33,7 +33,6 @@ use serde_derive::Serialize; use serde_json::Value; use serde_with::{serde_as, DisplayFromStr}; use thiserror_ext::AsReport; -use tokio::task::JoinHandle; use url::form_urlencoded; use with_options::WithOptions; @@ -863,16 +862,12 @@ impl SinkCommitCoordinator for StarrocksSinkCommitter { tracing::debug!(?epoch, ?txn_labels, "commit transaction"); if !txn_labels.is_empty() { - let join_handles = txn_labels - .into_iter() - .map(|txn_label| { - let client = self.client.clone(); - tokio::spawn(async move { client.commit(txn_label).await }) - }) - .collect::>>>(); - futures::future::try_join_all(join_handles) - .await - .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))?; + futures::future::try_join_all( + txn_labels + .into_iter() + .map(|txn_label| self.client.commit(txn_label)), + ) + .await?; } Ok(()) }