Skip to content

Commit

Permalink
use join_all with timeout on error
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 committed Feb 18, 2024
1 parent a40320d commit 3b3287d
Showing 1 changed file with 46 additions and 8 deletions.
54 changes: 46 additions & 8 deletions src/meta/src/barrier/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use fail::fail_point;
use futures::future::try_join_all;
use futures::future::{select, Either};
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use futures::{pin_mut, FutureExt, StreamExt};
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::hash::ActorId;
Expand Down Expand Up @@ -268,11 +269,49 @@ impl StreamRpcManager {
) -> MetaResult<Vec<RSP>> {
let pool = self.env.stream_client_pool();
let f = &f;
Ok(try_join_all(request.map(|(node, input)| async move {
let client = pool.get(node).await?;
f(client, input).await
}))
.await?)
let iters = request.map(|(node, input)| async move {
let client = pool.get(node).await.map_err(|e| (node.id, e))?;
f(client, input).await.map_err(|e| (node.id, e))
});

let stream = FuturesUnordered::from_iter(iters);
pin_mut!(stream);
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let mut results_ok = vec![];
let mut results_err = vec![];
let mut is_err_timeout = false;
loop {
let rx = rx.recv();
pin_mut!(rx);
match select(rx, stream.next()).await {
Either::Left((_, _)) => {
break;
}
Either::Right((None, _)) => {
break;
}
Either::Right((Some(Ok(rsp)), _)) => {
results_ok.push(rsp);
}
Either::Right((Some(Err(err)), _)) => {
results_err.push(err);
if is_err_timeout {
continue;
}
is_err_timeout = true;
let tx = tx.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(5)).await;
let _ = tx.send(());
});
}
}
}
if results_err.is_empty() {
return Ok(results_ok);
}
let merged_error = merge_compute_node_rpc_error("merged RPC Error", results_err);
Err(merged_error)
}

async fn broadcast<RSP, Fut: Future<Output = Result<RSP, RpcError>> + 'static>(
Expand Down Expand Up @@ -392,7 +431,6 @@ impl StreamRpcManager {
}
}

#[allow(dead_code)]
fn merge_compute_node_rpc_error(
message: &str,
errors: impl IntoIterator<Item = (WorkerId, RpcError)>,
Expand Down

0 comments on commit 3b3287d

Please sign in to comment.