From da34254febc08f3d06a34306d0b1b20538113ab8 Mon Sep 17 00:00:00 2001 From: William Wen Date: Mon, 18 Nov 2024 13:32:01 +0800 Subject: [PATCH 1/2] fix(starrocks): fix starrocks coordinator incorrect try_join_all on join handle --- src/connector/src/sink/starrocks.rs | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/src/connector/src/sink/starrocks.rs b/src/connector/src/sink/starrocks.rs index 35d1033d8ee1b..f650f6fa4af3d 100644 --- a/src/connector/src/sink/starrocks.rs +++ b/src/connector/src/sink/starrocks.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use anyhow::anyhow; use async_trait::async_trait; use bytes::Bytes; +use futures::FutureExt; use mysql_async::prelude::Queryable; use mysql_async::Opts; use risingwave_common::array::{Op, StreamChunk}; @@ -33,7 +34,6 @@ use serde_derive::Serialize; use serde_json::Value; use serde_with::{serde_as, DisplayFromStr}; use thiserror_ext::AsReport; -use tokio::task::JoinHandle; use url::form_urlencoded; use with_options::WithOptions; @@ -898,16 +898,15 @@ impl SinkCommitCoordinator for StarrocksSinkCommitter { tracing::debug!(?epoch, ?txn_labels, "commit transaction"); if !txn_labels.is_empty() { - let join_handles = txn_labels - .into_iter() - .map(|txn_label| { - let client = self.client.clone(); - tokio::spawn(async move { client.commit(txn_label).await }) + futures::future::try_join_all(txn_labels.into_iter().map(|txn_label| { + let client = self.client.clone(); + tokio::spawn(async move { client.commit(txn_label).await }).map(|join_result| { + join_result + .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err))) + .and_then(|commit_result| commit_result) }) - .collect::>>>(); - futures::future::try_join_all(join_handles) - .await - .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))?; + })) + .await?; } Ok(()) } From 6b162ae143958a3000350a6ab281167e9c4e99f1 Mon Sep 17 00:00:00 2001 From: William Wen Date: Mon, 18 Nov 2024 13:50:42 +0800 Subject: [PATCH 2/2] avoid spawn --- src/connector/src/sink/starrocks.rs | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/src/connector/src/sink/starrocks.rs b/src/connector/src/sink/starrocks.rs index f650f6fa4af3d..b5b6bf90e0258 100644 --- a/src/connector/src/sink/starrocks.rs +++ b/src/connector/src/sink/starrocks.rs @@ -19,7 +19,6 @@ use std::sync::Arc; use anyhow::anyhow; use async_trait::async_trait; use bytes::Bytes; -use futures::FutureExt; use mysql_async::prelude::Queryable; use mysql_async::Opts; use risingwave_common::array::{Op, StreamChunk}; @@ -898,14 +897,11 @@ impl SinkCommitCoordinator for StarrocksSinkCommitter { tracing::debug!(?epoch, ?txn_labels, "commit transaction"); if !txn_labels.is_empty() { - futures::future::try_join_all(txn_labels.into_iter().map(|txn_label| { - let client = self.client.clone(); - tokio::spawn(async move { client.commit(txn_label).await }).map(|join_result| { - join_result - .map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err))) - .and_then(|commit_result| commit_result) - }) - })) + futures::future::try_join_all( + txn_labels + .into_iter() + .map(|txn_label| self.client.commit(txn_label)), + ) .await?; } Ok(())