From 3b3287d6ff9e1cad504155a926689f84caf3f83e Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Sat, 17 Feb 2024 11:54:57 +0800 Subject: [PATCH] use join_all with timeout on error --- src/meta/src/barrier/rpc.rs | 54 +++++++++++++++++++++++++++++++------ 1 file changed, 46 insertions(+), 8 deletions(-) diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index db4910a9a449e..28e1183f51e16 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -15,12 +15,13 @@ use std::collections::HashMap; use std::future::Future; use std::sync::Arc; +use std::time::Duration; use anyhow::anyhow; use fail::fail_point; -use futures::future::try_join_all; +use futures::future::{select, Either}; use futures::stream::FuturesUnordered; -use futures::{FutureExt, StreamExt}; +use futures::{pin_mut, FutureExt, StreamExt}; use itertools::Itertools; use risingwave_common::bail; use risingwave_common::hash::ActorId; @@ -268,11 +269,49 @@ impl StreamRpcManager { ) -> MetaResult> { let pool = self.env.stream_client_pool(); let f = &f; - Ok(try_join_all(request.map(|(node, input)| async move { - let client = pool.get(node).await?; - f(client, input).await - })) - .await?) + let iters = request.map(|(node, input)| async move { + let client = pool.get(node).await.map_err(|e| (node.id, e))?; + f(client, input).await.map_err(|e| (node.id, e)) + }); + + let stream = FuturesUnordered::from_iter(iters); + pin_mut!(stream); + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let mut results_ok = vec![]; + let mut results_err = vec![]; + let mut is_err_timeout = false; + loop { + let rx = rx.recv(); + pin_mut!(rx); + match select(rx, stream.next()).await { + Either::Left((_, _)) => { + break; + } + Either::Right((None, _)) => { + break; + } + Either::Right((Some(Ok(rsp)), _)) => { + results_ok.push(rsp); + } + Either::Right((Some(Err(err)), _)) => { + results_err.push(err); + if is_err_timeout { + continue; + } + is_err_timeout = true; + let tx = tx.clone(); + tokio::spawn(async move { + tokio::time::sleep(Duration::from_secs(5)).await; + let _ = tx.send(()); + }); + } + } + } + if results_err.is_empty() { + return Ok(results_ok); + } + let merged_error = merge_compute_node_rpc_error("merged RPC Error", results_err); + Err(merged_error) } async fn broadcast> + 'static>( @@ -392,7 +431,6 @@ impl StreamRpcManager { } } -#[allow(dead_code)] fn merge_compute_node_rpc_error( message: &str, errors: impl IntoIterator,