diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index a7ea0b0ae1421..31bbc72fce1f6 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -39,7 +39,9 @@ use risingwave_pb::ddl_service::DdlProgress; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::PausedReason; use risingwave_pb::stream_plan::barrier::BarrierKind; -use risingwave_pb::stream_service::BarrierCompleteResponse; +use risingwave_pb::stream_service::{ + streaming_control_stream_response, BarrierCompleteResponse, StreamingControlStreamResponse, +}; use thiserror_ext::AsReport; use tokio::sync::oneshot::{Receiver, Sender}; use tokio::sync::Mutex; @@ -52,7 +54,7 @@ use self::progress::TrackingCommand; use crate::barrier::info::InflightActorInfo; use crate::barrier::notifier::BarrierInfo; use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob}; -use crate::barrier::rpc::BarrierRpcManager; +use crate::barrier::rpc::{BarrierRpcManager, ControlStreamManager}; use crate::barrier::state::BarrierManagerState; use crate::barrier::BarrierEpochState::{Completed, InFlight}; use crate::hummock::{CommitEpochInfo, HummockManagerRef}; @@ -191,6 +193,8 @@ pub struct GlobalBarrierManager { rpc_manager: BarrierRpcManager, active_streaming_nodes: ActiveStreamingWorkerNodes, + + control_stream_manager: ControlStreamManager, } /// Controls the concurrent execution of commands. @@ -422,7 +426,8 @@ impl GlobalBarrierManager { env: env.clone(), }; - let rpc_manager = BarrierRpcManager::new(context.clone()); + let rpc_manager = BarrierRpcManager::new(); + let control_stream_manager = ControlStreamManager::new(context.clone()); Self { enable_recovery, @@ -434,6 +439,7 @@ impl GlobalBarrierManager { checkpoint_control, rpc_manager, active_streaming_nodes, + control_stream_manager, } } @@ -603,7 +609,7 @@ impl GlobalBarrierManager { self.state .resolve_worker_nodes(self.active_streaming_nodes.current().values().cloned()); if let ActiveStreamingWorkerChange::Add(node) | ActiveStreamingWorkerChange::Update(node) = changed_worker { - self.rpc_manager.add_worker(&node).await; + self.control_stream_manager.add_worker(node).await; } } @@ -617,6 +623,23 @@ impl GlobalBarrierManager { .set_checkpoint_frequency(p.checkpoint_frequency() as usize) } } + resp_result = self.control_stream_manager.next_response() => { + match resp_result { + Ok((worker_id, prev_epoch, resp)) => { + let resp: StreamingControlStreamResponse = resp; + match resp.response { + Some(streaming_control_stream_response::Response::CompleteBarrier(resp)) => { + self.rpc_manager.barrier_collected(worker_id, prev_epoch, resp); + }, + resp => unreachable!("invalid response: {:?}", resp), + } + + } + Err(e) => { + self.failure_recovery(e, empty()).await; + } + } + } // Barrier completes. completion = self.rpc_manager.next_complete_barrier() => { self.handle_barrier_complete( @@ -673,12 +696,16 @@ impl GlobalBarrierManager { send_latency_timer.observe_duration(); - self.rpc_manager + let node_to_collect = self + .control_stream_manager .inject_barrier(command_ctx.clone()) .inspect_err(|_| { fail_point!("inject_barrier_err_success"); })?; + self.rpc_manager + .barrier_injected(command_ctx.clone(), node_to_collect); + // Notify about the injection. let prev_paused_reason = self.state.paused_reason(); let curr_paused_reason = command_ctx.next_paused_reason(); @@ -713,9 +740,8 @@ impl GlobalBarrierManager { if let Err(err) = result { // FIXME: If it is a connector source error occurred in the init barrier, we should pass // back to frontend - let fail_node = self.checkpoint_control.barrier_failed(); warn!(%prev_epoch, error = %err.as_report(), "Failed to complete epoch"); - self.failure_recovery(err, fail_node).await; + self.failure_recovery(err, empty()).await; return; } // change the state to Complete @@ -735,10 +761,7 @@ impl GlobalBarrierManager { } // Handle the error node and the nodes after it if let Some(err) = err_msg { - let fail_nodes = complete_nodes - .drain(index..) - .chain(self.checkpoint_control.barrier_failed().into_iter()) - .collect_vec(); + let fail_nodes = complete_nodes.drain(index..); warn!(%prev_epoch, error = %err.as_report(), "Failed to commit epoch"); self.failure_recovery(err, fail_nodes).await; } @@ -750,8 +773,12 @@ impl GlobalBarrierManager { fail_nodes: impl IntoIterator, ) { self.checkpoint_control.clear_changes(); + self.rpc_manager.clear(); - for node in fail_nodes { + for node in fail_nodes + .into_iter() + .chain(self.checkpoint_control.barrier_failed().into_iter()) + { if let Some(timer) = node.timer { timer.observe_duration(); } diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 53f13c9f89bab..549ea73f73d62 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -37,7 +37,7 @@ use crate::barrier::command::CommandContext; use crate::barrier::info::InflightActorInfo; use crate::barrier::notifier::Notifier; use crate::barrier::progress::CreateMviewProgressTracker; -use crate::barrier::rpc::BarrierRpcManager; +use crate::barrier::rpc::ControlStreamManager; use crate::barrier::schedule::ScheduledBarriers; use crate::barrier::state::BarrierManagerState; use crate::barrier::{Command, GlobalBarrierManager, GlobalBarrierManagerContext}; @@ -413,9 +413,10 @@ impl GlobalBarrierManager { })? }; - let mut rpc_manager = BarrierRpcManager::new(self.context.clone()); + let mut control_stream_manager = + ControlStreamManager::new(self.context.clone()); - rpc_manager + control_stream_manager .reset(prev_epoch.value().0, active_streaming_nodes.current()) .await .inspect_err(|err| { @@ -470,28 +471,20 @@ impl GlobalBarrierManager { tracing::Span::current(), // recovery span )); - rpc_manager.inject_barrier(command_ctx.clone())?; - - let res = match rpc_manager.next_complete_barrier().await.result { - Ok(response) => { - if let Err(err) = command_ctx.post_collect().await { - warn!(error = %err.as_report(), "post_collect failed"); - Err(err) - } else { - Ok((new_epoch.clone(), response)) - } - } - Err(err) => { - warn!(error = %err.as_report(), "inject_barrier failed"); - Err(err) + let mut node_to_collect = + control_stream_manager.inject_barrier(command_ctx.clone())?; + loop { + let (worker_id, _, _) = control_stream_manager.next_response().await?; + assert!(node_to_collect.remove(&worker_id)); + if node_to_collect.is_empty() { + break; } - }; - let (new_epoch, _) = res?; + } ( BarrierManagerState::new(new_epoch, info, command_ctx.next_paused_reason()), active_streaming_nodes, - rpc_manager, + control_stream_manager, ) }; if recovery_result.is_err() { @@ -507,7 +500,11 @@ impl GlobalBarrierManager { recovery_timer.observe_duration(); self.scheduled_barriers.mark_ready(); - (self.state, self.active_streaming_nodes, self.rpc_manager) = new_state; + ( + self.state, + self.active_streaming_nodes, + self.control_stream_manager, + ) = new_state; tracing::info!( epoch = self.state.in_flight_prev_epoch().value().0, diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index 46943ebb98bee..f1e0328df5a36 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::btree_map::Entry; use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; use std::future::Future; use std::sync::Arc; @@ -20,22 +21,25 @@ use std::task::Poll; use anyhow::anyhow; use fail::fail_point; use futures::future::{poll_fn, try_join_all}; -use futures::StreamExt; +use futures::stream::FuturesUnordered; +use futures::{FutureExt, StreamExt}; use itertools::Itertools; +use risingwave_common::bail; use risingwave_common::hash::ActorId; use risingwave_common::util::tracing::TracingContext; -use risingwave_common::{bail, must_match}; use risingwave_pb::common::{ActorInfo, WorkerNode}; use risingwave_pb::stream_plan::{Barrier, BarrierMutation, StreamActor}; use risingwave_pb::stream_service::{ streaming_control_stream_request, streaming_control_stream_response, BarrierCompleteResponse, BroadcastActorInfoTableRequest, BuildActorsRequest, DropActorsRequest, InjectBarrierRequest, - StreamingControlStreamRequest, UpdateActorsRequest, + StreamingControlStreamRequest, StreamingControlStreamResponse, UpdateActorsRequest, }; use risingwave_rpc_client::error::RpcError; use risingwave_rpc_client::{StreamClient, StreamingControlHandle}; +use rw_futures_util::pending_on_none; use thiserror_ext::AsReport; -use tracing::{error, warn}; +use tokio::sync::oneshot; +use tracing::{debug, error, info, warn}; use uuid::Uuid; use super::command::CommandContext; @@ -44,6 +48,104 @@ use crate::error::MetaError; use crate::manager::{MetaSrvEnv, WorkerId}; use crate::MetaResult; +struct InflightBarrierNode { + remaining_workers: HashSet, + collected_resps: Vec, + completion_sender: oneshot::Sender, +} + +type BarrierCompletionFuture = impl Future + Send + 'static; + +pub(super) struct BarrierRpcManager { + /// Futures that await on the completion of barrier. + injected_in_progress_barrier: FuturesUnordered, + + inflight_barriers: BTreeMap, +} + +impl BarrierRpcManager { + pub(super) fn new() -> Self { + Self { + injected_in_progress_barrier: FuturesUnordered::new(), + inflight_barriers: BTreeMap::new(), + } + } + + pub(super) fn clear(&mut self) { + self.injected_in_progress_barrier = FuturesUnordered::new(); + self.inflight_barriers.clear(); + } + + pub(super) fn barrier_injected( + &mut self, + command_context: Arc, + node_to_collect: HashSet, + ) { + let prev_epoch = command_context.prev_epoch.value().0; + let (tx, rx) = oneshot::channel(); + let await_complete_future = rx.map(move |result| match result { + Ok(completion) => completion, + Err(_e) => BarrierCompletion { + prev_epoch, + result: Err(anyhow!("failed to receive barrier completion result").into()), + }, + }); + self.injected_in_progress_barrier + .push(await_complete_future); + assert!( + self.inflight_barriers + .insert( + command_context.prev_epoch.value().0, + InflightBarrierNode { + remaining_workers: node_to_collect, + collected_resps: vec![], + completion_sender: tx, + }, + ) + .is_none(), + "should not inject multiple times" + ); + } + + pub(super) fn barrier_collected( + &mut self, + worker_id: WorkerId, + prev_epoch: u64, + resp: BarrierCompleteResponse, + ) { + match self.inflight_barriers.entry(prev_epoch) { + Entry::Vacant(_) => { + warn!( + prev_epoch, + worker_id, "collect non-inflight barrier from worker" + ); + } + Entry::Occupied(mut entry) => { + let node = &mut entry.get_mut(); + assert!(node.remaining_workers.remove(&worker_id)); + node.collected_resps.push(resp); + if node.remaining_workers.is_empty() { + let node = entry.remove(); + if node + .completion_sender + .send(BarrierCompletion { + prev_epoch, + result: Ok(node.collected_resps), + }) + .is_err() + { + debug!(prev_epoch, "failed to notify barrier completion"); + } + } + } + } + } + + pub(super) async fn next_complete_barrier(&mut self) -> BarrierCompletion { + pending_on_none(self.injected_in_progress_barrier.next()).await + } +} + struct ControlStreamNode { worker: WorkerNode, handle: StreamingControlHandle, @@ -51,28 +153,20 @@ struct ControlStreamNode { inflight_barriers: VecDeque>, } -struct InflightBarrierNode { - command_ctx: Arc, - remaining_workers: HashSet, - collected_resps: Vec, -} - -pub(super) struct BarrierRpcManager { +pub(super) struct ControlStreamManager { context: GlobalBarrierManagerContext, nodes: HashMap, - inflight_barriers: BTreeMap, } -impl BarrierRpcManager { +impl ControlStreamManager { pub(super) fn new(context: GlobalBarrierManagerContext) -> Self { Self { context, nodes: Default::default(), - inflight_barriers: Default::default(), } } - pub(super) async fn add_worker(&mut self, node: &WorkerNode) { + pub(super) async fn add_worker(&mut self, node: WorkerNode) { if self.nodes.contains_key(&node.id) { warn!(id = node.id, host = ?node.host, "node already exists"); return; @@ -82,12 +176,15 @@ impl BarrierRpcManager { .hummock_manager .latest_snapshot() .committed_epoch; + let node_id = node.id; + let node_host = node.host.clone().unwrap(); match self.context.new_control_stream_node(node, prev_epoch).await { Ok(stream_node) => { - let _ = self.nodes.insert(node.id, stream_node); + let _ = self.nodes.insert(node_id, stream_node); + info!(?node_host, "add control stream worker"); } Err(e) => { - error!(err = %e.as_report(), node = ?node.host, "fail to start control stream with worker node"); + error!(err = %e.as_report(), ?node_host, "fail to start control stream with worker node"); } } } @@ -100,80 +197,75 @@ impl BarrierRpcManager { self.nodes = try_join_all(nodes.iter().map(|(worker_id, node)| async { let node = self .context - .new_control_stream_node(node, prev_epoch) + .new_control_stream_node(node.clone(), prev_epoch) .await?; Result::<_, MetaError>::Ok((*worker_id, node)) })) .await? .into_iter() .collect(); - self.inflight_barriers.clear(); Ok(()) } - pub(super) fn next_complete_barrier(&mut self) -> impl Future + '_ { + pub(super) fn next_response( + &mut self, + ) -> impl Future> + '_ + { poll_fn(|cx| { - if let Some(mut entry) = self.inflight_barriers.first_entry() { - let barrier_node = entry.get_mut(); - let mut collected = HashSet::new(); - for worker_id in &barrier_node.remaining_workers { - let node = self.nodes.get_mut(worker_id).expect("must exist"); - assert_eq!( - barrier_node.command_ctx.curr_epoch.value(), - node.inflight_barriers - .front() - .expect("must non-empty") - .curr_epoch - .value() - ); - if let Poll::Ready(poll_result) = - node.handle.response_stream.poll_next_unpin(cx) - { - let command_ctx = node.inflight_barriers.pop_front().expect("non-empty"); - let err = match poll_result { - None => { - anyhow!("end of response stream. node: {:?}", node.worker).into() - } - Some(Err(e)) => e.into(), - Some(Ok(resp)) => { - let resp = must_match!(resp.response, Some(streaming_control_stream_response::Response::CompleteBarrier(resp)) => resp); - - collected.insert(*worker_id); - barrier_node.collected_resps.push(resp); - - continue; + for (worker_id, node) in &mut self.nodes { + match node.handle.response_stream.poll_next_unpin(cx) { + Poll::Ready(result) => { + match result + .ok_or_else(|| anyhow!("end of response stream").into()) + .and_then(|result| result.map_err(Into::::into)) + { + Ok(resp) => match &resp.response { + Some( + streaming_control_stream_response::Response::CompleteBarrier(_), + ) => { + let command = node + .inflight_barriers + .pop_front() + .expect("should exist when get collect resp"); + return Poll::Ready(Ok(( + *worker_id, + command.prev_epoch.value().0, + resp, + ))); + } + resp => { + return Poll::Ready(Err(anyhow!( + "get unexpected resp: {:?}", + resp + ) + .into())); + } + }, + Err(err) => { + if let Some(command) = node.inflight_barriers.pop_front() { + warn!(node = ?node.worker, err = ?err.as_report(), "get error from response stream"); + self.context.report_collect_failure(&command, &err); + return Poll::Ready(Err(err)); + } } - }; - self.context.report_collect_failure(&command_ctx, &err); - return Poll::Ready(BarrierCompletion { - prev_epoch: command_ctx.prev_epoch.value().0, - result: Err(err), - }); - } else { + } + } + Poll::Pending => { continue; } } - barrier_node.remaining_workers = &barrier_node.remaining_workers - &collected; - if barrier_node.remaining_workers.is_empty() { - let barrier_node = entry.remove(); - Poll::Ready(BarrierCompletion { - prev_epoch: barrier_node.command_ctx.prev_epoch.value().0, - result: Ok(barrier_node.collected_resps), - }) - } else { - Poll::Pending - } - } else { - Poll::Pending } + Poll::Pending }) } +} +impl ControlStreamManager { /// Send inject-barrier-rpc to stream service and wait for its response before returns. pub(super) fn inject_barrier( &mut self, command_context: Arc, - ) -> MetaResult<()> { + ) -> MetaResult> { fail_point!("inject_barrier_err", |_| bail!("inject_barrier_err")); let mutation = command_context.to_mutation(); let info = command_context.info.clone(); @@ -239,37 +331,20 @@ impl BarrierRpcManager { .event_log_manager_ref() .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]); })?; - if let Some((_, node)) = self.inflight_barriers.last_key_value() { - assert_eq!( - node.command_ctx.curr_epoch.value(), - command_context.prev_epoch.value() - ); - } - assert!(self - .inflight_barriers - .insert( - command_context.prev_epoch.value().0, - InflightBarrierNode { - command_ctx: command_context, - remaining_workers: node_need_collect, - collected_resps: vec![], - }, - ) - .is_none()); - Ok(()) + Ok(node_need_collect) } } impl GlobalBarrierManagerContext { async fn new_control_stream_node( &self, - node: &WorkerNode, + node: WorkerNode, prev_epoch: u64, ) -> MetaResult { let handle = self .env .stream_client_pool() - .get(node) + .get(&node) .await? .start_streaming_control(prev_epoch) .await?;