diff --git a/proto/stream_service.proto b/proto/stream_service.proto index e8c5d94a20ac3..5990fe1e2cbcf 100644 --- a/proto/stream_service.proto +++ b/proto/stream_service.proto @@ -44,16 +44,6 @@ message DropActorsResponse { common.Status status = 2; } -message ForceStopActorsRequest { - string request_id = 1; - uint64 prev_epoch = 2; -} - -message ForceStopActorsResponse { - string request_id = 1; - common.Status status = 2; -} - message InjectBarrierRequest { string request_id = 1; stream_plan.Barrier barrier = 2; @@ -61,16 +51,6 @@ message InjectBarrierRequest { repeated uint32 actor_ids_to_collect = 4; } -message InjectBarrierResponse { - string request_id = 1; - common.Status status = 2; -} - -message BarrierCompleteRequest { - string request_id = 1; - uint64 prev_epoch = 2; - map tracing_context = 3; -} message BarrierCompleteResponse { message CreateMviewProgress { uint32 backfill_actor_id = 1; @@ -104,15 +84,33 @@ message WaitEpochCommitResponse { common.Status status = 1; } +message StreamingControlStreamRequest { + message InitRequest { + uint64 prev_epoch = 2; + } + + oneof request { + InitRequest init = 1; + InjectBarrierRequest inject_barrier = 2; + } +} + +message StreamingControlStreamResponse { + message InitResponse {} + + oneof response { + InitResponse init = 1; + BarrierCompleteResponse complete_barrier = 2; + } +} + service StreamService { rpc UpdateActors(UpdateActorsRequest) returns (UpdateActorsResponse); rpc BuildActors(BuildActorsRequest) returns (BuildActorsResponse); rpc BroadcastActorInfoTable(BroadcastActorInfoTableRequest) returns (BroadcastActorInfoTableResponse); rpc DropActors(DropActorsRequest) returns (DropActorsResponse); - rpc ForceStopActors(ForceStopActorsRequest) returns (ForceStopActorsResponse); - rpc InjectBarrier(InjectBarrierRequest) returns (InjectBarrierResponse); - rpc BarrierComplete(BarrierCompleteRequest) returns (BarrierCompleteResponse); rpc WaitEpochCommit(WaitEpochCommitRequest) returns (WaitEpochCommitResponse); + rpc StreamingControlStream(stream StreamingControlStreamRequest) returns (stream StreamingControlStreamResponse); } // TODO: Lifecycle management for actors. diff --git a/src/compute/src/rpc/service/stream_service.rs b/src/compute/src/rpc/service/stream_service.rs index 6e96406743f29..18b77ff1804bc 100644 --- a/src/compute/src/rpc/service/stream_service.rs +++ b/src/compute/src/rpc/service/stream_service.rs @@ -13,18 +13,16 @@ // limitations under the License. 
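For readability, here is a minimal, self-contained Rust sketch of the request/response pairing that the new bidirectional `StreamingControlStream` RPC implies. The enum and function names below are illustrative stand-ins, not the prost-generated oneof types: the first message on the stream must be `Init` (answered by an `Init` response), and every later `InjectBarrier` request is eventually answered by a `CompleteBarrier` response once the barrier is collected on that worker.

```rust
// Illustrative model of the control-stream contract; the names are hypothetical
// stand-ins for the prost-generated oneof types defined in the proto above.
#[derive(Debug)]
enum ControlRequest {
    Init { prev_epoch: u64 },
    InjectBarrier { request_id: String },
}

#[derive(Debug)]
enum ControlResponse {
    Init,
    CompleteBarrier { request_id: String },
}

/// How a compute node is expected to answer each message on the stream.
fn respond(req: ControlRequest) -> ControlResponse {
    match req {
        // The handshake: the very first request on the stream must be `Init`.
        ControlRequest::Init { .. } => ControlResponse::Init,
        // Every injected barrier is acknowledged once it has been collected locally.
        ControlRequest::InjectBarrier { request_id } => {
            ControlResponse::CompleteBarrier { request_id }
        }
    }
}

fn main() {
    let first = respond(ControlRequest::Init { prev_epoch: 42 });
    assert!(matches!(first, ControlResponse::Init));
}
```

This single stream replaces the three unary RPCs (`ForceStopActors`, `InjectBarrier`, `BarrierComplete`) removed above.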
use await_tree::InstrumentAwait; -use itertools::Itertools; -use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map; -use risingwave_hummock_sdk::LocalSstableInfo; -use risingwave_pb::stream_service::barrier_complete_response::GroupedSstableInfo; +use futures::{Stream, StreamExt, TryStreamExt}; use risingwave_pb::stream_service::stream_service_server::StreamService; use risingwave_pb::stream_service::*; use risingwave_storage::dispatch_state_store; use risingwave_stream::error::StreamError; -use risingwave_stream::executor::Barrier; -use risingwave_stream::task::{BarrierCompleteResult, LocalStreamManager, StreamEnvironment}; +use risingwave_stream::task::{LocalStreamManager, StreamEnvironment}; use thiserror_ext::AsReport; -use tonic::{Request, Response, Status}; +use tokio::sync::mpsc::unbounded_channel; +use tokio_stream::wrappers::UnboundedReceiverStream; +use tonic::{Request, Response, Status, Streaming}; #[derive(Clone)] pub struct StreamServiceImpl { @@ -40,6 +38,9 @@ impl StreamServiceImpl { #[async_trait::async_trait] impl StreamService for StreamServiceImpl { + type StreamingControlStreamStream = + impl Stream>; + #[cfg_attr(coverage, coverage(off))] async fn update_actors( &self, @@ -110,86 +111,6 @@ impl StreamService for StreamServiceImpl { })) } - #[cfg_attr(coverage, coverage(off))] - async fn force_stop_actors( - &self, - request: Request, - ) -> std::result::Result, Status> { - let req = request.into_inner(); - self.mgr.reset(req.prev_epoch).await; - Ok(Response::new(ForceStopActorsResponse { - request_id: req.request_id, - status: None, - })) - } - - #[cfg_attr(coverage, coverage(off))] - async fn inject_barrier( - &self, - request: Request, - ) -> Result, Status> { - let req = request.into_inner(); - let barrier = - Barrier::from_protobuf(req.get_barrier().unwrap()).map_err(StreamError::from)?; - - self.mgr - .send_barrier(barrier, req.actor_ids_to_send, req.actor_ids_to_collect) - .await?; - - Ok(Response::new(InjectBarrierResponse { - request_id: req.request_id, - status: None, - })) - } - - #[cfg_attr(coverage, coverage(off))] - async fn barrier_complete( - &self, - request: Request, - ) -> Result, Status> { - let req = request.into_inner(); - let BarrierCompleteResult { - create_mview_progress, - sync_result, - } = self - .mgr - .collect_barrier(req.prev_epoch) - .instrument_await(format!("collect_barrier (epoch {})", req.prev_epoch)) - .await - .inspect_err( - |err| tracing::error!(error = %err.as_report(), "failed to collect barrier"), - )?; - - let (synced_sstables, table_watermarks) = sync_result - .map(|sync_result| (sync_result.uncommitted_ssts, sync_result.table_watermarks)) - .unwrap_or_default(); - - Ok(Response::new(BarrierCompleteResponse { - request_id: req.request_id, - status: None, - create_mview_progress, - synced_sstables: synced_sstables - .into_iter() - .map( - |LocalSstableInfo { - compaction_group_id, - sst_info, - table_stats, - }| GroupedSstableInfo { - compaction_group_id, - sst: Some(sst_info), - table_stats_map: to_prost_table_stats_map(table_stats), - }, - ) - .collect_vec(), - worker_id: self.env.worker_id(), - table_watermarks: table_watermarks - .into_iter() - .map(|(key, value)| (key.table_id, value.to_protobuf())) - .collect(), - })) - } - #[cfg_attr(coverage, coverage(off))] async fn wait_epoch_commit( &self, @@ -210,4 +131,24 @@ impl StreamService for StreamServiceImpl { Ok(Response::new(WaitEpochCommitResponse { status: None })) } + + async fn streaming_control_stream( + &self, + request: Request>, + ) -> Result, Status> { 
+ let mut stream = request.into_inner().boxed(); + let first_request = stream.try_next().await?; + let Some(StreamingControlStreamRequest { + request: Some(streaming_control_stream_request::Request::Init(init_request)), + }) = first_request + else { + return Err(Status::invalid_argument(format!( + "unexpected first request: {:?}", + first_request + ))); + }; + let (tx, rx) = unbounded_channel(); + self.mgr.handle_new_control_stream(tx, stream, init_request); + Ok(Response::new(UnboundedReceiverStream::new(rx))) + } } diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 006de45d522e7..22311a2b43911 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -175,6 +175,8 @@ pub enum Command { RescheduleFragment { reschedules: HashMap, table_parallelism: HashMap, + // should contain the actor ids in upstream and downstream fragment of `reschedules` + fragment_actors: HashMap>, }, /// `ReplaceTable` command generates a `Update` barrier with the given `merge_updates`. This is @@ -351,7 +353,7 @@ impl CommandContext { impl CommandContext { /// Generate a mutation for the given command. - pub async fn to_mutation(&self) -> MetaResult> { + pub fn to_mutation(&self) -> Option { let mutation = match &self.command { Command::Plain(mutation) => mutation.clone(), @@ -479,21 +481,23 @@ impl CommandContext { init_split_assignment, ), - Command::RescheduleFragment { reschedules, .. } => { - let metadata_manager = &self.barrier_manager_context.metadata_manager; - + Command::RescheduleFragment { + reschedules, + fragment_actors, + .. + } => { let mut dispatcher_update = HashMap::new(); for reschedule in reschedules.values() { for &(upstream_fragment_id, dispatcher_id) in &reschedule.upstream_fragment_dispatcher_ids { // Find the actors of the upstream fragment. - let upstream_actor_ids = metadata_manager - .get_running_actors_of_fragment(upstream_fragment_id) - .await?; + let upstream_actor_ids = fragment_actors + .get(&upstream_fragment_id) + .expect("should contain"); // Record updates for all actors. - for actor_id in upstream_actor_ids { + for &actor_id in upstream_actor_ids { // Index with the dispatcher id to check duplicates. dispatcher_update .try_insert( @@ -526,9 +530,9 @@ impl CommandContext { for (&fragment_id, reschedule) in reschedules { for &downstream_fragment_id in &reschedule.downstream_fragment_ids { // Find the actors of the downstream fragment. - let downstream_actor_ids = metadata_manager - .get_running_actors_of_fragment(downstream_fragment_id) - .await?; + let downstream_actor_ids = fragment_actors + .get(&downstream_fragment_id) + .expect("should contain"); // Downstream removed actors should be skipped // Newly created actors of the current fragment will not dispatch Update @@ -545,7 +549,7 @@ impl CommandContext { .unwrap_or_default(); // Record updates for all actors. - for actor_id in downstream_actor_ids { + for &actor_id in downstream_actor_ids { if downstream_removed_actors.contains(&actor_id) { continue; } @@ -620,7 +624,7 @@ impl CommandContext { } }; - Ok(mutation) + mutation } fn generate_update_mutation_for_replace_table( @@ -962,6 +966,7 @@ impl CommandContext { Command::RescheduleFragment { reschedules, table_parallelism, + .. 
} => { let removed_actors = reschedules .values() diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 4d867d266270b..652a4b51d9264 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -25,11 +25,11 @@ use arc_swap::ArcSwap; use fail::fail_point; use itertools::Itertools; use prometheus::HistogramTimer; +use risingwave_common::bail; use risingwave_common::catalog::TableId; use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_common::system_param::PAUSE_ON_NEXT_BOOTSTRAP_KEY; use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH}; -use risingwave_common::{bail, must_match}; use risingwave_hummock_sdk::table_watermark::{ merge_multiple_new_table_watermarks, TableWatermarks, }; @@ -41,7 +41,9 @@ use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::PausedReason; use risingwave_pb::stream_plan::barrier::BarrierKind; use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress; -use risingwave_pb::stream_service::BarrierCompleteResponse; +use risingwave_pb::stream_service::{ + streaming_control_stream_response, BarrierCompleteResponse, StreamingControlStreamResponse, +}; use thiserror_ext::AsReport; use tokio::sync::oneshot::{Receiver, Sender}; use tokio::sync::Mutex; @@ -54,12 +56,13 @@ use self::progress::TrackingCommand; use crate::barrier::info::InflightActorInfo; use crate::barrier::notifier::BarrierInfo; use crate::barrier::progress::CreateMviewProgressTracker; -use crate::barrier::rpc::BarrierRpcManager; +use crate::barrier::rpc::ControlStreamManager; use crate::barrier::state::BarrierManagerState; use crate::hummock::{CommitEpochInfo, HummockManagerRef}; use crate::manager::sink_coordination::SinkCoordinatorManager; use crate::manager::{ - ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv, MetadataManager, WorkerId, + ActiveStreamingWorkerChange, ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv, + MetadataManager, WorkerId, }; use crate::model::{ActorId, TableFragments}; use crate::rpc::metrics::MetaMetrics; @@ -188,9 +191,9 @@ pub struct GlobalBarrierManager { checkpoint_control: CheckpointControl, - rpc_manager: BarrierRpcManager, - active_streaming_nodes: ActiveStreamingWorkerNodes, + + control_stream_manager: ControlStreamManager, } /// Controls the concurrent execution of commands. @@ -228,7 +231,7 @@ impl CheckpointControl { self.context.metrics.in_flight_barrier_nums.set( self.command_ctx_queue .values() - .filter(|x| matches!(x.state, BarrierEpochState::InFlight)) + .filter(|x| x.state.is_inflight()) .count() as i64, ); self.context @@ -238,7 +241,12 @@ impl CheckpointControl { } /// Enqueue a barrier command, and init its state to `InFlight`. - fn enqueue_command(&mut self, command_ctx: Arc, notifiers: Vec) { + fn enqueue_command( + &mut self, + command_ctx: Arc, + notifiers: Vec, + node_to_collect: HashSet, + ) { let timer = self.context.metrics.barrier_latency.start_timer(); if let Some((_, node)) = self.command_ctx_queue.last_key_value() { @@ -251,7 +259,10 @@ impl CheckpointControl { command_ctx.prev_epoch.value().0, EpochNode { enqueue_time: timer, - state: BarrierEpochState::InFlight, + state: BarrierEpochState { + node_to_collect, + resps: vec![], + }, command_ctx, notifiers, }, @@ -260,14 +271,19 @@ impl CheckpointControl { /// Change the state of this `prev_epoch` to `Completed`. Return continuous nodes /// with `Completed` starting from first node [`Completed`..`InFlight`) and remove them. 
- fn barrier_collected(&mut self, prev_epoch: u64, result: Vec) { + fn barrier_collected( + &mut self, + worker_id: WorkerId, + prev_epoch: u64, + resp: BarrierCompleteResponse, + ) { if let Some(node) = self.command_ctx_queue.get_mut(&prev_epoch) { - assert!(matches!(node.state, BarrierEpochState::InFlight)); - node.state = BarrierEpochState::Collected(result); + assert!(node.state.node_to_collect.remove(&worker_id)); + node.state.resps.push(resp); } else { panic!( - "received barrier complete response for an unknown epoch: {}", - prev_epoch + "collect barrier on non-existing barrier: {}, {}", + prev_epoch, worker_id ); } } @@ -277,7 +293,7 @@ impl CheckpointControl { let in_flight_not_full = self .command_ctx_queue .values() - .filter(|x| matches!(x.state, BarrierEpochState::InFlight)) + .filter(|x| x.state.is_inflight()) .count() < in_flight_barrier_nums; @@ -340,13 +356,8 @@ impl CheckpointControl { }; if !is_err { // continue to finish the pending collected barrier. - while let Some(( - _, - EpochNode { - state: BarrierEpochState::Collected(_), - .. - }, - )) = self.command_ctx_queue.first_key_value() + while let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value() + && !state.is_inflight() { let (_, node) = self.command_ctx_queue.pop_first().expect("non-empty"); let command_ctx = node.command_ctx.clone(); @@ -390,12 +401,16 @@ pub struct EpochNode { } /// The state of barrier. -enum BarrierEpochState { - /// This barrier is current in-flight on the stream graph of compute nodes. - InFlight, +struct BarrierEpochState { + node_to_collect: HashSet, - /// This barrier is collected. - Collected(Vec), + resps: Vec, +} + +impl BarrierEpochState { + fn is_inflight(&self) -> bool { + !self.node_to_collect.is_empty() + } } enum CompletingCommand { @@ -411,13 +426,6 @@ enum CompletingCommand { Err(MetaError), } -/// The result of barrier collect. -#[derive(Debug)] -struct BarrierCollectResult { - prev_epoch: u64, - result: MetaResult>, -} - impl GlobalBarrierManager { /// Create a new [`crate::barrier::GlobalBarrierManager`]. #[allow(clippy::too_many_arguments)] @@ -458,10 +466,9 @@ impl GlobalBarrierManager { env: env.clone(), }; + let control_stream_manager = ControlStreamManager::new(context.clone()); let checkpoint_control = CheckpointControl::new(context.clone()); - let rpc_manager = BarrierRpcManager::new(context.clone()); - Self { enable_recovery, scheduled_barriers, @@ -470,8 +477,8 @@ impl GlobalBarrierManager { env, state: initial_invalid_state, checkpoint_control, - rpc_manager, active_streaming_nodes, + control_stream_manager, } } @@ -489,7 +496,7 @@ impl GlobalBarrierManager { } /// Check whether we should pause on bootstrap from the system parameter and reset it. - async fn take_pause_on_bootstrap(&self) -> MetaResult { + async fn take_pause_on_bootstrap(&mut self) -> MetaResult { let paused = self .env .system_params_reader() @@ -640,6 +647,9 @@ impl GlobalBarrierManager { self.state .resolve_worker_nodes(self.active_streaming_nodes.current().values().cloned()); + if let ActiveStreamingWorkerChange::Add(node) | ActiveStreamingWorkerChange::Update(node) = changed_worker { + self.control_stream_manager.add_worker(node).await; + } } // Checkpoint frequency changes. @@ -652,14 +662,19 @@ impl GlobalBarrierManager { .set_checkpoint_frequency(p.checkpoint_frequency() as usize) } } - // Barrier completes. 
- collect_result = self.rpc_manager.next_collected_barrier() => { - match collect_result.result { - Ok(resps) => { - self.checkpoint_control.barrier_collected(collect_result.prev_epoch, resps); - }, + resp_result = self.control_stream_manager.next_response() => { + match resp_result { + Ok((worker_id, prev_epoch, resp)) => { + let resp: StreamingControlStreamResponse = resp; + match resp.response { + Some(streaming_control_stream_response::Response::CompleteBarrier(resp)) => { + self.checkpoint_control.barrier_collected(worker_id, prev_epoch, resp); + }, + resp => unreachable!("invalid response: {:?}", resp), + } + + } Err(e) => { - fail_point!("inject_barrier_err_success"); self.failure_recovery(e).await; } } @@ -683,7 +698,9 @@ impl GlobalBarrierManager { if self .checkpoint_control .can_inject_barrier(self.in_flight_barrier_nums) => { - self.handle_new_barrier(scheduled); + if let Err(e) = self.handle_new_barrier(scheduled) { + self.failure_recovery(e).await; + } } } self.checkpoint_control.update_barrier_nums_metrics(); @@ -691,7 +708,7 @@ impl GlobalBarrierManager { } /// Handle the new barrier from the scheduled queue and inject it. - fn handle_new_barrier(&mut self, scheduled: Scheduled) { + fn handle_new_barrier(&mut self, scheduled: Scheduled) -> MetaResult<()> { let Scheduled { command, mut notifiers, @@ -728,7 +745,12 @@ impl GlobalBarrierManager { send_latency_timer.observe_duration(); - self.rpc_manager.inject_barrier(command_ctx.clone()); + let node_to_collect = self + .control_stream_manager + .inject_barrier(command_ctx.clone()) + .inspect_err(|_| { + fail_point!("inject_barrier_err_success"); + })?; // Notify about the injection. let prev_paused_reason = self.state.paused_reason(); @@ -746,12 +768,12 @@ impl GlobalBarrierManager { self.state.set_paused_reason(curr_paused_reason); // Record the in-flight barrier. self.checkpoint_control - .enqueue_command(command_ctx.clone(), notifiers); + .enqueue_command(command_ctx.clone(), notifiers, node_to_collect); + Ok(()) } async fn failure_recovery(&mut self, err: MetaError) { self.context.tracker.lock().await.abort_all(&err); - self.rpc_manager.clear(); self.checkpoint_control.clear_on_err(&err).await; if self.enable_recovery { @@ -787,7 +809,8 @@ impl GlobalBarrierManagerContext { state, .. } = node; - let resps = must_match!(state, BarrierEpochState::Collected(resps) => resps); + assert!(state.node_to_collect.is_empty()); + let resps = state.resps; let wait_commit_timer = self.metrics.barrier_wait_commit_latency.start_timer(); let (commit_info, create_mview_progress) = collect_commit_epoch_info(resps); if let Err(e) = self.update_snapshot(&command_ctx, commit_info).await { @@ -954,13 +977,8 @@ impl CheckpointControl { if matches!(&self.completing_command, CompletingCommand::None) { // If there is no completing barrier, try to start completing the earliest barrier if // it has been collected. - if let Some(( - _, - EpochNode { - state: BarrierEpochState::Collected(_), - .. - }, - )) = self.command_ctx_queue.first_key_value() + if let Some((_, EpochNode { state, .. 
})) = self.command_ctx_queue.first_key_value() + && !state.is_inflight() { let (_, node) = self.command_ctx_queue.pop_first().expect("non-empty"); let command_ctx = node.command_ctx.clone(); diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index cd71f9eea707e..a7ea3ae51665a 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::assert_matches::assert_matches; use std::collections::{BTreeSet, HashMap, HashSet}; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -28,6 +29,9 @@ use risingwave_pb::meta::PausedReason; use risingwave_pb::stream_plan::barrier::BarrierKind; use risingwave_pb::stream_plan::barrier_mutation::Mutation; use risingwave_pb::stream_plan::AddMutation; +use risingwave_pb::stream_service::{ + streaming_control_stream_response, StreamingControlStreamResponse, +}; use thiserror_ext::AsReport; use tokio::sync::oneshot; use tokio_retry::strategy::{jitter, ExponentialBackoff}; @@ -38,6 +42,8 @@ use crate::barrier::command::CommandContext; use crate::barrier::info::InflightActorInfo; use crate::barrier::notifier::Notifier; use crate::barrier::progress::CreateMviewProgressTracker; +use crate::barrier::rpc::ControlStreamManager; +use crate::barrier::schedule::ScheduledBarriers; use crate::barrier::state::BarrierManagerState; use crate::barrier::{Command, GlobalBarrierManager, GlobalBarrierManagerContext}; use crate::controller::catalog::ReleaseContext; @@ -302,15 +308,16 @@ impl GlobalBarrierManagerContext { Ok(()) } -} -impl GlobalBarrierManager { /// Pre buffered drop and cancel command, return true if any. - async fn pre_apply_drop_cancel(&self) -> MetaResult { - let (dropped_actors, cancelled) = self.scheduled_barriers.pre_apply_drop_cancel_scheduled(); + async fn pre_apply_drop_cancel( + &self, + scheduled_barriers: &ScheduledBarriers, + ) -> MetaResult { + let (dropped_actors, cancelled) = scheduled_barriers.pre_apply_drop_cancel_scheduled(); let applied = !dropped_actors.is_empty() || !cancelled.is_empty(); if !cancelled.is_empty() { - match &self.context.metadata_manager { + match &self.metadata_manager { MetadataManager::V1(mgr) => { let unregister_table_ids = mgr .fragment_manager @@ -334,7 +341,9 @@ impl GlobalBarrierManager { } Ok(applied) } +} +impl GlobalBarrierManager { /// Recovery the whole cluster from the latest epoch. /// /// If `paused_reason` is `Some`, all data sources (including connectors and DMLs) will be @@ -375,11 +384,14 @@ impl GlobalBarrierManager { // get recovered. let recovery_timer = self.context.metrics.recovery_latency.start_timer(); - let (state, active_streaming_nodes) = tokio_retry::Retry::spawn(retry_strategy, || { + let new_state = tokio_retry::Retry::spawn(retry_strategy, || { async { let recovery_result: MetaResult<_> = try { // This is a quick path to accelerate the process of dropping and canceling streaming jobs. - let _ = self.pre_apply_drop_cancel().await?; + let _ = self + .context + .pre_apply_drop_cancel(&self.scheduled_barriers) + .await?; let active_streaming_nodes = ActiveStreamingWorkerNodes::new_snapshot( self.context.metadata_manager.clone(), @@ -427,14 +439,21 @@ impl GlobalBarrierManager { })? }; - // Reset all compute nodes, stop and drop existing actors. 
- self.reset_compute_nodes(&info, prev_epoch.value().0) + let mut control_stream_manager = + ControlStreamManager::new(self.context.clone()); + + control_stream_manager + .reset(prev_epoch.value().0, active_streaming_nodes.current()) .await .inspect_err(|err| { warn!(error = %err.as_report(), "reset compute nodes failed"); })?; - if self.pre_apply_drop_cancel().await? { + if self + .context + .pre_apply_drop_cancel(&self.scheduled_barriers) + .await? + { info = self .context .resolve_actor_info(all_nodes.clone()) @@ -445,10 +464,10 @@ impl GlobalBarrierManager { } // update and build all actors. - self.update_actors(&info).await.inspect_err(|err| { + self.context.update_actors(&info).await.inspect_err(|err| { warn!(error = %err.as_report(), "update actors failed"); })?; - self.build_actors(&info).await.inspect_err(|err| { + self.context.build_actors(&info).await.inspect_err(|err| { warn!(error = %err.as_report(), "build_actors failed"); })?; @@ -478,30 +497,25 @@ impl GlobalBarrierManager { tracing::Span::current(), // recovery span )); - let res = match self - .context - .inject_barrier(command_ctx.clone(), None, None) - .await - .result - { - Ok(response) => { - if let Err(err) = command_ctx.post_collect().await { - warn!(error = %err.as_report(), "post_collect failed"); - Err(err) - } else { - Ok((new_epoch.clone(), response)) + let mut node_to_collect = + control_stream_manager.inject_barrier(command_ctx.clone())?; + while !node_to_collect.is_empty() { + let (worker_id, _, resp) = control_stream_manager.next_response().await?; + assert_matches!( + resp, + StreamingControlStreamResponse { + response: Some( + streaming_control_stream_response::Response::CompleteBarrier(_) + ) } - } - Err(err) => { - warn!(error = %err.as_report(), "inject_barrier failed"); - Err(err) - } - }; - let (new_epoch, _) = res?; + ); + assert!(node_to_collect.remove(&worker_id)); + } ( BarrierManagerState::new(new_epoch, info, command_ctx.next_paused_reason()), active_streaming_nodes, + control_stream_manager, ) }; if recovery_result.is_err() { @@ -517,14 +531,17 @@ impl GlobalBarrierManager { recovery_timer.observe_duration(); self.scheduled_barriers.mark_ready(); + ( + self.state, + self.active_streaming_nodes, + self.control_stream_manager, + ) = new_state; + tracing::info!( - epoch = state.in_flight_prev_epoch().value().0, - paused = ?state.paused_reason(), + epoch = self.state.in_flight_prev_epoch().value().0, + paused = ?self.state.paused_reason(), "recovery success" ); - - self.state = state; - self.active_streaming_nodes = active_streaming_nodes; } } @@ -1013,9 +1030,7 @@ impl GlobalBarrierManagerContext { new_plan.insert(self.env.meta_store_checked()).await?; Ok(new_plan) } -} -impl GlobalBarrierManager { /// Update all actors in compute nodes. async fn update_actors(&self, info: &InflightActorInfo) -> MetaResult<()> { if info.actor_map.is_empty() { @@ -1041,7 +1056,7 @@ impl GlobalBarrierManager { .flatten_ok() .try_collect()?; - let mut all_node_actors = self.context.metadata_manager.all_node_actors(false).await?; + let mut all_node_actors = self.metadata_manager.all_node_actors(false).await?; // Check if any actors were dropped after info resolved. 
if all_node_actors.iter().any(|(node_id, node_actors)| { @@ -1055,8 +1070,7 @@ impl GlobalBarrierManager { return Err(anyhow!("actors dropped during update").into()); } - self.context - .stream_rpc_manager + self.stream_rpc_manager .broadcast_update_actor_info( &info.node_map, info.actor_map.keys().cloned(), @@ -1080,8 +1094,7 @@ impl GlobalBarrierManager { return Ok(()); } - self.context - .stream_rpc_manager + self.stream_rpc_manager .build_actors( &info.node_map, info.actor_map.iter().map(|(node_id, actors)| { @@ -1093,23 +1106,6 @@ impl GlobalBarrierManager { Ok(()) } - - /// Reset all compute nodes by calling `force_stop_actors`. - async fn reset_compute_nodes( - &self, - info: &InflightActorInfo, - prev_epoch: u64, - ) -> MetaResult<()> { - debug!(prev_epoch, worker = ?info.node_map.keys().collect_vec(), "force stop actors"); - self.context - .stream_rpc_manager - .force_stop_actors(info.node_map.values(), prev_epoch) - .await?; - - debug!(prev_epoch, "all compute nodes have been reset."); - - Ok(()) - } } #[cfg(test)] diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index dfe9ada44a47e..f8c46ea795b70 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -12,14 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet, VecDeque}; use std::future::Future; use std::sync::Arc; use std::time::Duration; use anyhow::anyhow; use fail::fail_point; -use futures::stream::FuturesUnordered; +use futures::future::try_join_all; +use futures::stream::{BoxStream, FuturesUnordered}; use futures::{pin_mut, FutureExt, StreamExt}; use itertools::Itertools; use risingwave_common::bail; @@ -28,141 +29,210 @@ use risingwave_common::util::tracing::TracingContext; use risingwave_pb::common::{ActorInfo, WorkerNode}; use risingwave_pb::stream_plan::{Barrier, BarrierMutation, StreamActor}; use risingwave_pb::stream_service::{ - BarrierCompleteRequest, BroadcastActorInfoTableRequest, BuildActorsRequest, DropActorsRequest, - ForceStopActorsRequest, InjectBarrierRequest, UpdateActorsRequest, + streaming_control_stream_request, streaming_control_stream_response, + BroadcastActorInfoTableRequest, BuildActorsRequest, DropActorsRequest, InjectBarrierRequest, + StreamingControlStreamRequest, StreamingControlStreamResponse, UpdateActorsRequest, }; use risingwave_rpc_client::error::RpcError; use risingwave_rpc_client::StreamClient; use rw_futures_util::pending_on_none; -use tokio::sync::oneshot; -use tokio::time::timeout; -use tracing::Instrument; +use thiserror_ext::AsReport; +use tokio::sync::mpsc::UnboundedSender; +use tokio::time::{sleep, timeout}; +use tokio_retry::strategy::ExponentialBackoff; +use tracing::{error, info, warn}; use uuid::Uuid; use super::command::CommandContext; -use super::{BarrierCollectResult, GlobalBarrierManagerContext}; +use super::GlobalBarrierManagerContext; use crate::manager::{MetaSrvEnv, WorkerId}; use crate::{MetaError, MetaResult}; -pub(super) struct BarrierRpcManager { - context: GlobalBarrierManagerContext, +struct ControlStreamNode { + worker: WorkerNode, + sender: UnboundedSender, + // earlier epoch at the front + inflight_barriers: VecDeque>, +} + +fn into_future( + worker_id: WorkerId, + stream: BoxStream< + 'static, + risingwave_rpc_client::error::Result, + >, +) -> ResponseStreamFuture { + stream.into_future().map(move |(opt, stream)| { + ( + worker_id, + stream, + opt.ok_or_else(|| anyhow!("end of 
stream").into()) + .and_then(|result| result.map_err(|e| e.into())), + ) + }) +} - /// Futures that await on the completion of barrier. - injected_in_progress_barrier: FuturesUnordered, +type ResponseStreamFuture = impl Future< + Output = ( + WorkerId, + BoxStream< + 'static, + risingwave_rpc_client::error::Result, + >, + MetaResult, + ), + > + 'static; - prev_injecting_barrier: Option>, +pub(super) struct ControlStreamManager { + context: GlobalBarrierManagerContext, + nodes: HashMap, + response_streams: FuturesUnordered, } -impl BarrierRpcManager { +impl ControlStreamManager { pub(super) fn new(context: GlobalBarrierManagerContext) -> Self { Self { context, - injected_in_progress_barrier: FuturesUnordered::new(), - prev_injecting_barrier: None, + nodes: Default::default(), + response_streams: FuturesUnordered::new(), } } - pub(super) fn clear(&mut self) { - self.injected_in_progress_barrier = FuturesUnordered::new(); - self.prev_injecting_barrier = None; + pub(super) async fn add_worker(&mut self, node: WorkerNode) { + if self.nodes.contains_key(&node.id) { + warn!(id = node.id, host = ?node.host, "node already exists"); + return; + } + let prev_epoch = self + .context + .hummock_manager + .latest_snapshot() + .committed_epoch; + let node_id = node.id; + let node_host = node.host.clone().unwrap(); + let mut backoff = ExponentialBackoff::from_millis(100) + .max_delay(Duration::from_secs(3)) + .factor(5); + const MAX_RETRY: usize = 5; + for i in 1..=MAX_RETRY { + match self + .context + .new_control_stream_node(node.clone(), prev_epoch) + .await + { + Ok((stream_node, response_stream)) => { + let _ = self.nodes.insert(node_id, stream_node); + self.response_streams + .push(into_future(node_id, response_stream)); + info!(?node_host, "add control stream worker"); + return; + } + Err(e) => { + // It may happen that the dns information of newly registered worker node + // has not been propagated to the meta node and cause error. Wait for a while and retry + let delay = backoff.next().unwrap(); + error!(attempt = i, backoff_delay = ?delay, err = %e.as_report(), ?node_host, "fail to resolve worker node address"); + sleep(delay).await; + } + } + } + error!(?node_host, "fail to create worker node after retry"); } - pub(super) fn inject_barrier(&mut self, command_context: Arc) { - // this is to notify that the barrier has been injected so that the next - // barrier can be injected to avoid out of order barrier injection. - // TODO: can be removed when bidi-stream control in implemented. 
- let (inject_tx, inject_rx) = oneshot::channel(); - let prev_inject_rx = self.prev_injecting_barrier.replace(inject_rx); - let await_complete_future = - self.context - .inject_barrier(command_context, Some(inject_tx), prev_inject_rx); - self.injected_in_progress_barrier - .push(await_complete_future); - } + pub(super) async fn reset( + &mut self, + prev_epoch: u64, + nodes: &HashMap, + ) -> MetaResult<()> { + let nodes = try_join_all(nodes.iter().map(|(worker_id, node)| async { + let node = self + .context + .new_control_stream_node(node.clone(), prev_epoch) + .await?; + Result::<_, MetaError>::Ok((*worker_id, node)) + })) + .await?; + self.nodes.clear(); + self.response_streams.clear(); + for (worker_id, (node, response_stream)) in nodes { + self.nodes.insert(worker_id, node); + self.response_streams + .push(into_future(worker_id, response_stream)); + } - pub(super) async fn next_collected_barrier(&mut self) -> BarrierCollectResult { - pending_on_none(self.injected_in_progress_barrier.next()).await + Ok(()) } -} - -pub(super) type BarrierCollectFuture = impl Future + Send + 'static; -impl GlobalBarrierManagerContext { - /// Inject a barrier to all CNs and spawn a task to collect it - pub(super) fn inject_barrier( - &self, - command_context: Arc, - inject_tx: Option>, - prev_inject_rx: Option>, - ) -> BarrierCollectFuture { - let (tx, rx) = oneshot::channel(); - let prev_epoch = command_context.prev_epoch.value().0; - let stream_rpc_manager = self.stream_rpc_manager.clone(); - // todo: the collect handler should be abort when recovery. - let _join_handle = tokio::spawn(async move { - let span = command_context.span.clone(); - if let Some(prev_inject_rx) = prev_inject_rx { - if prev_inject_rx.await.is_err() { - let _ = tx.send(BarrierCollectResult { - prev_epoch, - result: Err(anyhow!("prev barrier failed to be injected").into()), - }); - return; - } - } - let result = stream_rpc_manager - .inject_barrier(command_context.clone()) - .instrument(span.clone()) - .await; + pub(super) async fn next_response( + &mut self, + ) -> MetaResult<(WorkerId, u64, StreamingControlStreamResponse)> { + loop { + let (worker_id, response_stream, result) = + pending_on_none(self.response_streams.next()).await; match result { - Ok(node_need_collect) => { - if let Some(inject_tx) = inject_tx { - let _ = inject_tx.send(()); + Ok(resp) => match &resp.response { + Some(streaming_control_stream_response::Response::CompleteBarrier(_)) => { + self.response_streams + .push(into_future(worker_id, response_stream)); + let node = self + .nodes + .get_mut(&worker_id) + .expect("should exist when get collect resp"); + let command = node + .inflight_barriers + .pop_front() + .expect("should exist when get collect resp"); + break Ok((worker_id, command.prev_epoch.value().0, resp)); + } + resp => { + break Err(anyhow!("get unexpected resp: {:?}", resp).into()); + } + }, + Err(err) => { + let mut node = self + .nodes + .remove(&worker_id) + .expect("should exist when get collect resp"); + warn!(node = ?node.worker, err = ?err.as_report(), "get error from response stream"); + if let Some(command) = node.inflight_barriers.pop_front() { + self.context.report_collect_failure(&command, &err); + break Err(err); + } else { + // for node with no inflight barrier, simply ignore the error + continue; } - stream_rpc_manager - .collect_barrier(node_need_collect, command_context, tx) - .instrument(span.clone()) - .await; - } - Err(e) => { - let _ = tx.send(BarrierCollectResult { - prev_epoch, - result: Err(e), - }); } } - }); - rx.map(move 
|result| match result { - Ok(completion) => completion, - Err(_e) => BarrierCollectResult { - prev_epoch, - result: Err(anyhow!("failed to receive barrier completion result").into()), - }, - }) + } } } -impl StreamRpcManager { +impl ControlStreamManager { /// Send inject-barrier-rpc to stream service and wait for its response before returns. - async fn inject_barrier( - &self, + pub(super) fn inject_barrier( + &mut self, command_context: Arc, - ) -> MetaResult> { + ) -> MetaResult> { fail_point!("inject_barrier_err", |_| bail!("inject_barrier_err")); - let mutation = command_context.to_mutation().await?; + let mutation = command_context.to_mutation(); let info = command_context.info.clone(); - let mut node_need_collect = HashMap::new(); - self.make_request( - info.node_map.iter().filter_map(|(node_id, node)| { + let mut node_need_collect = HashSet::new(); + + info.node_map + .iter() + .map(|(node_id, worker_node)| { let actor_ids_to_send = info.actor_ids_to_send(node_id).collect_vec(); let actor_ids_to_collect = info.actor_ids_to_collect(node_id).collect_vec(); if actor_ids_to_collect.is_empty() { // No need to send or collect barrier for this node. assert!(actor_ids_to_send.is_empty()); - node_need_collect.insert(*node_id, false); - None + Ok(()) } else { - node_need_collect.insert(*node_id, true); + let Some(node) = self.nodes.get_mut(node_id) else { + return Err( + anyhow!("unconnected worker node: {:?}", worker_node.host).into() + ); + }; let mutation = mutation.clone(); let barrier = Barrier { epoch: Some(risingwave_pb::data::Epoch { @@ -177,104 +247,89 @@ impl StreamRpcManager { kind: command_context.kind as i32, passed_actors: vec![], }; - Some(( - node, - InjectBarrierRequest { - request_id: Self::new_request_id(), - barrier: Some(barrier), - actor_ids_to_send, - actor_ids_to_collect, - }, - )) - } - }), - |client, request| { - async move { - tracing::debug!( - target: "events::meta::barrier::inject_barrier", - ?request, "inject barrier request" - ); - - // This RPC returns only if this worker node has injected this barrier. - client.inject_barrier(request).await - } - }, - ) - .await - .inspect_err(|e| { - // Record failure in event log. 
- use risingwave_pb::meta::event_log; - use thiserror_ext::AsReport; - let event = event_log::EventInjectBarrierFail { - prev_epoch: command_context.prev_epoch.value().0, - cur_epoch: command_context.curr_epoch.value().0, - error: e.to_report_string(), - }; - self.env - .event_log_manager_ref() - .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]); - })?; - Ok(node_need_collect) - } - /// Send barrier-complete-rpc and wait for responses from all CNs - async fn collect_barrier( - &self, - node_need_collect: HashMap, - command_context: Arc, - barrier_collect_tx: oneshot::Sender, - ) { - let prev_epoch = command_context.prev_epoch.value().0; - let tracing_context = - TracingContext::from_span(command_context.prev_epoch.span()).to_protobuf(); + node.sender + .send(StreamingControlStreamRequest { + request: Some( + streaming_control_stream_request::Request::InjectBarrier( + InjectBarrierRequest { + request_id: StreamRpcManager::new_request_id(), + barrier: Some(barrier), + actor_ids_to_send, + actor_ids_to_collect, + }, + ), + ), + }) + .map_err(|_| { + MetaError::from(anyhow!( + "failed to send request to {} {:?}", + node.worker.id, + node.worker.host + )) + })?; - let info = command_context.info.clone(); - let result = self - .broadcast( - info.node_map.iter().filter_map(|(node_id, node)| { - if !*node_need_collect.get(node_id).unwrap() { - // No need to send or collect barrier for this node. - None - } else { - Some(node) - } - }), - |client| { - let tracing_context = tracing_context.clone(); - async move { - let request = BarrierCompleteRequest { - request_id: Self::new_request_id(), - prev_epoch, - tracing_context, - }; - tracing::debug!( - target: "events::meta::barrier::barrier_complete", - ?request, "barrier complete" - ); - - // This RPC returns only if this worker node has collected this barrier. - client.barrier_complete(request).await - } - }, - ) - .await + node.inflight_barriers.push_back(command_context.clone()); + node_need_collect.insert(*node_id); + Result::<_, MetaError>::Ok(()) + } + }) + .try_collect() .inspect_err(|e| { // Record failure in event log. use risingwave_pb::meta::event_log; - use thiserror_ext::AsReport; - let event = event_log::EventCollectBarrierFail { + let event = event_log::EventInjectBarrierFail { prev_epoch: command_context.prev_epoch.value().0, cur_epoch: command_context.curr_epoch.value().0, error: e.to_report_string(), }; - self.env + self.context + .env .event_log_manager_ref() - .add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]); - }) - .map_err(Into::into); - let _ = barrier_collect_tx - .send(BarrierCollectResult { prev_epoch, result }) - .inspect_err(|_| tracing::warn!(prev_epoch, "failed to notify barrier completion")); + .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]); + })?; + Ok(node_need_collect) + } +} + +impl GlobalBarrierManagerContext { + async fn new_control_stream_node( + &self, + node: WorkerNode, + prev_epoch: u64, + ) -> MetaResult<( + ControlStreamNode, + BoxStream<'static, risingwave_rpc_client::error::Result>, + )> { + let handle = self + .env + .stream_client_pool() + .get(&node) + .await? 
+ .start_streaming_control(prev_epoch) + .await?; + Ok(( + ControlStreamNode { + worker: node.clone(), + sender: handle.request_sender, + inflight_barriers: VecDeque::new(), + }, + handle.response_stream, + )) + } + + /// Send barrier-complete-rpc and wait for responses from all CNs + fn report_collect_failure(&self, command_context: &CommandContext, error: &MetaError) { + // Record failure in event log. + use risingwave_pb::meta::event_log; + let event = event_log::EventCollectBarrierFail { + prev_epoch: command_context.prev_epoch.value().0, + cur_epoch: command_context.curr_epoch.value().0, + error: error.to_report_string(), + }; + self.env + .event_log_manager_ref() + .add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]); } } @@ -303,15 +358,6 @@ impl StreamRpcManager { result.map_err(|results_err| merge_node_rpc_errors("merged RPC Error", results_err)) } - async fn broadcast> + 'static>( - &self, - nodes: impl Iterator, - f: impl Fn(StreamClient) -> Fut, - ) -> MetaResult> { - self.make_request(nodes.map(|node| (node, ())), |client, ()| f(client)) - .await - } - fn new_request_id() -> String { Uuid::new_v4().to_string() } @@ -403,23 +449,6 @@ impl StreamRpcManager { .await?; Ok(()) } - - pub async fn force_stop_actors( - &self, - nodes: impl Iterator, - prev_epoch: u64, - ) -> MetaResult<()> { - self.broadcast(nodes, |client| async move { - client - .force_stop_actors(ForceStopActorsRequest { - request_id: Self::new_request_id(), - prev_epoch, - }) - .await - }) - .await?; - Ok(()) - } } /// This function is similar to `try_join_all`, but it attempts to collect as many error as possible within `error_timeout`. @@ -466,8 +495,6 @@ fn merge_node_rpc_errors( ) -> MetaError { use std::fmt::Write; - use thiserror_ext::AsReport; - let concat: String = errors .into_iter() .fold(format!("{message}:"), |mut s, (w, e)| { diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 41ed041879f0c..99ae32d26bb92 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -21,7 +21,7 @@ use std::sync::Arc; use std::time::Duration; use anyhow::{anyhow, Context}; -use futures::future::BoxFuture; +use futures::future::{try_join_all, BoxFuture}; use itertools::Itertools; use num_integer::Integer; use num_traits::abs; @@ -2651,9 +2651,33 @@ impl GlobalStreamManager { tracing::debug!("reschedule plan: {:?}", reschedule_fragment); + let up_down_stream_fragment: HashSet<_> = reschedule_fragment + .iter() + .flat_map(|(_, reschedule)| { + reschedule + .upstream_fragment_dispatcher_ids + .iter() + .map(|(fragment_id, _)| *fragment_id) + .chain(reschedule.downstream_fragment_ids.iter().cloned()) + }) + .collect(); + + let fragment_actors = + try_join_all(up_down_stream_fragment.iter().map(|fragment_id| async { + let actor_ids = self + .metadata_manager + .get_running_actors_of_fragment(*fragment_id) + .await?; + Result::<_, MetaError>::Ok((*fragment_id, actor_ids)) + })) + .await? 
+ .into_iter() + .collect(); + let command = Command::RescheduleFragment { reschedules: reschedule_fragment, table_parallelism: table_parallelism.unwrap_or_default(), + fragment_actors, }; match &self.metadata_manager { diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index d6ef8944725e4..fa16b039236b6 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -756,6 +756,7 @@ mod tests { use std::sync::{Arc, Mutex}; use std::time::Duration; + use futures::{Stream, TryStreamExt}; use risingwave_common::catalog::TableId; use risingwave_common::hash::ParallelUnitMapping; use risingwave_common::system_param::reader::SystemParamsRead; @@ -768,16 +769,20 @@ mod tests { use risingwave_pb::stream_service::stream_service_server::{ StreamService, StreamServiceServer, }; + use risingwave_pb::stream_service::streaming_control_stream_response::InitResponse; use risingwave_pb::stream_service::{ BroadcastActorInfoTableResponse, BuildActorsResponse, DropActorsRequest, - DropActorsResponse, InjectBarrierRequest, InjectBarrierResponse, UpdateActorsResponse, *, + DropActorsResponse, UpdateActorsResponse, *, }; + use tokio::spawn; + use tokio::sync::mpsc::unbounded_channel; use tokio::sync::oneshot::Sender; #[cfg(feature = "failpoints")] use tokio::sync::Notify; use tokio::task::JoinHandle; use tokio::time::sleep; - use tonic::{Request, Response, Status}; + use tokio_stream::wrappers::UnboundedReceiverStream; + use tonic::{Request, Response, Status, Streaming}; use super::*; use crate::barrier::{GlobalBarrierManager, StreamRpcManager}; @@ -805,6 +810,9 @@ mod tests { #[async_trait::async_trait] impl StreamService for FakeStreamService { + type StreamingControlStreamStream = + impl Stream>; + async fn update_actors( &self, request: Request, @@ -856,29 +864,46 @@ mod tests { Ok(Response::new(DropActorsResponse::default())) } - async fn force_stop_actors( + async fn streaming_control_stream( &self, - _request: Request, - ) -> std::result::Result, Status> { - self.inner.actor_streams.lock().unwrap().clear(); - self.inner.actor_ids.lock().unwrap().clear(); - self.inner.actor_infos.lock().unwrap().clear(); - - Ok(Response::new(ForceStopActorsResponse::default())) - } - - async fn inject_barrier( - &self, - _request: Request, - ) -> std::result::Result, Status> { - Ok(Response::new(InjectBarrierResponse::default())) - } - - async fn barrier_complete( - &self, - _request: Request, - ) -> std::result::Result, Status> { - Ok(Response::new(BarrierCompleteResponse::default())) + request: Request>, + ) -> Result, Status> { + let (tx, rx) = unbounded_channel(); + let mut request_stream = request.into_inner(); + let inner = self.inner.clone(); + let _join_handle = spawn(async move { + while let Ok(Some(request)) = request_stream.try_next().await { + match request.request.unwrap() { + streaming_control_stream_request::Request::Init(_) => { + inner.actor_streams.lock().unwrap().clear(); + inner.actor_ids.lock().unwrap().clear(); + inner.actor_infos.lock().unwrap().clear(); + let _ = tx.send(Ok(StreamingControlStreamResponse { + response: Some(streaming_control_stream_response::Response::Init( + InitResponse {}, + )), + })); + } + streaming_control_stream_request::Request::InjectBarrier(_) => { + let _ = tx.send(Ok(StreamingControlStreamResponse { + response: Some( + streaming_control_stream_response::Response::CompleteBarrier( + BarrierCompleteResponse { + request_id: "".to_string(), + status: None, + create_mview_progress: vec![], + 
synced_sstables: vec![], + worker_id: 0, + table_watermarks: Default::default(), + }, + ), + ), + })); + } + } + } + }); + Ok(Response::new(UnboundedReceiverStream::new(rx))) } async fn wait_epoch_commit( diff --git a/src/rpc_client/src/lib.rs b/src/rpc_client/src/lib.rs index 0485465499f5a..fabd1dabeca01 100644 --- a/src/rpc_client/src/lib.rs +++ b/src/rpc_client/src/lib.rs @@ -43,7 +43,9 @@ use rand::prelude::SliceRandom; use risingwave_common::util::addr::HostAddr; use risingwave_pb::common::WorkerNode; use risingwave_pb::meta::heartbeat_request::extra_info; -use tokio::sync::mpsc::{channel, Receiver, Sender}; +use tokio::sync::mpsc::{ + channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender, +}; pub mod error; use error::Result; @@ -63,7 +65,9 @@ pub use hummock_meta_client::{CompactionEventItem, HummockMetaClient}; pub use meta_client::{MetaClient, SinkCoordinationRpcClient}; use rw_futures_util::await_future_with_monitor_error_stream; pub use sink_coordinate_client::CoordinatorStreamHandle; -pub use stream_client::{StreamClient, StreamClientPool, StreamClientPoolRef}; +pub use stream_client::{ + StreamClient, StreamClientPool, StreamClientPoolRef, StreamingControlHandle, +}; #[async_trait] pub trait RpcClient: Send + Sync + 'static + Clone { @@ -274,3 +278,63 @@ impl BidiStreamHandle { } } } + +/// The handle of a bidi-stream started from the rpc client. It is similar to the `BidiStreamHandle` +/// except that its sender is unbounded. +pub struct UnboundedBidiStreamHandle { + pub request_sender: UnboundedSender, + pub response_stream: BoxStream<'static, Result>, +} + +impl Debug for UnboundedBidiStreamHandle { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.write_str(type_name::()) + } +} + +impl UnboundedBidiStreamHandle { + pub async fn initialize< + F: FnOnce(UnboundedReceiver) -> Fut, + St: Stream> + Send + Unpin + 'static, + Fut: Future> + Send, + R: Into, + >( + first_request: R, + init_stream_fn: F, + ) -> Result<(Self, RSP)> { + let (request_sender, request_receiver) = unbounded_channel(); + + // Send initial request in case of the blocking receive call from creating streaming request + request_sender + .send(first_request.into()) + .map_err(|_err| anyhow!("unable to send first request of {}", type_name::()))?; + + let mut response_stream = init_stream_fn(request_receiver).await?; + + let first_response = response_stream + .next() + .await + .context("get empty response from first request")??; + + Ok(( + Self { + request_sender, + response_stream: response_stream.boxed(), + }, + first_response, + )) + } + + pub async fn next_response(&mut self) -> Result { + self.response_stream + .next() + .await + .ok_or_else(|| anyhow!("end of response stream"))? 
+ } + + pub fn send_request(&mut self, request: REQ) -> Result<()> { + self.request_sender + .send(request) + .map_err(|_| anyhow!("unable to send request {}", type_name::()).into()) + } +} diff --git a/src/rpc_client/src/stream_client.rs b/src/rpc_client/src/stream_client.rs index 3a271b5660bbd..ae5af65f28220 100644 --- a/src/rpc_client/src/stream_client.rs +++ b/src/rpc_client/src/stream_client.rs @@ -15,17 +15,22 @@ use std::sync::Arc; use std::time::Duration; +use anyhow::anyhow; use async_trait::async_trait; +use futures::TryStreamExt; use risingwave_common::config::MAX_CONNECTION_WINDOW_SIZE; use risingwave_common::monitor::connection::{EndpointExt, TcpConfig}; use risingwave_common::util::addr::HostAddr; use risingwave_pb::stream_service::stream_service_client::StreamServiceClient; +use risingwave_pb::stream_service::streaming_control_stream_request::InitRequest; +use risingwave_pb::stream_service::streaming_control_stream_response::InitResponse; use risingwave_pb::stream_service::*; +use tokio_stream::wrappers::UnboundedReceiverStream; use tonic::transport::Endpoint; -use crate::error::Result; +use crate::error::{Result, RpcError}; use crate::tracing::{Channel, TracingInjectedChannelExt}; -use crate::{rpc_client_method_impl, RpcClient, RpcClientPool}; +use crate::{rpc_client_method_impl, RpcClient, RpcClientPool, UnboundedBidiStreamHandle}; #[derive(Clone)] pub struct StreamClient(StreamServiceClient); @@ -68,9 +73,6 @@ macro_rules! for_all_stream_rpc { ,{ 0, build_actors, BuildActorsRequest, BuildActorsResponse } ,{ 0, broadcast_actor_info_table, BroadcastActorInfoTableRequest, BroadcastActorInfoTableResponse } ,{ 0, drop_actors, DropActorsRequest, DropActorsResponse } - ,{ 0, force_stop_actors, ForceStopActorsRequest, ForceStopActorsResponse} - ,{ 0, inject_barrier, InjectBarrierRequest, InjectBarrierResponse } - ,{ 0, barrier_complete, BarrierCompleteRequest, BarrierCompleteResponse } ,{ 0, wait_epoch_commit, WaitEpochCommitRequest, WaitEpochCommitResponse } } }; @@ -79,3 +81,35 @@ macro_rules! for_all_stream_rpc { impl StreamClient { for_all_stream_rpc! 
{ rpc_client_method_impl } } + +pub type StreamingControlHandle = + UnboundedBidiStreamHandle; + +impl StreamClient { + pub async fn start_streaming_control(&self, prev_epoch: u64) -> Result { + let first_request = StreamingControlStreamRequest { + request: Some(streaming_control_stream_request::Request::Init( + InitRequest { prev_epoch }, + )), + }; + let mut client = self.0.to_owned(); + let (handle, first_rsp) = + UnboundedBidiStreamHandle::initialize(first_request, |rx| async move { + client + .streaming_control_stream(UnboundedReceiverStream::new(rx)) + .await + .map(|response| response.into_inner().map_err(RpcError::from)) + .map_err(RpcError::from) + }) + .await?; + match first_rsp { + StreamingControlStreamResponse { + response: Some(streaming_control_stream_response::Response::Init(InitResponse {})), + } => {} + other => { + return Err(anyhow!("expect InitResponse but get {:?}", other).into()); + } + }; + Ok(handle) + } +} diff --git a/src/storage/src/memory.rs b/src/storage/src/memory.rs index 0a1def6c1f7f5..096020ff569f6 100644 --- a/src/storage/src/memory.rs +++ b/src/storage/src/memory.rs @@ -635,8 +635,10 @@ impl StateStore for RangeKvStateStore { fn seal_epoch(&self, _epoch: u64, _is_checkpoint: bool) {} #[allow(clippy::unused_async)] - async fn clear_shared_buffer(&self, _prev_epoch: u64) { - unimplemented!("recovery not supported") + async fn clear_shared_buffer(&self, prev_epoch: u64) { + for (key, _) in self.inner.range((Unbounded, Unbounded), None).unwrap() { + assert!(key.epoch_with_gap.pure_epoch() <= prev_epoch); + } } #[allow(clippy::unused_async)] diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index edbd660690049..6fef59b6740d1 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -13,20 +13,25 @@ // limitations under the License. 
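Below is a small, self-contained sketch of the handshake pattern that `UnboundedBidiStreamHandle::initialize` and `start_streaming_control` rely on, using a loopback channel instead of a real gRPC stream. Only `tokio` and `tokio-stream` are assumed, and the `Req`/`Resp` types are made up for illustration: the first request is queued before the response stream is opened, and the caller only gets a usable handle after the first response has been received and checked.

```rust
// A minimal sketch of the handshake pattern behind `UnboundedBidiStreamHandle::initialize`:
// the first request is pushed into the channel *before* the stream is consumed, and the
// first response is awaited eagerly so callers only get a handle once `Init` is acknowledged.
use tokio::sync::mpsc::unbounded_channel;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_stream::StreamExt;

#[derive(Debug, PartialEq)]
enum Req { Init, Work(u32) }
#[derive(Debug, PartialEq)]
enum Resp { InitAck, Done(u32) }

#[tokio::main]
async fn main() {
    let (req_tx, req_rx) = unbounded_channel::<Req>();

    // Queue the handshake request before the "server" task starts reading,
    // mirroring how `initialize` avoids blocking on stream creation.
    req_tx.send(Req::Init).unwrap();

    // A loopback "server": answers Init with InitAck and Work(n) with Done(n).
    let (resp_tx, resp_rx) = unbounded_channel::<Resp>();
    tokio::spawn(async move {
        let mut requests = UnboundedReceiverStream::new(req_rx);
        while let Some(req) = requests.next().await {
            let resp = match req {
                Req::Init => Resp::InitAck,
                Req::Work(n) => Resp::Done(n),
            };
            if resp_tx.send(resp).is_err() {
                break;
            }
        }
    });

    let mut responses = UnboundedReceiverStream::new(resp_rx);
    // The handle is only considered established once the first response arrives.
    assert_eq!(responses.next().await, Some(Resp::InitAck));

    req_tx.send(Req::Work(7)).unwrap();
    assert_eq!(responses.next().await, Some(Resp::Done(7)));
}
```

Performing the handshake inside `initialize` is what lets `handle_new_control_stream` on the compute node reset itself on `Init` before the meta node starts injecting barriers over the same stream.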
use std::collections::{HashMap, HashSet}; +use std::future::pending; use std::sync::Arc; use std::time::Duration; use anyhow::{anyhow, Context}; -use futures::stream::FuturesUnordered; +use futures::stream::{BoxStream, FuturesUnordered}; use futures::StreamExt; +use itertools::Itertools; use parking_lot::Mutex; -use risingwave_pb::stream_service::barrier_complete_response::PbCreateMviewProgress; +use risingwave_pb::stream_service::barrier_complete_response::{ + GroupedSstableInfo, PbCreateMviewProgress, +}; use rw_futures_util::{pending_on_none, AttachedFuture}; use thiserror_ext::AsReport; use tokio::select; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio::sync::oneshot; use tokio::task::JoinHandle; +use tonic::Status; use self::managed_state::ManagedBarrierState; use crate::error::{IntoUnexpectedExit, StreamError, StreamResult}; @@ -41,9 +46,17 @@ mod tests; pub use progress::CreateMviewProgress; use risingwave_common::util::runtime::BackgroundShutdownRuntime; +use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map; +use risingwave_hummock_sdk::LocalSstableInfo; use risingwave_pb::common::ActorInfo; use risingwave_pb::stream_plan; use risingwave_pb::stream_plan::barrier::BarrierKind; +use risingwave_pb::stream_service::streaming_control_stream_request::{InitRequest, Request}; +use risingwave_pb::stream_service::streaming_control_stream_response::InitResponse; +use risingwave_pb::stream_service::{ + streaming_control_stream_response, BarrierCompleteResponse, StreamingControlStreamRequest, + StreamingControlStreamResponse, +}; use risingwave_storage::store::SyncResult; use crate::executor::exchange::permit::Receiver; @@ -65,6 +78,71 @@ pub struct BarrierCompleteResult { pub create_mview_progress: Vec, } +pub(super) struct ControlStreamHandle { + #[expect(clippy::type_complexity)] + pair: Option<( + UnboundedSender>, + BoxStream<'static, Result>, + )>, +} + +impl ControlStreamHandle { + fn empty() -> Self { + Self { pair: None } + } + + pub(super) fn new( + sender: UnboundedSender>, + request_stream: BoxStream<'static, Result>, + ) -> Self { + Self { + pair: Some((sender, request_stream)), + } + } + + fn reset_stream_with_err(&mut self, err: Status) { + if let Some((sender, _)) = self.pair.take() { + warn!("control stream reset with: {:?}", err.as_report()); + if sender.send(Err(err)).is_err() { + warn!("failed to notify finish of control stream"); + } + } + } + + fn inspect_result(&mut self, result: StreamResult<()>) { + if let Err(e) = result { + self.reset_stream_with_err(Status::internal(format!("get error: {:?}", e.as_report()))); + } + } + + fn send_response(&mut self, response: StreamingControlStreamResponse) { + if let Some((sender, _)) = self.pair.as_ref() { + if sender.send(Ok(response)).is_err() { + self.pair = None; + warn!("fail to send response. control stream reset"); + } + } else { + debug!(?response, "control stream has been reset. 
ignore response"); + } + } + + async fn next_request(&mut self) -> StreamingControlStreamRequest { + if let Some((_, stream)) = &mut self.pair { + match stream.next().await { + Some(Ok(request)) => { + return request; + } + Some(Err(e)) => self.reset_stream_with_err(Status::internal(format!( + "failed to get request: {:?}", + e.as_report() + ))), + None => self.reset_stream_with_err(Status::internal("end of stream")), + } + } + pending().await + } +} + pub(super) enum LocalBarrierEvent { RegisterSender { actor_id: ActorId, @@ -84,19 +162,9 @@ pub(super) enum LocalBarrierEvent { } pub(super) enum LocalActorOperation { - InjectBarrier { - barrier: Barrier, - actor_ids_to_send: HashSet, - actor_ids_to_collect: HashSet, - result_sender: oneshot::Sender>, - }, - Reset { - prev_epoch: u64, - result_sender: oneshot::Sender<()>, - }, - AwaitEpochCompleted { - epoch: u64, - result_sender: oneshot::Sender>, + NewControlStream { + handle: ControlStreamHandle, + init_request: InitRequest, }, DropActors { actors: Vec, @@ -194,7 +262,7 @@ pub(super) struct LocalBarrierWorker { /// Record all unexpected exited actors. failure_actors: HashMap, - epoch_result_sender: HashMap>>, + control_stream_handle: ControlStreamHandle, pub(super) actor_manager: Arc, @@ -228,7 +296,7 @@ impl LocalBarrierWorker { actor_manager.env.state_store(), actor_manager.streaming_metrics.clone(), ), - epoch_result_sender: HashMap::default(), + control_stream_handle: ControlStreamHandle::empty(), actor_manager, actor_manager_state: StreamActorManagerState::new(), current_shared_context: shared_context, @@ -246,7 +314,8 @@ impl LocalBarrierWorker { self.handle_actor_created(sender, create_actors_result); } completed_epoch = self.state.next_completed_epoch() => { - self.on_epoch_completed(completed_epoch); + let result = self.on_epoch_completed(completed_epoch); + self.control_stream_handle.inspect_result(result); }, // Note: it's important to select in a biased way to ensure that // barrier event is handled before actor_op, because we must ensure @@ -261,10 +330,13 @@ impl LocalBarrierWorker { actor_op = actor_op_rx.recv() => { if let Some(actor_op) = actor_op { match actor_op { - LocalActorOperation::Reset { - result_sender, prev_epoch} => { - self.reset(prev_epoch).await; - let _ = result_sender.send(()); + LocalActorOperation::NewControlStream { handle, init_request } => { + self.control_stream_handle.reset_stream_with_err(Status::internal("control stream has been reset to a new one")); + self.reset(init_request.prev_epoch).await; + self.control_stream_handle = handle; + self.control_stream_handle.send_response(StreamingControlStreamResponse { + response: Some(streaming_control_stream_response::Response::Init(InitResponse {})) + }); } actor_op => { self.handle_actor_op(actor_op); @@ -274,7 +346,11 @@ impl LocalBarrierWorker { else { break; } - } + }, + request = self.control_stream_handle.next_request() => { + let result = self.handle_streaming_control_request(request); + self.control_stream_handle.inspect_result(result); + }, } } } @@ -291,6 +367,26 @@ impl LocalBarrierWorker { let _ = sender.send(result); } + fn handle_streaming_control_request( + &mut self, + request: StreamingControlStreamRequest, + ) -> StreamResult<()> { + match request.request.expect("should not be empty") { + Request::InjectBarrier(req) => { + let barrier = Barrier::from_protobuf(req.get_barrier().unwrap())?; + self.send_barrier( + &barrier, + req.actor_ids_to_send.into_iter().collect(), + req.actor_ids_to_collect.into_iter().collect(), + )?; + Ok(()) + } 
+ Request::Init(_) => { + unreachable!() + } + } + } + fn handle_barrier_event(&mut self, event: LocalBarrierEvent) { match event { LocalBarrierEvent::RegisterSender { actor_id, sender } => { @@ -313,26 +409,8 @@ impl LocalBarrierWorker { fn handle_actor_op(&mut self, actor_op: LocalActorOperation) { match actor_op { - LocalActorOperation::InjectBarrier { - barrier, - actor_ids_to_send, - actor_ids_to_collect, - result_sender, - } => { - let result = self.send_barrier(&barrier, actor_ids_to_send, actor_ids_to_collect); - let _ = result_sender.send(result).inspect_err(|e| { - warn!(err=?e, "fail to send inject barrier result"); - }); - } - LocalActorOperation::Reset { .. } => { - unreachable!("Reset event should be handled separately in async context") - } - - LocalActorOperation::AwaitEpochCompleted { - epoch, - result_sender, - } => { - self.await_epoch_completed(epoch, result_sender); + LocalActorOperation::NewControlStream { .. } => { + unreachable!("NewControlStream event should be handled separately in async context") } LocalActorOperation::DropActors { actors, @@ -371,17 +449,55 @@ impl LocalBarrierWorker { // event handler impl LocalBarrierWorker { - fn on_epoch_completed(&mut self, epoch: u64) { - if let Some(sender) = self.epoch_result_sender.remove(&epoch) { - let result = self - .state - .pop_completed_epoch(epoch) - .expect("should exist") - .expect("should have completed"); - if sender.send(result).is_err() { - warn!(epoch, "fail to send epoch complete result"); - } - } + fn on_epoch_completed(&mut self, epoch: u64) -> StreamResult<()> { + let result = self + .state + .pop_completed_epoch(epoch) + .expect("should exist") + .expect("should have completed")?; + + let BarrierCompleteResult { + create_mview_progress, + sync_result, + } = result; + + let (synced_sstables, table_watermarks) = sync_result + .map(|sync_result| (sync_result.uncommitted_ssts, sync_result.table_watermarks)) + .unwrap_or_default(); + + let result = StreamingControlStreamResponse { + response: Some( + streaming_control_stream_response::Response::CompleteBarrier( + BarrierCompleteResponse { + request_id: "todo".to_string(), + status: None, + create_mview_progress, + synced_sstables: synced_sstables + .into_iter() + .map( + |LocalSstableInfo { + compaction_group_id, + sst_info, + table_stats, + }| GroupedSstableInfo { + compaction_group_id, + sst: Some(sst_info), + table_stats_map: to_prost_table_stats_map(table_stats), + }, + ) + .collect_vec(), + worker_id: self.actor_manager.env.worker_id(), + table_watermarks: table_watermarks + .into_iter() + .map(|(key, value)| (key.table_id, value.to_protobuf())) + .collect(), + }, + ), + ), + }; + + self.control_stream_handle.send_response(result); + Ok(()) } /// Register sender for source actors, used to send barriers. @@ -407,7 +523,6 @@ impl LocalBarrierWorker { ) -> StreamResult<()> { #[cfg(not(test))] { - use itertools::Itertools; // The barrier might be outdated and been injected after recovery in some certain extreme // scenarios. So some newly creating actors in the barrier are possibly not rebuilt during // recovery. Check it here and return an error here if some actors are not found to @@ -492,36 +607,6 @@ impl LocalBarrierWorker { Ok(()) } - /// Use `prev_epoch` to remove collect rx and return rx. 
- fn await_epoch_completed( - &mut self, - prev_epoch: u64, - result_sender: oneshot::Sender>, - ) { - match self.state.pop_completed_epoch(prev_epoch) { - Err(e) => { - let _ = result_sender.send(Err(e)); - } - Ok(Some(result)) => { - if result_sender.send(result).is_err() { - warn!(prev_epoch, "failed to send completed epoch result"); - } - } - Ok(None) => { - if let Some(prev_sender) = - self.epoch_result_sender.insert(prev_epoch, result_sender) - { - warn!(?prev_epoch, "duplicate await_collect_barrier on epoch"); - let _ = prev_sender.send(Err(anyhow!( - "duplicate await_collect_barrier on epoch {}", - prev_epoch - ) - .into())); - } - } - } - } - /// Reset all internal states. pub(super) fn reset_state(&mut self) { *self = Self::new(self.actor_manager.clone()); @@ -538,12 +623,14 @@ impl LocalBarrierWorker { async fn notify_failure(&mut self, actor_id: ActorId, err: StreamError) { self.add_failure(actor_id, err.clone()); let root_err = self.try_find_root_failure(err).await; - for fail_epoch in self.state.epochs_await_on_actor(actor_id) { - if let Some(result_sender) = self.epoch_result_sender.remove(&fail_epoch) { - if result_sender.send(Err(root_err.clone())).is_err() { - warn!(fail_epoch, actor_id, err = %root_err.as_report(), "fail to notify actor failure"); - } - } + let failed_epochs = self.state.epochs_await_on_actor(actor_id).collect_vec(); + if !failed_epochs.is_empty() { + self.control_stream_handle + .reset_stream_with_err(Status::internal(format!( + "failed to collect barrier. epoch: {:?}, err: {:?}", + failed_epochs, + root_err.as_report() + ))); } } @@ -648,40 +735,7 @@ impl LocalBarrierManager { pub fn register_sender(&self, actor_id: ActorId, sender: UnboundedSender) { self.send_event(LocalBarrierEvent::RegisterSender { actor_id, sender }); } -} -impl EventSender { - /// Broadcast a barrier to all senders. Save a receiver which will get notified when this - /// barrier is finished, in managed mode. - pub(super) async fn send_barrier( - &self, - barrier: Barrier, - actor_ids_to_send: impl IntoIterator, - actor_ids_to_collect: impl IntoIterator, - ) -> StreamResult<()> { - self.send_and_await(move |result_sender| LocalActorOperation::InjectBarrier { - barrier, - actor_ids_to_send: actor_ids_to_send.into_iter().collect(), - actor_ids_to_collect: actor_ids_to_collect.into_iter().collect(), - result_sender, - }) - .await? - } - - /// Use `prev_epoch` to remove collect rx and return rx. - pub(super) async fn await_epoch_completed( - &self, - prev_epoch: u64, - ) -> StreamResult { - self.send_and_await(|result_sender| LocalActorOperation::AwaitEpochCompleted { - epoch: prev_epoch, - result_sender, - }) - .await? - } -} - -impl LocalBarrierManager { /// When a [`crate::executor::StreamConsumer`] (typically [`crate::executor::DispatchExecutor`]) get a barrier, it should report /// and collect this barrier with its own `actor_id` using this function. 
pub fn collect(&self, actor_id: ActorId, barrier: &Barrier) { @@ -727,7 +781,7 @@ pub fn try_find_root_actor_failure<'a>( #[cfg(test)] impl LocalBarrierManager { - pub(super) async fn spawn_for_test() -> (EventSender, Self) { + pub(super) fn spawn_for_test() -> EventSender { use std::sync::atomic::AtomicU64; let (tx, rx) = unbounded_channel(); let _join_handle = LocalBarrierWorker::spawn( @@ -737,13 +791,7 @@ impl LocalBarrierManager { Arc::new(AtomicU64::new(0)), rx, ); - let sender = EventSender(tx); - let context = sender - .send_and_await(LocalActorOperation::GetCurrentSharedContext) - .await - .unwrap(); - - (sender, context.local_barrier_manager.clone()) + EventSender(tx) } pub fn for_test() -> Self { diff --git a/src/stream/src/task/barrier_manager/tests.rs b/src/stream/src/task/barrier_manager/tests.rs index ae248facc0e5f..2ec421661a14a 100644 --- a/src/stream/src/task/barrier_manager/tests.rs +++ b/src/stream/src/task/barrier_manager/tests.rs @@ -17,15 +17,43 @@ use std::iter::once; use std::pin::pin; use std::task::Poll; +use assert_matches::assert_matches; +use futures::future::join_all; +use futures::FutureExt; use itertools::Itertools; use risingwave_common::util::epoch::test_epoch; +use risingwave_pb::stream_service::{streaming_control_stream_request, InjectBarrierRequest}; use tokio::sync::mpsc::unbounded_channel; +use tokio_stream::wrappers::UnboundedReceiverStream; use super::*; #[tokio::test] async fn test_managed_barrier_collection() -> StreamResult<()> { - let (actor_op_tx, manager) = LocalBarrierManager::spawn_for_test().await; + let actor_op_tx = LocalBarrierManager::spawn_for_test(); + + let (request_tx, request_rx) = unbounded_channel(); + let (response_tx, mut response_rx) = unbounded_channel(); + + actor_op_tx.send_event(LocalActorOperation::NewControlStream { + handle: ControlStreamHandle::new( + response_tx, + UnboundedReceiverStream::new(request_rx).boxed(), + ), + init_request: InitRequest { prev_epoch: 0 }, + }); + + assert_matches!( + response_rx.recv().await.unwrap().unwrap().response.unwrap(), + streaming_control_stream_response::Response::Init(_) + ); + + let context = actor_op_tx + .send_and_await(LocalActorOperation::GetCurrentSharedContext) + .await + .unwrap(); + + let manager = &context.local_barrier_manager; let register_sender = |actor_id: u32| { let (barrier_tx, barrier_rx) = unbounded_channel(); @@ -47,21 +75,35 @@ async fn test_managed_barrier_collection() -> StreamResult<()> { let barrier = Barrier::new_test_barrier(curr_epoch); let epoch = barrier.epoch.prev; - actor_op_tx - .send_barrier(barrier.clone(), actor_ids.clone(), actor_ids) - .await + request_tx + .send(Ok(StreamingControlStreamRequest { + request: Some(streaming_control_stream_request::Request::InjectBarrier( + InjectBarrierRequest { + request_id: "".to_string(), + barrier: Some(barrier.to_protobuf()), + actor_ids_to_send: actor_ids.clone(), + actor_ids_to_collect: actor_ids, + }, + )), + })) .unwrap(); - // Collect barriers from actors - let collected_barriers = rxs - .iter_mut() - .map(|(actor_id, rx)| { - let barrier = rx.try_recv().unwrap(); - assert_eq!(barrier.epoch.prev, epoch); - (*actor_id, barrier) - }) - .collect_vec(); - let mut await_epoch_future = pin!(actor_op_tx.await_epoch_completed(epoch)); + // Collect barriers from actors + let collected_barriers = join_all(rxs.iter_mut().map(|(actor_id, rx)| async move { + let barrier = rx.recv().await.unwrap(); + assert_eq!(barrier.epoch.prev, epoch); + (*actor_id, barrier) + })) + .await; + + let mut await_epoch_future = 
pin!(response_rx.recv().map(|result| { + let resp: StreamingControlStreamResponse = result.unwrap().unwrap(); + let resp = resp.response.unwrap(); + match resp { + streaming_control_stream_response::Response::CompleteBarrier(_complete_barrier) => {} + _ => unreachable!(), + } + })); // Report to local barrier manager for (i, (actor_id, barrier)) in collected_barriers.into_iter().enumerate() { @@ -77,7 +119,30 @@ async fn test_managed_barrier_collection() -> StreamResult<()> { #[tokio::test] async fn test_managed_barrier_collection_before_send_request() -> StreamResult<()> { - let (actor_op_tx, manager) = LocalBarrierManager::spawn_for_test().await; + let actor_op_tx = LocalBarrierManager::spawn_for_test(); + + let (request_tx, request_rx) = unbounded_channel(); + let (response_tx, mut response_rx) = unbounded_channel(); + + actor_op_tx.send_event(LocalActorOperation::NewControlStream { + handle: ControlStreamHandle::new( + response_tx, + UnboundedReceiverStream::new(request_rx).boxed(), + ), + init_request: InitRequest { prev_epoch: 0 }, + }); + + assert_matches!( + response_rx.recv().await.unwrap().unwrap().response.unwrap(), + streaming_control_stream_response::Response::Init(_) + ); + + let context = actor_op_tx + .send_and_await(LocalActorOperation::GetCurrentSharedContext) + .await + .unwrap(); + + let manager = &context.local_barrier_manager; let register_sender = |actor_id: u32| { let (barrier_tx, barrier_rx) = unbounded_channel(); @@ -109,23 +174,35 @@ async fn test_managed_barrier_collection_before_send_request() -> StreamResult<( // Collect a barrier before sending manager.collect(extra_actor_id, &barrier); - // Send the barrier to all actors - actor_op_tx - .send_barrier(barrier.clone(), actor_ids_to_send, actor_ids_to_collect) - .await + request_tx + .send(Ok(StreamingControlStreamRequest { + request: Some(streaming_control_stream_request::Request::InjectBarrier( + InjectBarrierRequest { + request_id: "".to_string(), + barrier: Some(barrier.to_protobuf()), + actor_ids_to_send, + actor_ids_to_collect, + }, + )), + })) .unwrap(); // Collect barriers from actors - let collected_barriers = rxs - .iter_mut() - .map(|(actor_id, rx)| { - let barrier = rx.try_recv().unwrap(); - assert_eq!(barrier.epoch.prev, epoch); - (*actor_id, barrier) - }) - .collect_vec(); - - let mut await_epoch_future = pin!(actor_op_tx.await_epoch_completed(epoch)); + let collected_barriers = join_all(rxs.iter_mut().map(|(actor_id, rx)| async move { + let barrier = rx.recv().await.unwrap(); + assert_eq!(barrier.epoch.prev, epoch); + (*actor_id, barrier) + })) + .await; + + let mut await_epoch_future = pin!(response_rx.recv().map(|result| { + let resp: StreamingControlStreamResponse = result.unwrap().unwrap(); + let resp = resp.response.unwrap(); + match resp { + streaming_control_stream_response::Response::CompleteBarrier(_complete_barrier) => {} + _ => unreachable!(), + } + })); // Report to local barrier manager for (i, (actor_id, barrier)) in collected_barriers.into_iter().enumerate() { diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index 602a6f49e2385..9045adc1263ce 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -22,6 +22,7 @@ use std::sync::Arc; use anyhow::anyhow; use async_recursion::async_recursion; +use futures::stream::BoxStream; use futures::FutureExt; use itertools::Itertools; use parking_lot::Mutex; @@ -33,25 +34,32 @@ use risingwave_pb::common::ActorInfo; use risingwave_pb::stream_plan; use 
risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{StreamActor, StreamNode}; +use risingwave_pb::stream_service::streaming_control_stream_request::InitRequest; +use risingwave_pb::stream_service::{ + StreamingControlStreamRequest, StreamingControlStreamResponse, +}; use risingwave_storage::monitor::HummockTraceFutureExt; use risingwave_storage::{dispatch_state_store, StateStore}; use rw_futures_util::AttachedFuture; use thiserror_ext::AsReport; -use tokio::sync::mpsc::unbounded_channel; +use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; use tokio::sync::oneshot; use tokio::task::JoinHandle; +use tonic::Status; -use super::{unique_executor_id, unique_operator_id, BarrierCompleteResult}; +use super::{unique_executor_id, unique_operator_id}; use crate::error::StreamResult; use crate::executor::exchange::permit::Receiver; use crate::executor::monitor::StreamingMetrics; use crate::executor::subtask::SubtaskHandle; use crate::executor::{ - Actor, ActorContext, ActorContextRef, Barrier, DispatchExecutor, DispatcherImpl, Executor, - ExecutorInfo, WrapperExecutor, + Actor, ActorContext, ActorContextRef, DispatchExecutor, DispatcherImpl, Executor, ExecutorInfo, + WrapperExecutor, }; use crate::from_proto::create_executor; -use crate::task::barrier_manager::{EventSender, LocalActorOperation, LocalBarrierWorker}; +use crate::task::barrier_manager::{ + ControlStreamHandle, EventSender, LocalActorOperation, LocalBarrierWorker, +}; use crate::task::{ ActorId, FragmentId, LocalBarrierManager, SharedContext, StreamActorManager, StreamActorManagerState, StreamEnvironment, UpDownActorIds, @@ -199,22 +207,19 @@ impl LocalStreamManager { } } - /// Broadcast a barrier to all senders. Save a receiver in barrier manager - pub async fn send_barrier( + /// Receive a new control stream request from meta. Notify the barrier worker to reset the CN and use the new control stream + /// to receive control message from meta + pub fn handle_new_control_stream( &self, - barrier: Barrier, - actor_ids_to_send: impl IntoIterator, - actor_ids_to_collect: impl IntoIterator, - ) -> StreamResult<()> { + sender: UnboundedSender>, + request_stream: BoxStream<'static, Result>, + init_request: InitRequest, + ) { self.actor_op_tx - .send_barrier(barrier, actor_ids_to_send, actor_ids_to_collect) - .await - } - - /// Use `epoch` to find collect rx. And wait for all actor to be collected before - /// returning. - pub async fn collect_barrier(&self, prev_epoch: u64) -> StreamResult { - self.actor_op_tx.await_epoch_completed(prev_epoch).await + .send_event(LocalActorOperation::NewControlStream { + handle: ControlStreamHandle::new(sender, request_stream), + init_request, + }) } /// Drop the resources of the given actors. @@ -227,17 +232,6 @@ impl LocalStreamManager { .await } - /// Force stop all actors on this worker, and then drop their resources. 
-    pub async fn reset(&self, prev_epoch: u64) {
-        self.actor_op_tx
-            .send_and_await(|result_sender| LocalActorOperation::Reset {
-                result_sender,
-                prev_epoch,
-            })
-            .await
-            .expect("should receive reset")
-    }
-
     pub async fn update_actors(&self, actors: Vec<StreamActor>) -> StreamResult<()> {
         self.actor_op_tx
             .send_and_await(|result_sender| LocalActorOperation::UpdateActors {
diff --git a/src/tests/simulation/src/slt.rs b/src/tests/simulation/src/slt.rs
index d44d69219bbaf..847c7c60c7cd2 100644
--- a/src/tests/simulation/src/slt.rs
+++ b/src/tests/simulation/src/slt.rs
@@ -26,6 +26,9 @@ use crate::client::RisingWave;
 use crate::cluster::{Cluster, KillOpts};
 use crate::utils::TimedExt;
 
+// Retry at most this many times until the command succeeds.
+const MAX_RETRY: usize = 5;
+
 fn is_create_table_as(sql: &str) -> bool {
     let parts: Vec<String> = sql.split_whitespace().map(|s| s.to_lowercase()).collect();
 
@@ -271,10 +274,13 @@ pub async fn run_slt_task(
                 })
                 .await
             {
+                let err_string = err.to_string();
                 // cluster could be still under recovering if killed before, retry if
                 // meets `no reader for dml in table with id {}`.
-                let should_retry =
-                    err.to_string().contains("no reader for dml in table") && i < 5;
+                let should_retry = (err_string.contains("no reader for dml in table")
+                    || err_string
+                        .contains("error reading a body from connection: broken pipe")
+                    || err_string.contains("failed to inject barrier")) && i < MAX_RETRY;
                 if !should_retry {
                     panic!("{}", err);
                 }
@@ -302,8 +308,6 @@ pub async fn run_slt_task(
             None
         };
 
-        // retry up to 5 times until it succeed
-        let max_retry = 5;
         for i in 0usize.. {
             tracing::debug!(iteration = i, "retry count");
             let delay = Duration::from_secs(1 << i);
@@ -348,7 +352,7 @@ pub async fn run_slt_task(
                                     ?err,
                                     "failed to wait for background mv to finish creating"
                                 );
-                                if i >= max_retry {
+                                if i >= MAX_RETRY {
                                     panic!("failed to run test after retry {i} times, error={err:#?}");
                                 }
                                 continue;
@@ -379,8 +383,8 @@ pub async fn run_slt_task(
                             break
                         }
 
-                        // Keep i >= max_retry for other errors. Since these errors indicate that the MV might not yet be created.
-                        _ if i >= max_retry => {
+                        // Keep `i >= MAX_RETRY` for other errors, since these errors indicate that the MV might not yet be created.
+                        _ if i >= MAX_RETRY => {
                             panic!("failed to run test after retry {i} times: {e}")
                         }
                         SqlCmd::CreateMaterializedView { ref name }
@@ -404,7 +408,7 @@ pub async fn run_slt_task(
                                     ?err,
                                     "failed to wait for background mv to finish creating"
                                 );
-                                if i >= max_retry {
+                                if i >= MAX_RETRY {
                                     panic!("failed to run test after retry {i} times, error={err:#?}");
                                 }
                                 continue;
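For reference, the following is a minimal sketch, not part of this patch, of the two request variants carried over the new bidirectional control stream. It uses only the protobuf types introduced above; the helper functions `init_request` and `inject_barrier_request` are hypothetical names for illustration, mirroring how the tests above construct the same messages.

// Illustrative only: shapes of the messages exchanged on the new control stream.
// `InitRequest` is the first message and asks the compute node to reset to `prev_epoch`;
// every later message injects a barrier and names the actors to send it to and collect it from.
use risingwave_pb::stream_service::streaming_control_stream_request::{InitRequest, Request};
use risingwave_pb::stream_service::{InjectBarrierRequest, StreamingControlStreamRequest};

// Hypothetical helper: wrap an `InitRequest`, as `start_streaming_control` does internally.
fn init_request(prev_epoch: u64) -> StreamingControlStreamRequest {
    StreamingControlStreamRequest {
        request: Some(Request::Init(InitRequest { prev_epoch })),
    }
}

// Hypothetical helper: wrap an `InjectBarrierRequest`, mirroring the construction in the tests.
fn inject_barrier_request(
    barrier: risingwave_pb::stream_plan::Barrier,
    actor_ids: Vec<u32>,
) -> StreamingControlStreamRequest {
    StreamingControlStreamRequest {
        request: Some(Request::InjectBarrier(InjectBarrierRequest {
            request_id: String::new(),
            barrier: Some(barrier),
            actor_ids_to_send: actor_ids.clone(),
            actor_ids_to_collect: actor_ids,
        })),
    }
}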