Skip to content

Commit

Permalink
fix(starrocks): fix starrocks coordinator incorrect try_join_all on j…
Browse files Browse the repository at this point in the history
…oin handle (#19412)
  • Loading branch information
wenym1 authored Nov 18, 2024
1 parent 084fff8 commit 88f9efa
Showing 1 changed file with 6 additions and 11 deletions.
17 changes: 6 additions & 11 deletions src/connector/src/sink/starrocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use serde_derive::Serialize;
use serde_json::Value;
use serde_with::{serde_as, DisplayFromStr};
use thiserror_ext::AsReport;
use tokio::task::JoinHandle;
use url::form_urlencoded;
use with_options::WithOptions;

Expand Down Expand Up @@ -898,16 +897,12 @@ impl SinkCommitCoordinator for StarrocksSinkCommitter {
tracing::debug!(?epoch, ?txn_labels, "commit transaction");

if !txn_labels.is_empty() {
let join_handles = txn_labels
.into_iter()
.map(|txn_label| {
let client = self.client.clone();
tokio::spawn(async move { client.commit(txn_label).await })
})
.collect::<Vec<JoinHandle<Result<String>>>>();
futures::future::try_join_all(join_handles)
.await
.map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))?;
futures::future::try_join_all(
txn_labels
.into_iter()
.map(|txn_label| self.client.commit(txn_label)),
)
.await?;
}
Ok(())
}
Expand Down

0 comments on commit 88f9efa

Please sign in to comment.