From 24a68514ba039535e1afbfa21d2becbff0731462 Mon Sep 17 00:00:00 2001 From: William Wen <44139337+wenym1@users.noreply.github.com> Date: Tue, 8 Oct 2024 12:25:27 +0800 Subject: [PATCH] fix(meta): avoid removing control stream node when failed to send request (#18767) --- src/meta/src/barrier/mod.rs | 8 +- src/meta/src/barrier/rpc.rs | 195 +++++++++++++++--------------------- 2 files changed, 88 insertions(+), 115 deletions(-) diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 463de3f6febe4..10f9172949c47 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -879,9 +879,11 @@ impl GlobalBarrierManager { assert_matches!(output.command_ctx.kind, BarrierKind::Barrier); self.scheduled_barriers.force_checkpoint_in_next_barrier(); } - self.control_stream_manager.remove_partial_graph( - output.table_ids_to_finish.iter().map(|table_id| table_id.table_id).collect() - ); + if !output.table_ids_to_finish.is_empty() { + self.control_stream_manager.remove_partial_graph( + output.table_ids_to_finish.iter().map(|table_id| table_id.table_id).collect() + ); + } } Ok(None) => {} Err(e) => { diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index 0177259486b72..a86f458d25ab3 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -14,12 +14,13 @@ use std::collections::{HashMap, HashSet}; use std::error::Error; +use std::future::poll_fn; +use std::task::Poll; use std::time::Duration; use anyhow::anyhow; use fail::fail_point; use futures::future::try_join_all; -use futures::stream::{BoxStream, FuturesUnordered}; use futures::StreamExt; use itertools::Itertools; use risingwave_common::catalog::TableId; @@ -31,11 +32,11 @@ use risingwave_pb::stream_plan::{Barrier, BarrierMutation, StreamActor, Subscrip use risingwave_pb::stream_service::streaming_control_stream_request::RemovePartialGraphRequest; use risingwave_pb::stream_service::{ streaming_control_stream_request, streaming_control_stream_response, BarrierCompleteResponse, - InjectBarrierRequest, StreamingControlStreamRequest, StreamingControlStreamResponse, + InjectBarrierRequest, StreamingControlStreamRequest, }; +use risingwave_rpc_client::StreamingControlHandle; use rw_futures_util::pending_on_none; use thiserror_ext::AsReport; -use tokio::sync::mpsc::UnboundedSender; use tokio::time::{sleep, timeout}; use tokio_retry::strategy::ExponentialBackoff; use tracing::{error, info, warn}; @@ -51,55 +52,12 @@ const COLLECT_ERROR_TIMEOUT: Duration = Duration::from_secs(3); struct ControlStreamNode { worker: WorkerNode, - sender: UnboundedSender, + handle: StreamingControlHandle, } -mod response_stream_future { - use std::future::Future; - - use anyhow::anyhow; - use futures::stream::BoxStream; - use futures::{FutureExt, StreamExt}; - use risingwave_pb::stream_service::StreamingControlStreamResponse; - - use crate::manager::WorkerId; - use crate::MetaResult; - - pub(super) fn into_future( - worker_id: WorkerId, - stream: BoxStream< - 'static, - risingwave_rpc_client::error::Result, - >, - ) -> ResponseStreamFuture { - stream.into_future().map(move |(opt, stream)| { - ( - worker_id, - stream, - opt.ok_or_else(|| anyhow!("end of stream").into()) - .and_then(|result| result.map_err(|e| e.into())), - ) - }) - } - - pub(super) type ResponseStreamFuture = impl Future< - Output = ( - WorkerId, - BoxStream< - 'static, - risingwave_rpc_client::error::Result, - >, - MetaResult, - ), - > + 'static; -} - -use response_stream_future::*; - pub(super) struct ControlStreamManager { context: GlobalBarrierManagerContext, nodes: HashMap, - response_streams: FuturesUnordered, } impl ControlStreamManager { @@ -107,7 +65,6 @@ impl ControlStreamManager { Self { context, nodes: Default::default(), - response_streams: FuturesUnordered::new(), } } @@ -141,10 +98,8 @@ impl ControlStreamManager { ) .await { - Ok((stream_node, response_stream)) => { - let _ = self.nodes.insert(node_id, stream_node); - self.response_streams - .push(into_future(node_id, response_stream)); + Ok(stream_node) => { + assert!(self.nodes.insert(node_id, stream_node).is_none()); info!(?node_host, "add control stream worker"); return; } @@ -179,11 +134,8 @@ impl ControlStreamManager { })) .await?; self.nodes.clear(); - self.response_streams.clear(); - for (worker_id, (node, response_stream)) in nodes { - self.nodes.insert(worker_id, node); - self.response_streams - .push(into_future(worker_id, response_stream)); + for (worker_id, node) in nodes { + assert!(self.nodes.insert(worker_id, node).is_none()); } Ok(()) @@ -196,17 +148,56 @@ impl ControlStreamManager { async fn next_response( &mut self, - ) -> Option<(WorkerId, MetaResult)> { - let (worker_id, response_stream, result) = self.response_streams.next().await?; - - match result.as_ref().map(|r| r.response.as_ref().unwrap()) { - Ok(streaming_control_stream_response::Response::Shutdown(_)) | Err(_) => { - // Do not add it back to the `response_streams` so that it will not be polled again. - } - _ => { - self.response_streams - .push(into_future(worker_id, response_stream)); + ) -> Option<( + WorkerId, + MetaResult, + )> { + if self.nodes.is_empty() { + return None; + } + let (worker_id, result) = poll_fn(|cx| { + for (worker_id, node) in &mut self.nodes { + match node.handle.response_stream.poll_next_unpin(cx) { + Poll::Ready(result) => { + return Poll::Ready(( + *worker_id, + result + .ok_or_else(|| anyhow!("end of stream").into()) + .and_then(|result| { + result.map_err(Into::::into).and_then(|resp| { + match resp + .response + .ok_or_else(||anyhow!("empty response"))? + { + streaming_control_stream_response::Response::Shutdown(_) => Err(anyhow!( + "worker node {worker_id} is shutting down" + ) + .into()), + streaming_control_stream_response::Response::Init(_) => { + // This arm should be unreachable. + Err(anyhow!("get unexpected init response").into()) + } + resp => Ok(resp), + } + }) + }), + )); + } + Poll::Pending => { + continue; + } + } } + Poll::Pending + }) + .await; + + if let Err(err) = &result { + let node = self + .nodes + .remove(&worker_id) + .expect("should exist when get shutdown resp"); + warn!(node = ?node.worker, err = %err.as_report(), "get error from response stream"); } Some((worker_id, result)) @@ -219,30 +210,16 @@ impl ControlStreamManager { { let (worker_id, result) = pending_on_none(self.next_response()).await; - let result = match result { - Ok(resp) => match resp.response.unwrap() { - Response::CompleteBarrier(resp) => { - assert_eq!(worker_id, resp.worker_id); - Ok(resp) - } - Response::Shutdown(_) => { - Err(anyhow!("worker node {worker_id} is shutting down").into()) - } - Response::Init(_) => { - // This arm should be unreachable. - Err(anyhow!("get unexpected init response").into()) + + ( + worker_id, + result.map(|resp| match resp { + Response::CompleteBarrier(resp) => resp, + Response::Shutdown(_) | Response::Init(_) => { + unreachable!("should be treated as error") } - }, - Err(err) => Err(err), - }; - if let Err(err) = &result { - let node = self - .nodes - .remove(&worker_id) - .expect("should exist when get shutdown resp"); - warn!(node = ?node.worker, err = %err.as_report(), "get error from response stream"); - } - (worker_id, result) + }), + ) } } @@ -356,7 +333,7 @@ impl ControlStreamManager { .collect_vec(); self.nodes - .iter_mut() + .iter() .try_for_each(|(node_id, node)| { let actor_ids_to_collect: Vec<_> = pre_applied_graph_info .actor_ids_to_collect(*node_id) @@ -383,7 +360,8 @@ impl ControlStreamManager { passed_actors: vec![], }; - node.sender + node.handle + .request_sender .send(StreamingControlStreamRequest { request: Some( streaming_control_stream_request::Request::InjectBarrier( @@ -436,22 +414,21 @@ impl ControlStreamManager { } pub(super) fn remove_partial_graph(&mut self, partial_graph_ids: Vec) { - self.nodes.retain(|_, node| { - if node - .sender + self.nodes.iter().for_each(|(_, node)| { + if node.handle + .request_sender .send(StreamingControlStreamRequest { request: Some( streaming_control_stream_request::Request::RemovePartialGraph( - RemovePartialGraphRequest { partial_graph_ids: partial_graph_ids.clone() }, + RemovePartialGraphRequest { + partial_graph_ids: partial_graph_ids.clone(), + }, ), ), }) - .is_ok() + .is_err() { - true - } else { - warn!(id = node.worker.id, host = ?node.worker.host, ?partial_graph_ids, "fail to remove_partial_graph request, node removed"); - false + warn!(worker_id = node.worker.id,node = ?node.worker.host,"failed to send remove partial graph request"); } }) } @@ -463,10 +440,7 @@ impl GlobalBarrierManagerContext { node: WorkerNode, initial_version_id: HummockVersionId, mv_depended_subscriptions: &HashMap>, - ) -> MetaResult<( - ControlStreamNode, - BoxStream<'static, risingwave_rpc_client::error::Result>, - )> { + ) -> MetaResult { let handle = self .env .stream_client_pool() @@ -474,13 +448,10 @@ impl GlobalBarrierManagerContext { .await? .start_streaming_control(initial_version_id, mv_depended_subscriptions) .await?; - Ok(( - ControlStreamNode { - worker: node.clone(), - sender: handle.request_sender, - }, - handle.response_stream, - )) + Ok(ControlStreamNode { + worker: node.clone(), + handle, + }) } /// Send barrier-complete-rpc and wait for responses from all CNs