diff --git a/proto/stream_service.proto b/proto/stream_service.proto index 5990fe1e2cbcf..e8c5d94a20ac3 100644 --- a/proto/stream_service.proto +++ b/proto/stream_service.proto @@ -44,6 +44,16 @@ message DropActorsResponse { common.Status status = 2; } +message ForceStopActorsRequest { + string request_id = 1; + uint64 prev_epoch = 2; +} + +message ForceStopActorsResponse { + string request_id = 1; + common.Status status = 2; +} + message InjectBarrierRequest { string request_id = 1; stream_plan.Barrier barrier = 2; @@ -51,6 +61,16 @@ message InjectBarrierRequest { repeated uint32 actor_ids_to_collect = 4; } +message InjectBarrierResponse { + string request_id = 1; + common.Status status = 2; +} + +message BarrierCompleteRequest { + string request_id = 1; + uint64 prev_epoch = 2; + map tracing_context = 3; +} message BarrierCompleteResponse { message CreateMviewProgress { uint32 backfill_actor_id = 1; @@ -84,33 +104,15 @@ message WaitEpochCommitResponse { common.Status status = 1; } -message StreamingControlStreamRequest { - message InitRequest { - uint64 prev_epoch = 2; - } - - oneof request { - InitRequest init = 1; - InjectBarrierRequest inject_barrier = 2; - } -} - -message StreamingControlStreamResponse { - message InitResponse {} - - oneof response { - InitResponse init = 1; - BarrierCompleteResponse complete_barrier = 2; - } -} - service StreamService { rpc UpdateActors(UpdateActorsRequest) returns (UpdateActorsResponse); rpc BuildActors(BuildActorsRequest) returns (BuildActorsResponse); rpc BroadcastActorInfoTable(BroadcastActorInfoTableRequest) returns (BroadcastActorInfoTableResponse); rpc DropActors(DropActorsRequest) returns (DropActorsResponse); + rpc ForceStopActors(ForceStopActorsRequest) returns (ForceStopActorsResponse); + rpc InjectBarrier(InjectBarrierRequest) returns (InjectBarrierResponse); + rpc BarrierComplete(BarrierCompleteRequest) returns (BarrierCompleteResponse); rpc WaitEpochCommit(WaitEpochCommitRequest) returns (WaitEpochCommitResponse); - rpc StreamingControlStream(stream StreamingControlStreamRequest) returns (stream StreamingControlStreamResponse); } // TODO: Lifecycle management for actors. diff --git a/src/compute/src/rpc/service/stream_service.rs b/src/compute/src/rpc/service/stream_service.rs index 18b77ff1804bc..6e96406743f29 100644 --- a/src/compute/src/rpc/service/stream_service.rs +++ b/src/compute/src/rpc/service/stream_service.rs @@ -13,16 +13,18 @@ // limitations under the License. use await_tree::InstrumentAwait; -use futures::{Stream, StreamExt, TryStreamExt}; +use itertools::Itertools; +use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map; +use risingwave_hummock_sdk::LocalSstableInfo; +use risingwave_pb::stream_service::barrier_complete_response::GroupedSstableInfo; use risingwave_pb::stream_service::stream_service_server::StreamService; use risingwave_pb::stream_service::*; use risingwave_storage::dispatch_state_store; use risingwave_stream::error::StreamError; -use risingwave_stream::task::{LocalStreamManager, StreamEnvironment}; +use risingwave_stream::executor::Barrier; +use risingwave_stream::task::{BarrierCompleteResult, LocalStreamManager, StreamEnvironment}; use thiserror_ext::AsReport; -use tokio::sync::mpsc::unbounded_channel; -use tokio_stream::wrappers::UnboundedReceiverStream; -use tonic::{Request, Response, Status, Streaming}; +use tonic::{Request, Response, Status}; #[derive(Clone)] pub struct StreamServiceImpl { @@ -38,9 +40,6 @@ impl StreamServiceImpl { #[async_trait::async_trait] impl StreamService for StreamServiceImpl { - type StreamingControlStreamStream = - impl Stream>; - #[cfg_attr(coverage, coverage(off))] async fn update_actors( &self, @@ -111,6 +110,86 @@ impl StreamService for StreamServiceImpl { })) } + #[cfg_attr(coverage, coverage(off))] + async fn force_stop_actors( + &self, + request: Request, + ) -> std::result::Result, Status> { + let req = request.into_inner(); + self.mgr.reset(req.prev_epoch).await; + Ok(Response::new(ForceStopActorsResponse { + request_id: req.request_id, + status: None, + })) + } + + #[cfg_attr(coverage, coverage(off))] + async fn inject_barrier( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + let barrier = + Barrier::from_protobuf(req.get_barrier().unwrap()).map_err(StreamError::from)?; + + self.mgr + .send_barrier(barrier, req.actor_ids_to_send, req.actor_ids_to_collect) + .await?; + + Ok(Response::new(InjectBarrierResponse { + request_id: req.request_id, + status: None, + })) + } + + #[cfg_attr(coverage, coverage(off))] + async fn barrier_complete( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + let BarrierCompleteResult { + create_mview_progress, + sync_result, + } = self + .mgr + .collect_barrier(req.prev_epoch) + .instrument_await(format!("collect_barrier (epoch {})", req.prev_epoch)) + .await + .inspect_err( + |err| tracing::error!(error = %err.as_report(), "failed to collect barrier"), + )?; + + let (synced_sstables, table_watermarks) = sync_result + .map(|sync_result| (sync_result.uncommitted_ssts, sync_result.table_watermarks)) + .unwrap_or_default(); + + Ok(Response::new(BarrierCompleteResponse { + request_id: req.request_id, + status: None, + create_mview_progress, + synced_sstables: synced_sstables + .into_iter() + .map( + |LocalSstableInfo { + compaction_group_id, + sst_info, + table_stats, + }| GroupedSstableInfo { + compaction_group_id, + sst: Some(sst_info), + table_stats_map: to_prost_table_stats_map(table_stats), + }, + ) + .collect_vec(), + worker_id: self.env.worker_id(), + table_watermarks: table_watermarks + .into_iter() + .map(|(key, value)| (key.table_id, value.to_protobuf())) + .collect(), + })) + } + #[cfg_attr(coverage, coverage(off))] async fn wait_epoch_commit( &self, @@ -131,24 +210,4 @@ impl StreamService for StreamServiceImpl { Ok(Response::new(WaitEpochCommitResponse { status: None })) } - - async fn streaming_control_stream( - &self, - request: Request>, - ) -> Result, Status> { - let mut stream = request.into_inner().boxed(); - let first_request = stream.try_next().await?; - let Some(StreamingControlStreamRequest { - request: Some(streaming_control_stream_request::Request::Init(init_request)), - }) = first_request - else { - return Err(Status::invalid_argument(format!( - "unexpected first request: {:?}", - first_request - ))); - }; - let (tx, rx) = unbounded_channel(); - self.mgr.handle_new_control_stream(tx, stream, init_request); - Ok(Response::new(UnboundedReceiverStream::new(rx))) - } } diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 22311a2b43911..006de45d522e7 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -175,8 +175,6 @@ pub enum Command { RescheduleFragment { reschedules: HashMap, table_parallelism: HashMap, - // should contain the actor ids in upstream and downstream fragment of `reschedules` - fragment_actors: HashMap>, }, /// `ReplaceTable` command generates a `Update` barrier with the given `merge_updates`. This is @@ -353,7 +351,7 @@ impl CommandContext { impl CommandContext { /// Generate a mutation for the given command. - pub fn to_mutation(&self) -> Option { + pub async fn to_mutation(&self) -> MetaResult> { let mutation = match &self.command { Command::Plain(mutation) => mutation.clone(), @@ -481,23 +479,21 @@ impl CommandContext { init_split_assignment, ), - Command::RescheduleFragment { - reschedules, - fragment_actors, - .. - } => { + Command::RescheduleFragment { reschedules, .. } => { + let metadata_manager = &self.barrier_manager_context.metadata_manager; + let mut dispatcher_update = HashMap::new(); for reschedule in reschedules.values() { for &(upstream_fragment_id, dispatcher_id) in &reschedule.upstream_fragment_dispatcher_ids { // Find the actors of the upstream fragment. - let upstream_actor_ids = fragment_actors - .get(&upstream_fragment_id) - .expect("should contain"); + let upstream_actor_ids = metadata_manager + .get_running_actors_of_fragment(upstream_fragment_id) + .await?; // Record updates for all actors. - for &actor_id in upstream_actor_ids { + for actor_id in upstream_actor_ids { // Index with the dispatcher id to check duplicates. dispatcher_update .try_insert( @@ -530,9 +526,9 @@ impl CommandContext { for (&fragment_id, reschedule) in reschedules { for &downstream_fragment_id in &reschedule.downstream_fragment_ids { // Find the actors of the downstream fragment. - let downstream_actor_ids = fragment_actors - .get(&downstream_fragment_id) - .expect("should contain"); + let downstream_actor_ids = metadata_manager + .get_running_actors_of_fragment(downstream_fragment_id) + .await?; // Downstream removed actors should be skipped // Newly created actors of the current fragment will not dispatch Update @@ -549,7 +545,7 @@ impl CommandContext { .unwrap_or_default(); // Record updates for all actors. - for &actor_id in downstream_actor_ids { + for actor_id in downstream_actor_ids { if downstream_removed_actors.contains(&actor_id) { continue; } @@ -624,7 +620,7 @@ impl CommandContext { } }; - mutation + Ok(mutation) } fn generate_update_mutation_for_replace_table( @@ -966,7 +962,6 @@ impl CommandContext { Command::RescheduleFragment { reschedules, table_parallelism, - .. } => { let removed_actors = reschedules .values() diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 652a4b51d9264..4d867d266270b 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -25,11 +25,11 @@ use arc_swap::ArcSwap; use fail::fail_point; use itertools::Itertools; use prometheus::HistogramTimer; -use risingwave_common::bail; use risingwave_common::catalog::TableId; use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_common::system_param::PAUSE_ON_NEXT_BOOTSTRAP_KEY; use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH}; +use risingwave_common::{bail, must_match}; use risingwave_hummock_sdk::table_watermark::{ merge_multiple_new_table_watermarks, TableWatermarks, }; @@ -41,9 +41,7 @@ use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::PausedReason; use risingwave_pb::stream_plan::barrier::BarrierKind; use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress; -use risingwave_pb::stream_service::{ - streaming_control_stream_response, BarrierCompleteResponse, StreamingControlStreamResponse, -}; +use risingwave_pb::stream_service::BarrierCompleteResponse; use thiserror_ext::AsReport; use tokio::sync::oneshot::{Receiver, Sender}; use tokio::sync::Mutex; @@ -56,13 +54,12 @@ use self::progress::TrackingCommand; use crate::barrier::info::InflightActorInfo; use crate::barrier::notifier::BarrierInfo; use crate::barrier::progress::CreateMviewProgressTracker; -use crate::barrier::rpc::ControlStreamManager; +use crate::barrier::rpc::BarrierRpcManager; use crate::barrier::state::BarrierManagerState; use crate::hummock::{CommitEpochInfo, HummockManagerRef}; use crate::manager::sink_coordination::SinkCoordinatorManager; use crate::manager::{ - ActiveStreamingWorkerChange, ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv, - MetadataManager, WorkerId, + ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv, MetadataManager, WorkerId, }; use crate::model::{ActorId, TableFragments}; use crate::rpc::metrics::MetaMetrics; @@ -191,9 +188,9 @@ pub struct GlobalBarrierManager { checkpoint_control: CheckpointControl, - active_streaming_nodes: ActiveStreamingWorkerNodes, + rpc_manager: BarrierRpcManager, - control_stream_manager: ControlStreamManager, + active_streaming_nodes: ActiveStreamingWorkerNodes, } /// Controls the concurrent execution of commands. @@ -231,7 +228,7 @@ impl CheckpointControl { self.context.metrics.in_flight_barrier_nums.set( self.command_ctx_queue .values() - .filter(|x| x.state.is_inflight()) + .filter(|x| matches!(x.state, BarrierEpochState::InFlight)) .count() as i64, ); self.context @@ -241,12 +238,7 @@ impl CheckpointControl { } /// Enqueue a barrier command, and init its state to `InFlight`. - fn enqueue_command( - &mut self, - command_ctx: Arc, - notifiers: Vec, - node_to_collect: HashSet, - ) { + fn enqueue_command(&mut self, command_ctx: Arc, notifiers: Vec) { let timer = self.context.metrics.barrier_latency.start_timer(); if let Some((_, node)) = self.command_ctx_queue.last_key_value() { @@ -259,10 +251,7 @@ impl CheckpointControl { command_ctx.prev_epoch.value().0, EpochNode { enqueue_time: timer, - state: BarrierEpochState { - node_to_collect, - resps: vec![], - }, + state: BarrierEpochState::InFlight, command_ctx, notifiers, }, @@ -271,19 +260,14 @@ impl CheckpointControl { /// Change the state of this `prev_epoch` to `Completed`. Return continuous nodes /// with `Completed` starting from first node [`Completed`..`InFlight`) and remove them. - fn barrier_collected( - &mut self, - worker_id: WorkerId, - prev_epoch: u64, - resp: BarrierCompleteResponse, - ) { + fn barrier_collected(&mut self, prev_epoch: u64, result: Vec) { if let Some(node) = self.command_ctx_queue.get_mut(&prev_epoch) { - assert!(node.state.node_to_collect.remove(&worker_id)); - node.state.resps.push(resp); + assert!(matches!(node.state, BarrierEpochState::InFlight)); + node.state = BarrierEpochState::Collected(result); } else { panic!( - "collect barrier on non-existing barrier: {}, {}", - prev_epoch, worker_id + "received barrier complete response for an unknown epoch: {}", + prev_epoch ); } } @@ -293,7 +277,7 @@ impl CheckpointControl { let in_flight_not_full = self .command_ctx_queue .values() - .filter(|x| x.state.is_inflight()) + .filter(|x| matches!(x.state, BarrierEpochState::InFlight)) .count() < in_flight_barrier_nums; @@ -356,8 +340,13 @@ impl CheckpointControl { }; if !is_err { // continue to finish the pending collected barrier. - while let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value() - && !state.is_inflight() + while let Some(( + _, + EpochNode { + state: BarrierEpochState::Collected(_), + .. + }, + )) = self.command_ctx_queue.first_key_value() { let (_, node) = self.command_ctx_queue.pop_first().expect("non-empty"); let command_ctx = node.command_ctx.clone(); @@ -401,16 +390,12 @@ pub struct EpochNode { } /// The state of barrier. -struct BarrierEpochState { - node_to_collect: HashSet, +enum BarrierEpochState { + /// This barrier is current in-flight on the stream graph of compute nodes. + InFlight, - resps: Vec, -} - -impl BarrierEpochState { - fn is_inflight(&self) -> bool { - !self.node_to_collect.is_empty() - } + /// This barrier is collected. + Collected(Vec), } enum CompletingCommand { @@ -426,6 +411,13 @@ enum CompletingCommand { Err(MetaError), } +/// The result of barrier collect. +#[derive(Debug)] +struct BarrierCollectResult { + prev_epoch: u64, + result: MetaResult>, +} + impl GlobalBarrierManager { /// Create a new [`crate::barrier::GlobalBarrierManager`]. #[allow(clippy::too_many_arguments)] @@ -466,9 +458,10 @@ impl GlobalBarrierManager { env: env.clone(), }; - let control_stream_manager = ControlStreamManager::new(context.clone()); let checkpoint_control = CheckpointControl::new(context.clone()); + let rpc_manager = BarrierRpcManager::new(context.clone()); + Self { enable_recovery, scheduled_barriers, @@ -477,8 +470,8 @@ impl GlobalBarrierManager { env, state: initial_invalid_state, checkpoint_control, + rpc_manager, active_streaming_nodes, - control_stream_manager, } } @@ -496,7 +489,7 @@ impl GlobalBarrierManager { } /// Check whether we should pause on bootstrap from the system parameter and reset it. - async fn take_pause_on_bootstrap(&mut self) -> MetaResult { + async fn take_pause_on_bootstrap(&self) -> MetaResult { let paused = self .env .system_params_reader() @@ -647,9 +640,6 @@ impl GlobalBarrierManager { self.state .resolve_worker_nodes(self.active_streaming_nodes.current().values().cloned()); - if let ActiveStreamingWorkerChange::Add(node) | ActiveStreamingWorkerChange::Update(node) = changed_worker { - self.control_stream_manager.add_worker(node).await; - } } // Checkpoint frequency changes. @@ -662,19 +652,14 @@ impl GlobalBarrierManager { .set_checkpoint_frequency(p.checkpoint_frequency() as usize) } } - resp_result = self.control_stream_manager.next_response() => { - match resp_result { - Ok((worker_id, prev_epoch, resp)) => { - let resp: StreamingControlStreamResponse = resp; - match resp.response { - Some(streaming_control_stream_response::Response::CompleteBarrier(resp)) => { - self.checkpoint_control.barrier_collected(worker_id, prev_epoch, resp); - }, - resp => unreachable!("invalid response: {:?}", resp), - } - - } + // Barrier completes. + collect_result = self.rpc_manager.next_collected_barrier() => { + match collect_result.result { + Ok(resps) => { + self.checkpoint_control.barrier_collected(collect_result.prev_epoch, resps); + }, Err(e) => { + fail_point!("inject_barrier_err_success"); self.failure_recovery(e).await; } } @@ -698,9 +683,7 @@ impl GlobalBarrierManager { if self .checkpoint_control .can_inject_barrier(self.in_flight_barrier_nums) => { - if let Err(e) = self.handle_new_barrier(scheduled) { - self.failure_recovery(e).await; - } + self.handle_new_barrier(scheduled); } } self.checkpoint_control.update_barrier_nums_metrics(); @@ -708,7 +691,7 @@ impl GlobalBarrierManager { } /// Handle the new barrier from the scheduled queue and inject it. - fn handle_new_barrier(&mut self, scheduled: Scheduled) -> MetaResult<()> { + fn handle_new_barrier(&mut self, scheduled: Scheduled) { let Scheduled { command, mut notifiers, @@ -745,12 +728,7 @@ impl GlobalBarrierManager { send_latency_timer.observe_duration(); - let node_to_collect = self - .control_stream_manager - .inject_barrier(command_ctx.clone()) - .inspect_err(|_| { - fail_point!("inject_barrier_err_success"); - })?; + self.rpc_manager.inject_barrier(command_ctx.clone()); // Notify about the injection. let prev_paused_reason = self.state.paused_reason(); @@ -768,12 +746,12 @@ impl GlobalBarrierManager { self.state.set_paused_reason(curr_paused_reason); // Record the in-flight barrier. self.checkpoint_control - .enqueue_command(command_ctx.clone(), notifiers, node_to_collect); - Ok(()) + .enqueue_command(command_ctx.clone(), notifiers); } async fn failure_recovery(&mut self, err: MetaError) { self.context.tracker.lock().await.abort_all(&err); + self.rpc_manager.clear(); self.checkpoint_control.clear_on_err(&err).await; if self.enable_recovery { @@ -809,8 +787,7 @@ impl GlobalBarrierManagerContext { state, .. } = node; - assert!(state.node_to_collect.is_empty()); - let resps = state.resps; + let resps = must_match!(state, BarrierEpochState::Collected(resps) => resps); let wait_commit_timer = self.metrics.barrier_wait_commit_latency.start_timer(); let (commit_info, create_mview_progress) = collect_commit_epoch_info(resps); if let Err(e) = self.update_snapshot(&command_ctx, commit_info).await { @@ -977,8 +954,13 @@ impl CheckpointControl { if matches!(&self.completing_command, CompletingCommand::None) { // If there is no completing barrier, try to start completing the earliest barrier if // it has been collected. - if let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value() - && !state.is_inflight() + if let Some(( + _, + EpochNode { + state: BarrierEpochState::Collected(_), + .. + }, + )) = self.command_ctx_queue.first_key_value() { let (_, node) = self.command_ctx_queue.pop_first().expect("non-empty"); let command_ctx = node.command_ctx.clone(); diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index a7ea3ae51665a..cd71f9eea707e 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::assert_matches::assert_matches; use std::collections::{BTreeSet, HashMap, HashSet}; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -29,9 +28,6 @@ use risingwave_pb::meta::PausedReason; use risingwave_pb::stream_plan::barrier::BarrierKind; use risingwave_pb::stream_plan::barrier_mutation::Mutation; use risingwave_pb::stream_plan::AddMutation; -use risingwave_pb::stream_service::{ - streaming_control_stream_response, StreamingControlStreamResponse, -}; use thiserror_ext::AsReport; use tokio::sync::oneshot; use tokio_retry::strategy::{jitter, ExponentialBackoff}; @@ -42,8 +38,6 @@ use crate::barrier::command::CommandContext; use crate::barrier::info::InflightActorInfo; use crate::barrier::notifier::Notifier; use crate::barrier::progress::CreateMviewProgressTracker; -use crate::barrier::rpc::ControlStreamManager; -use crate::barrier::schedule::ScheduledBarriers; use crate::barrier::state::BarrierManagerState; use crate::barrier::{Command, GlobalBarrierManager, GlobalBarrierManagerContext}; use crate::controller::catalog::ReleaseContext; @@ -308,16 +302,15 @@ impl GlobalBarrierManagerContext { Ok(()) } +} +impl GlobalBarrierManager { /// Pre buffered drop and cancel command, return true if any. - async fn pre_apply_drop_cancel( - &self, - scheduled_barriers: &ScheduledBarriers, - ) -> MetaResult { - let (dropped_actors, cancelled) = scheduled_barriers.pre_apply_drop_cancel_scheduled(); + async fn pre_apply_drop_cancel(&self) -> MetaResult { + let (dropped_actors, cancelled) = self.scheduled_barriers.pre_apply_drop_cancel_scheduled(); let applied = !dropped_actors.is_empty() || !cancelled.is_empty(); if !cancelled.is_empty() { - match &self.metadata_manager { + match &self.context.metadata_manager { MetadataManager::V1(mgr) => { let unregister_table_ids = mgr .fragment_manager @@ -341,9 +334,7 @@ impl GlobalBarrierManagerContext { } Ok(applied) } -} -impl GlobalBarrierManager { /// Recovery the whole cluster from the latest epoch. /// /// If `paused_reason` is `Some`, all data sources (including connectors and DMLs) will be @@ -384,14 +375,11 @@ impl GlobalBarrierManager { // get recovered. let recovery_timer = self.context.metrics.recovery_latency.start_timer(); - let new_state = tokio_retry::Retry::spawn(retry_strategy, || { + let (state, active_streaming_nodes) = tokio_retry::Retry::spawn(retry_strategy, || { async { let recovery_result: MetaResult<_> = try { // This is a quick path to accelerate the process of dropping and canceling streaming jobs. - let _ = self - .context - .pre_apply_drop_cancel(&self.scheduled_barriers) - .await?; + let _ = self.pre_apply_drop_cancel().await?; let active_streaming_nodes = ActiveStreamingWorkerNodes::new_snapshot( self.context.metadata_manager.clone(), @@ -439,21 +427,14 @@ impl GlobalBarrierManager { })? }; - let mut control_stream_manager = - ControlStreamManager::new(self.context.clone()); - - control_stream_manager - .reset(prev_epoch.value().0, active_streaming_nodes.current()) + // Reset all compute nodes, stop and drop existing actors. + self.reset_compute_nodes(&info, prev_epoch.value().0) .await .inspect_err(|err| { warn!(error = %err.as_report(), "reset compute nodes failed"); })?; - if self - .context - .pre_apply_drop_cancel(&self.scheduled_barriers) - .await? - { + if self.pre_apply_drop_cancel().await? { info = self .context .resolve_actor_info(all_nodes.clone()) @@ -464,10 +445,10 @@ impl GlobalBarrierManager { } // update and build all actors. - self.context.update_actors(&info).await.inspect_err(|err| { + self.update_actors(&info).await.inspect_err(|err| { warn!(error = %err.as_report(), "update actors failed"); })?; - self.context.build_actors(&info).await.inspect_err(|err| { + self.build_actors(&info).await.inspect_err(|err| { warn!(error = %err.as_report(), "build_actors failed"); })?; @@ -497,25 +478,30 @@ impl GlobalBarrierManager { tracing::Span::current(), // recovery span )); - let mut node_to_collect = - control_stream_manager.inject_barrier(command_ctx.clone())?; - while !node_to_collect.is_empty() { - let (worker_id, _, resp) = control_stream_manager.next_response().await?; - assert_matches!( - resp, - StreamingControlStreamResponse { - response: Some( - streaming_control_stream_response::Response::CompleteBarrier(_) - ) + let res = match self + .context + .inject_barrier(command_ctx.clone(), None, None) + .await + .result + { + Ok(response) => { + if let Err(err) = command_ctx.post_collect().await { + warn!(error = %err.as_report(), "post_collect failed"); + Err(err) + } else { + Ok((new_epoch.clone(), response)) } - ); - assert!(node_to_collect.remove(&worker_id)); - } + } + Err(err) => { + warn!(error = %err.as_report(), "inject_barrier failed"); + Err(err) + } + }; + let (new_epoch, _) = res?; ( BarrierManagerState::new(new_epoch, info, command_ctx.next_paused_reason()), active_streaming_nodes, - control_stream_manager, ) }; if recovery_result.is_err() { @@ -531,17 +517,14 @@ impl GlobalBarrierManager { recovery_timer.observe_duration(); self.scheduled_barriers.mark_ready(); - ( - self.state, - self.active_streaming_nodes, - self.control_stream_manager, - ) = new_state; - tracing::info!( - epoch = self.state.in_flight_prev_epoch().value().0, - paused = ?self.state.paused_reason(), + epoch = state.in_flight_prev_epoch().value().0, + paused = ?state.paused_reason(), "recovery success" ); + + self.state = state; + self.active_streaming_nodes = active_streaming_nodes; } } @@ -1030,7 +1013,9 @@ impl GlobalBarrierManagerContext { new_plan.insert(self.env.meta_store_checked()).await?; Ok(new_plan) } +} +impl GlobalBarrierManager { /// Update all actors in compute nodes. async fn update_actors(&self, info: &InflightActorInfo) -> MetaResult<()> { if info.actor_map.is_empty() { @@ -1056,7 +1041,7 @@ impl GlobalBarrierManagerContext { .flatten_ok() .try_collect()?; - let mut all_node_actors = self.metadata_manager.all_node_actors(false).await?; + let mut all_node_actors = self.context.metadata_manager.all_node_actors(false).await?; // Check if any actors were dropped after info resolved. if all_node_actors.iter().any(|(node_id, node_actors)| { @@ -1070,7 +1055,8 @@ impl GlobalBarrierManagerContext { return Err(anyhow!("actors dropped during update").into()); } - self.stream_rpc_manager + self.context + .stream_rpc_manager .broadcast_update_actor_info( &info.node_map, info.actor_map.keys().cloned(), @@ -1094,7 +1080,8 @@ impl GlobalBarrierManagerContext { return Ok(()); } - self.stream_rpc_manager + self.context + .stream_rpc_manager .build_actors( &info.node_map, info.actor_map.iter().map(|(node_id, actors)| { @@ -1106,6 +1093,23 @@ impl GlobalBarrierManagerContext { Ok(()) } + + /// Reset all compute nodes by calling `force_stop_actors`. + async fn reset_compute_nodes( + &self, + info: &InflightActorInfo, + prev_epoch: u64, + ) -> MetaResult<()> { + debug!(prev_epoch, worker = ?info.node_map.keys().collect_vec(), "force stop actors"); + self.context + .stream_rpc_manager + .force_stop_actors(info.node_map.values(), prev_epoch) + .await?; + + debug!(prev_epoch, "all compute nodes have been reset."); + + Ok(()) + } } #[cfg(test)] diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index aa35d606c4bbf..dfe9ada44a47e 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -12,15 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{HashMap, HashSet, VecDeque}; +use std::collections::HashMap; use std::future::Future; use std::sync::Arc; use std::time::Duration; use anyhow::anyhow; use fail::fail_point; -use futures::future::try_join_all; -use futures::stream::{BoxStream, FuturesUnordered}; +use futures::stream::FuturesUnordered; use futures::{pin_mut, FutureExt, StreamExt}; use itertools::Itertools; use risingwave_common::bail; @@ -29,193 +28,141 @@ use risingwave_common::util::tracing::TracingContext; use risingwave_pb::common::{ActorInfo, WorkerNode}; use risingwave_pb::stream_plan::{Barrier, BarrierMutation, StreamActor}; use risingwave_pb::stream_service::{ - streaming_control_stream_request, streaming_control_stream_response, - BroadcastActorInfoTableRequest, BuildActorsRequest, DropActorsRequest, InjectBarrierRequest, - StreamingControlStreamRequest, StreamingControlStreamResponse, UpdateActorsRequest, + BarrierCompleteRequest, BroadcastActorInfoTableRequest, BuildActorsRequest, DropActorsRequest, + ForceStopActorsRequest, InjectBarrierRequest, UpdateActorsRequest, }; use risingwave_rpc_client::error::RpcError; use risingwave_rpc_client::StreamClient; use rw_futures_util::pending_on_none; -use thiserror_ext::AsReport; -use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::oneshot; use tokio::time::timeout; -use tracing::{error, info, warn}; +use tracing::Instrument; use uuid::Uuid; use super::command::CommandContext; -use super::GlobalBarrierManagerContext; +use super::{BarrierCollectResult, GlobalBarrierManagerContext}; use crate::manager::{MetaSrvEnv, WorkerId}; use crate::{MetaError, MetaResult}; -struct ControlStreamNode { - worker: WorkerNode, - sender: UnboundedSender, - // earlier epoch at the front - inflight_barriers: VecDeque>, -} - -fn into_future( - worker_id: WorkerId, - stream: BoxStream< - 'static, - risingwave_rpc_client::error::Result, - >, -) -> ResponseStreamFuture { - stream.into_future().map(move |(opt, stream)| { - ( - worker_id, - stream, - opt.ok_or_else(|| anyhow!("end of stream").into()) - .and_then(|result| result.map_err(|e| e.into())), - ) - }) -} +pub(super) struct BarrierRpcManager { + context: GlobalBarrierManagerContext, -type ResponseStreamFuture = impl Future< - Output = ( - WorkerId, - BoxStream< - 'static, - risingwave_rpc_client::error::Result, - >, - MetaResult, - ), - > + 'static; + /// Futures that await on the completion of barrier. + injected_in_progress_barrier: FuturesUnordered, -pub(super) struct ControlStreamManager { - context: GlobalBarrierManagerContext, - nodes: HashMap, - response_streams: FuturesUnordered, + prev_injecting_barrier: Option>, } -impl ControlStreamManager { +impl BarrierRpcManager { pub(super) fn new(context: GlobalBarrierManagerContext) -> Self { Self { context, - nodes: Default::default(), - response_streams: FuturesUnordered::new(), + injected_in_progress_barrier: FuturesUnordered::new(), + prev_injecting_barrier: None, } } - pub(super) async fn add_worker(&mut self, node: WorkerNode) { - if self.nodes.contains_key(&node.id) { - warn!(id = node.id, host = ?node.host, "node already exists"); - return; - } - let prev_epoch = self - .context - .hummock_manager - .latest_snapshot() - .committed_epoch; - let node_id = node.id; - let node_host = node.host.clone().unwrap(); - match self.context.new_control_stream_node(node, prev_epoch).await { - Ok((stream_node, response_stream)) => { - let _ = self.nodes.insert(node_id, stream_node); - self.response_streams - .push(into_future(node_id, response_stream)); - info!(?node_host, "add control stream worker"); - } - Err(e) => { - error!(err = %e.as_report(), ?node_host, "fail to start control stream with worker node"); - } - } + pub(super) fn clear(&mut self) { + self.injected_in_progress_barrier = FuturesUnordered::new(); + self.prev_injecting_barrier = None; } - pub(super) async fn reset( - &mut self, - prev_epoch: u64, - nodes: &HashMap, - ) -> MetaResult<()> { - let nodes = try_join_all(nodes.iter().map(|(worker_id, node)| async { - let node = self - .context - .new_control_stream_node(node.clone(), prev_epoch) - .await?; - Result::<_, MetaError>::Ok((*worker_id, node)) - })) - .await?; - self.nodes.clear(); - self.response_streams.clear(); - for (worker_id, (node, response_stream)) in nodes { - self.nodes.insert(worker_id, node); - self.response_streams - .push(into_future(worker_id, response_stream)); - } + pub(super) fn inject_barrier(&mut self, command_context: Arc) { + // this is to notify that the barrier has been injected so that the next + // barrier can be injected to avoid out of order barrier injection. + // TODO: can be removed when bidi-stream control in implemented. + let (inject_tx, inject_rx) = oneshot::channel(); + let prev_inject_rx = self.prev_injecting_barrier.replace(inject_rx); + let await_complete_future = + self.context + .inject_barrier(command_context, Some(inject_tx), prev_inject_rx); + self.injected_in_progress_barrier + .push(await_complete_future); + } - Ok(()) + pub(super) async fn next_collected_barrier(&mut self) -> BarrierCollectResult { + pending_on_none(self.injected_in_progress_barrier.next()).await } +} + +pub(super) type BarrierCollectFuture = impl Future + Send + 'static; - pub(super) async fn next_response( - &mut self, - ) -> MetaResult<(WorkerId, u64, StreamingControlStreamResponse)> { - loop { - let (worker_id, response_stream, result) = - pending_on_none(self.response_streams.next()).await; +impl GlobalBarrierManagerContext { + /// Inject a barrier to all CNs and spawn a task to collect it + pub(super) fn inject_barrier( + &self, + command_context: Arc, + inject_tx: Option>, + prev_inject_rx: Option>, + ) -> BarrierCollectFuture { + let (tx, rx) = oneshot::channel(); + let prev_epoch = command_context.prev_epoch.value().0; + let stream_rpc_manager = self.stream_rpc_manager.clone(); + // todo: the collect handler should be abort when recovery. + let _join_handle = tokio::spawn(async move { + let span = command_context.span.clone(); + if let Some(prev_inject_rx) = prev_inject_rx { + if prev_inject_rx.await.is_err() { + let _ = tx.send(BarrierCollectResult { + prev_epoch, + result: Err(anyhow!("prev barrier failed to be injected").into()), + }); + return; + } + } + let result = stream_rpc_manager + .inject_barrier(command_context.clone()) + .instrument(span.clone()) + .await; match result { - Ok(resp) => match &resp.response { - Some(streaming_control_stream_response::Response::CompleteBarrier(_)) => { - self.response_streams - .push(into_future(worker_id, response_stream)); - let node = self - .nodes - .get_mut(&worker_id) - .expect("should exist when get collect resp"); - let command = node - .inflight_barriers - .pop_front() - .expect("should exist when get collect resp"); - break Ok((worker_id, command.prev_epoch.value().0, resp)); - } - resp => { - break Err(anyhow!("get unexpected resp: {:?}", resp).into()); - } - }, - Err(err) => { - let mut node = self - .nodes - .remove(&worker_id) - .expect("should exist when get collect resp"); - warn!(node = ?node.worker, err = ?err.as_report(), "get error from response stream"); - if let Some(command) = node.inflight_barriers.pop_front() { - self.context.report_collect_failure(&command, &err); - break Err(err); - } else { - // for node with no inflight barrier, simply ignore the error - continue; + Ok(node_need_collect) => { + if let Some(inject_tx) = inject_tx { + let _ = inject_tx.send(()); } + stream_rpc_manager + .collect_barrier(node_need_collect, command_context, tx) + .instrument(span.clone()) + .await; + } + Err(e) => { + let _ = tx.send(BarrierCollectResult { + prev_epoch, + result: Err(e), + }); } } - } + }); + rx.map(move |result| match result { + Ok(completion) => completion, + Err(_e) => BarrierCollectResult { + prev_epoch, + result: Err(anyhow!("failed to receive barrier completion result").into()), + }, + }) } } -impl ControlStreamManager { +impl StreamRpcManager { /// Send inject-barrier-rpc to stream service and wait for its response before returns. - pub(super) fn inject_barrier( - &mut self, + async fn inject_barrier( + &self, command_context: Arc, - ) -> MetaResult> { + ) -> MetaResult> { fail_point!("inject_barrier_err", |_| bail!("inject_barrier_err")); - let mutation = command_context.to_mutation(); + let mutation = command_context.to_mutation().await?; let info = command_context.info.clone(); - let mut node_need_collect = HashSet::new(); - - info.node_map - .iter() - .map(|(node_id, worker_node)| { + let mut node_need_collect = HashMap::new(); + self.make_request( + info.node_map.iter().filter_map(|(node_id, node)| { let actor_ids_to_send = info.actor_ids_to_send(node_id).collect_vec(); let actor_ids_to_collect = info.actor_ids_to_collect(node_id).collect_vec(); if actor_ids_to_collect.is_empty() { // No need to send or collect barrier for this node. assert!(actor_ids_to_send.is_empty()); - Ok(()) + node_need_collect.insert(*node_id, false); + None } else { - let Some(node) = self.nodes.get_mut(node_id) else { - return Err( - anyhow!("unconnected worker node: {:?}", worker_node.host).into() - ); - }; + node_need_collect.insert(*node_id, true); let mutation = mutation.clone(); let barrier = Barrier { epoch: Some(risingwave_pb::data::Epoch { @@ -230,89 +177,104 @@ impl ControlStreamManager { kind: command_context.kind as i32, passed_actors: vec![], }; + Some(( + node, + InjectBarrierRequest { + request_id: Self::new_request_id(), + barrier: Some(barrier), + actor_ids_to_send, + actor_ids_to_collect, + }, + )) + } + }), + |client, request| { + async move { + tracing::debug!( + target: "events::meta::barrier::inject_barrier", + ?request, "inject barrier request" + ); - node.sender - .send(StreamingControlStreamRequest { - request: Some( - streaming_control_stream_request::Request::InjectBarrier( - InjectBarrierRequest { - request_id: StreamRpcManager::new_request_id(), - barrier: Some(barrier), - actor_ids_to_send, - actor_ids_to_collect, - }, - ), - ), - }) - .map_err(|_| { - MetaError::from(anyhow!( - "failed to send request to {} {:?}", - node.worker.id, - node.worker.host - )) - })?; - - node.inflight_barriers.push_back(command_context.clone()); - node_need_collect.insert(*node_id); - Result::<_, MetaError>::Ok(()) + // This RPC returns only if this worker node has injected this barrier. + client.inject_barrier(request).await } - }) - .try_collect() + }, + ) + .await + .inspect_err(|e| { + // Record failure in event log. + use risingwave_pb::meta::event_log; + use thiserror_ext::AsReport; + let event = event_log::EventInjectBarrierFail { + prev_epoch: command_context.prev_epoch.value().0, + cur_epoch: command_context.curr_epoch.value().0, + error: e.to_report_string(), + }; + self.env + .event_log_manager_ref() + .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]); + })?; + Ok(node_need_collect) + } + + /// Send barrier-complete-rpc and wait for responses from all CNs + async fn collect_barrier( + &self, + node_need_collect: HashMap, + command_context: Arc, + barrier_collect_tx: oneshot::Sender, + ) { + let prev_epoch = command_context.prev_epoch.value().0; + let tracing_context = + TracingContext::from_span(command_context.prev_epoch.span()).to_protobuf(); + + let info = command_context.info.clone(); + let result = self + .broadcast( + info.node_map.iter().filter_map(|(node_id, node)| { + if !*node_need_collect.get(node_id).unwrap() { + // No need to send or collect barrier for this node. + None + } else { + Some(node) + } + }), + |client| { + let tracing_context = tracing_context.clone(); + async move { + let request = BarrierCompleteRequest { + request_id: Self::new_request_id(), + prev_epoch, + tracing_context, + }; + tracing::debug!( + target: "events::meta::barrier::barrier_complete", + ?request, "barrier complete" + ); + + // This RPC returns only if this worker node has collected this barrier. + client.barrier_complete(request).await + } + }, + ) + .await .inspect_err(|e| { // Record failure in event log. use risingwave_pb::meta::event_log; - let event = event_log::EventInjectBarrierFail { + use thiserror_ext::AsReport; + let event = event_log::EventCollectBarrierFail { prev_epoch: command_context.prev_epoch.value().0, cur_epoch: command_context.curr_epoch.value().0, error: e.to_report_string(), }; - self.context - .env + self.env .event_log_manager_ref() - .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]); - })?; - Ok(node_need_collect) - } -} - -impl GlobalBarrierManagerContext { - async fn new_control_stream_node( - &self, - node: WorkerNode, - prev_epoch: u64, - ) -> MetaResult<( - ControlStreamNode, - BoxStream<'static, risingwave_rpc_client::error::Result>, - )> { - let handle = self - .env - .stream_client_pool() - .get(&node) - .await? - .start_streaming_control(prev_epoch) - .await?; - Ok(( - ControlStreamNode { - worker: node.clone(), - sender: handle.request_sender, - inflight_barriers: VecDeque::new(), - }, - handle.response_stream, - )) - } - - /// Send barrier-complete-rpc and wait for responses from all CNs - fn report_collect_failure(&self, command_context: &CommandContext, error: &MetaError) { - // Record failure in event log. - use risingwave_pb::meta::event_log; - let event = event_log::EventCollectBarrierFail { - prev_epoch: command_context.prev_epoch.value().0, - cur_epoch: command_context.curr_epoch.value().0, - error: error.to_report_string(), - }; - self.env - .event_log_manager_ref() - .add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]); + .add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]); + }) + .map_err(Into::into); + let _ = barrier_collect_tx + .send(BarrierCollectResult { prev_epoch, result }) + .inspect_err(|_| tracing::warn!(prev_epoch, "failed to notify barrier completion")); } } @@ -341,6 +303,15 @@ impl StreamRpcManager { result.map_err(|results_err| merge_node_rpc_errors("merged RPC Error", results_err)) } + async fn broadcast> + 'static>( + &self, + nodes: impl Iterator, + f: impl Fn(StreamClient) -> Fut, + ) -> MetaResult> { + self.make_request(nodes.map(|node| (node, ())), |client, ()| f(client)) + .await + } + fn new_request_id() -> String { Uuid::new_v4().to_string() } @@ -432,6 +403,23 @@ impl StreamRpcManager { .await?; Ok(()) } + + pub async fn force_stop_actors( + &self, + nodes: impl Iterator, + prev_epoch: u64, + ) -> MetaResult<()> { + self.broadcast(nodes, |client| async move { + client + .force_stop_actors(ForceStopActorsRequest { + request_id: Self::new_request_id(), + prev_epoch, + }) + .await + }) + .await?; + Ok(()) + } } /// This function is similar to `try_join_all`, but it attempts to collect as many error as possible within `error_timeout`. @@ -478,6 +466,8 @@ fn merge_node_rpc_errors( ) -> MetaError { use std::fmt::Write; + use thiserror_ext::AsReport; + let concat: String = errors .into_iter() .fold(format!("{message}:"), |mut s, (w, e)| { diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 99ae32d26bb92..41ed041879f0c 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -21,7 +21,7 @@ use std::sync::Arc; use std::time::Duration; use anyhow::{anyhow, Context}; -use futures::future::{try_join_all, BoxFuture}; +use futures::future::BoxFuture; use itertools::Itertools; use num_integer::Integer; use num_traits::abs; @@ -2651,33 +2651,9 @@ impl GlobalStreamManager { tracing::debug!("reschedule plan: {:?}", reschedule_fragment); - let up_down_stream_fragment: HashSet<_> = reschedule_fragment - .iter() - .flat_map(|(_, reschedule)| { - reschedule - .upstream_fragment_dispatcher_ids - .iter() - .map(|(fragment_id, _)| *fragment_id) - .chain(reschedule.downstream_fragment_ids.iter().cloned()) - }) - .collect(); - - let fragment_actors = - try_join_all(up_down_stream_fragment.iter().map(|fragment_id| async { - let actor_ids = self - .metadata_manager - .get_running_actors_of_fragment(*fragment_id) - .await?; - Result::<_, MetaError>::Ok((*fragment_id, actor_ids)) - })) - .await? - .into_iter() - .collect(); - let command = Command::RescheduleFragment { reschedules: reschedule_fragment, table_parallelism: table_parallelism.unwrap_or_default(), - fragment_actors, }; match &self.metadata_manager { diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index fa16b039236b6..d6ef8944725e4 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -756,7 +756,6 @@ mod tests { use std::sync::{Arc, Mutex}; use std::time::Duration; - use futures::{Stream, TryStreamExt}; use risingwave_common::catalog::TableId; use risingwave_common::hash::ParallelUnitMapping; use risingwave_common::system_param::reader::SystemParamsRead; @@ -769,20 +768,16 @@ mod tests { use risingwave_pb::stream_service::stream_service_server::{ StreamService, StreamServiceServer, }; - use risingwave_pb::stream_service::streaming_control_stream_response::InitResponse; use risingwave_pb::stream_service::{ BroadcastActorInfoTableResponse, BuildActorsResponse, DropActorsRequest, - DropActorsResponse, UpdateActorsResponse, *, + DropActorsResponse, InjectBarrierRequest, InjectBarrierResponse, UpdateActorsResponse, *, }; - use tokio::spawn; - use tokio::sync::mpsc::unbounded_channel; use tokio::sync::oneshot::Sender; #[cfg(feature = "failpoints")] use tokio::sync::Notify; use tokio::task::JoinHandle; use tokio::time::sleep; - use tokio_stream::wrappers::UnboundedReceiverStream; - use tonic::{Request, Response, Status, Streaming}; + use tonic::{Request, Response, Status}; use super::*; use crate::barrier::{GlobalBarrierManager, StreamRpcManager}; @@ -810,9 +805,6 @@ mod tests { #[async_trait::async_trait] impl StreamService for FakeStreamService { - type StreamingControlStreamStream = - impl Stream>; - async fn update_actors( &self, request: Request, @@ -864,46 +856,29 @@ mod tests { Ok(Response::new(DropActorsResponse::default())) } - async fn streaming_control_stream( + async fn force_stop_actors( &self, - request: Request>, - ) -> Result, Status> { - let (tx, rx) = unbounded_channel(); - let mut request_stream = request.into_inner(); - let inner = self.inner.clone(); - let _join_handle = spawn(async move { - while let Ok(Some(request)) = request_stream.try_next().await { - match request.request.unwrap() { - streaming_control_stream_request::Request::Init(_) => { - inner.actor_streams.lock().unwrap().clear(); - inner.actor_ids.lock().unwrap().clear(); - inner.actor_infos.lock().unwrap().clear(); - let _ = tx.send(Ok(StreamingControlStreamResponse { - response: Some(streaming_control_stream_response::Response::Init( - InitResponse {}, - )), - })); - } - streaming_control_stream_request::Request::InjectBarrier(_) => { - let _ = tx.send(Ok(StreamingControlStreamResponse { - response: Some( - streaming_control_stream_response::Response::CompleteBarrier( - BarrierCompleteResponse { - request_id: "".to_string(), - status: None, - create_mview_progress: vec![], - synced_sstables: vec![], - worker_id: 0, - table_watermarks: Default::default(), - }, - ), - ), - })); - } - } - } - }); - Ok(Response::new(UnboundedReceiverStream::new(rx))) + _request: Request, + ) -> std::result::Result, Status> { + self.inner.actor_streams.lock().unwrap().clear(); + self.inner.actor_ids.lock().unwrap().clear(); + self.inner.actor_infos.lock().unwrap().clear(); + + Ok(Response::new(ForceStopActorsResponse::default())) + } + + async fn inject_barrier( + &self, + _request: Request, + ) -> std::result::Result, Status> { + Ok(Response::new(InjectBarrierResponse::default())) + } + + async fn barrier_complete( + &self, + _request: Request, + ) -> std::result::Result, Status> { + Ok(Response::new(BarrierCompleteResponse::default())) } async fn wait_epoch_commit( diff --git a/src/rpc_client/src/lib.rs b/src/rpc_client/src/lib.rs index fabd1dabeca01..0485465499f5a 100644 --- a/src/rpc_client/src/lib.rs +++ b/src/rpc_client/src/lib.rs @@ -43,9 +43,7 @@ use rand::prelude::SliceRandom; use risingwave_common::util::addr::HostAddr; use risingwave_pb::common::WorkerNode; use risingwave_pb::meta::heartbeat_request::extra_info; -use tokio::sync::mpsc::{ - channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender, -}; +use tokio::sync::mpsc::{channel, Receiver, Sender}; pub mod error; use error::Result; @@ -65,9 +63,7 @@ pub use hummock_meta_client::{CompactionEventItem, HummockMetaClient}; pub use meta_client::{MetaClient, SinkCoordinationRpcClient}; use rw_futures_util::await_future_with_monitor_error_stream; pub use sink_coordinate_client::CoordinatorStreamHandle; -pub use stream_client::{ - StreamClient, StreamClientPool, StreamClientPoolRef, StreamingControlHandle, -}; +pub use stream_client::{StreamClient, StreamClientPool, StreamClientPoolRef}; #[async_trait] pub trait RpcClient: Send + Sync + 'static + Clone { @@ -278,63 +274,3 @@ impl BidiStreamHandle { } } } - -/// The handle of a bidi-stream started from the rpc client. It is similar to the `BidiStreamHandle` -/// except that its sender is unbounded. -pub struct UnboundedBidiStreamHandle { - pub request_sender: UnboundedSender, - pub response_stream: BoxStream<'static, Result>, -} - -impl Debug for UnboundedBidiStreamHandle { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.write_str(type_name::()) - } -} - -impl UnboundedBidiStreamHandle { - pub async fn initialize< - F: FnOnce(UnboundedReceiver) -> Fut, - St: Stream> + Send + Unpin + 'static, - Fut: Future> + Send, - R: Into, - >( - first_request: R, - init_stream_fn: F, - ) -> Result<(Self, RSP)> { - let (request_sender, request_receiver) = unbounded_channel(); - - // Send initial request in case of the blocking receive call from creating streaming request - request_sender - .send(first_request.into()) - .map_err(|_err| anyhow!("unable to send first request of {}", type_name::()))?; - - let mut response_stream = init_stream_fn(request_receiver).await?; - - let first_response = response_stream - .next() - .await - .context("get empty response from first request")??; - - Ok(( - Self { - request_sender, - response_stream: response_stream.boxed(), - }, - first_response, - )) - } - - pub async fn next_response(&mut self) -> Result { - self.response_stream - .next() - .await - .ok_or_else(|| anyhow!("end of response stream"))? - } - - pub fn send_request(&mut self, request: REQ) -> Result<()> { - self.request_sender - .send(request) - .map_err(|_| anyhow!("unable to send request {}", type_name::()).into()) - } -} diff --git a/src/rpc_client/src/stream_client.rs b/src/rpc_client/src/stream_client.rs index ae5af65f28220..3a271b5660bbd 100644 --- a/src/rpc_client/src/stream_client.rs +++ b/src/rpc_client/src/stream_client.rs @@ -15,22 +15,17 @@ use std::sync::Arc; use std::time::Duration; -use anyhow::anyhow; use async_trait::async_trait; -use futures::TryStreamExt; use risingwave_common::config::MAX_CONNECTION_WINDOW_SIZE; use risingwave_common::monitor::connection::{EndpointExt, TcpConfig}; use risingwave_common::util::addr::HostAddr; use risingwave_pb::stream_service::stream_service_client::StreamServiceClient; -use risingwave_pb::stream_service::streaming_control_stream_request::InitRequest; -use risingwave_pb::stream_service::streaming_control_stream_response::InitResponse; use risingwave_pb::stream_service::*; -use tokio_stream::wrappers::UnboundedReceiverStream; use tonic::transport::Endpoint; -use crate::error::{Result, RpcError}; +use crate::error::Result; use crate::tracing::{Channel, TracingInjectedChannelExt}; -use crate::{rpc_client_method_impl, RpcClient, RpcClientPool, UnboundedBidiStreamHandle}; +use crate::{rpc_client_method_impl, RpcClient, RpcClientPool}; #[derive(Clone)] pub struct StreamClient(StreamServiceClient); @@ -73,6 +68,9 @@ macro_rules! for_all_stream_rpc { ,{ 0, build_actors, BuildActorsRequest, BuildActorsResponse } ,{ 0, broadcast_actor_info_table, BroadcastActorInfoTableRequest, BroadcastActorInfoTableResponse } ,{ 0, drop_actors, DropActorsRequest, DropActorsResponse } + ,{ 0, force_stop_actors, ForceStopActorsRequest, ForceStopActorsResponse} + ,{ 0, inject_barrier, InjectBarrierRequest, InjectBarrierResponse } + ,{ 0, barrier_complete, BarrierCompleteRequest, BarrierCompleteResponse } ,{ 0, wait_epoch_commit, WaitEpochCommitRequest, WaitEpochCommitResponse } } }; @@ -81,35 +79,3 @@ macro_rules! for_all_stream_rpc { impl StreamClient { for_all_stream_rpc! { rpc_client_method_impl } } - -pub type StreamingControlHandle = - UnboundedBidiStreamHandle; - -impl StreamClient { - pub async fn start_streaming_control(&self, prev_epoch: u64) -> Result { - let first_request = StreamingControlStreamRequest { - request: Some(streaming_control_stream_request::Request::Init( - InitRequest { prev_epoch }, - )), - }; - let mut client = self.0.to_owned(); - let (handle, first_rsp) = - UnboundedBidiStreamHandle::initialize(first_request, |rx| async move { - client - .streaming_control_stream(UnboundedReceiverStream::new(rx)) - .await - .map(|response| response.into_inner().map_err(RpcError::from)) - .map_err(RpcError::from) - }) - .await?; - match first_rsp { - StreamingControlStreamResponse { - response: Some(streaming_control_stream_response::Response::Init(InitResponse {})), - } => {} - other => { - return Err(anyhow!("expect InitResponse but get {:?}", other).into()); - } - }; - Ok(handle) - } -} diff --git a/src/storage/src/memory.rs b/src/storage/src/memory.rs index 096020ff569f6..0a1def6c1f7f5 100644 --- a/src/storage/src/memory.rs +++ b/src/storage/src/memory.rs @@ -635,10 +635,8 @@ impl StateStore for RangeKvStateStore { fn seal_epoch(&self, _epoch: u64, _is_checkpoint: bool) {} #[allow(clippy::unused_async)] - async fn clear_shared_buffer(&self, prev_epoch: u64) { - for (key, _) in self.inner.range((Unbounded, Unbounded), None).unwrap() { - assert!(key.epoch_with_gap.pure_epoch() <= prev_epoch); - } + async fn clear_shared_buffer(&self, _prev_epoch: u64) { + unimplemented!("recovery not supported") } #[allow(clippy::unused_async)] diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index 6fef59b6740d1..edbd660690049 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -13,25 +13,20 @@ // limitations under the License. use std::collections::{HashMap, HashSet}; -use std::future::pending; use std::sync::Arc; use std::time::Duration; use anyhow::{anyhow, Context}; -use futures::stream::{BoxStream, FuturesUnordered}; +use futures::stream::FuturesUnordered; use futures::StreamExt; -use itertools::Itertools; use parking_lot::Mutex; -use risingwave_pb::stream_service::barrier_complete_response::{ - GroupedSstableInfo, PbCreateMviewProgress, -}; +use risingwave_pb::stream_service::barrier_complete_response::PbCreateMviewProgress; use rw_futures_util::{pending_on_none, AttachedFuture}; use thiserror_ext::AsReport; use tokio::select; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio::sync::oneshot; use tokio::task::JoinHandle; -use tonic::Status; use self::managed_state::ManagedBarrierState; use crate::error::{IntoUnexpectedExit, StreamError, StreamResult}; @@ -46,17 +41,9 @@ mod tests; pub use progress::CreateMviewProgress; use risingwave_common::util::runtime::BackgroundShutdownRuntime; -use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map; -use risingwave_hummock_sdk::LocalSstableInfo; use risingwave_pb::common::ActorInfo; use risingwave_pb::stream_plan; use risingwave_pb::stream_plan::barrier::BarrierKind; -use risingwave_pb::stream_service::streaming_control_stream_request::{InitRequest, Request}; -use risingwave_pb::stream_service::streaming_control_stream_response::InitResponse; -use risingwave_pb::stream_service::{ - streaming_control_stream_response, BarrierCompleteResponse, StreamingControlStreamRequest, - StreamingControlStreamResponse, -}; use risingwave_storage::store::SyncResult; use crate::executor::exchange::permit::Receiver; @@ -78,71 +65,6 @@ pub struct BarrierCompleteResult { pub create_mview_progress: Vec, } -pub(super) struct ControlStreamHandle { - #[expect(clippy::type_complexity)] - pair: Option<( - UnboundedSender>, - BoxStream<'static, Result>, - )>, -} - -impl ControlStreamHandle { - fn empty() -> Self { - Self { pair: None } - } - - pub(super) fn new( - sender: UnboundedSender>, - request_stream: BoxStream<'static, Result>, - ) -> Self { - Self { - pair: Some((sender, request_stream)), - } - } - - fn reset_stream_with_err(&mut self, err: Status) { - if let Some((sender, _)) = self.pair.take() { - warn!("control stream reset with: {:?}", err.as_report()); - if sender.send(Err(err)).is_err() { - warn!("failed to notify finish of control stream"); - } - } - } - - fn inspect_result(&mut self, result: StreamResult<()>) { - if let Err(e) = result { - self.reset_stream_with_err(Status::internal(format!("get error: {:?}", e.as_report()))); - } - } - - fn send_response(&mut self, response: StreamingControlStreamResponse) { - if let Some((sender, _)) = self.pair.as_ref() { - if sender.send(Ok(response)).is_err() { - self.pair = None; - warn!("fail to send response. control stream reset"); - } - } else { - debug!(?response, "control stream has been reset. ignore response"); - } - } - - async fn next_request(&mut self) -> StreamingControlStreamRequest { - if let Some((_, stream)) = &mut self.pair { - match stream.next().await { - Some(Ok(request)) => { - return request; - } - Some(Err(e)) => self.reset_stream_with_err(Status::internal(format!( - "failed to get request: {:?}", - e.as_report() - ))), - None => self.reset_stream_with_err(Status::internal("end of stream")), - } - } - pending().await - } -} - pub(super) enum LocalBarrierEvent { RegisterSender { actor_id: ActorId, @@ -162,9 +84,19 @@ pub(super) enum LocalBarrierEvent { } pub(super) enum LocalActorOperation { - NewControlStream { - handle: ControlStreamHandle, - init_request: InitRequest, + InjectBarrier { + barrier: Barrier, + actor_ids_to_send: HashSet, + actor_ids_to_collect: HashSet, + result_sender: oneshot::Sender>, + }, + Reset { + prev_epoch: u64, + result_sender: oneshot::Sender<()>, + }, + AwaitEpochCompleted { + epoch: u64, + result_sender: oneshot::Sender>, }, DropActors { actors: Vec, @@ -262,7 +194,7 @@ pub(super) struct LocalBarrierWorker { /// Record all unexpected exited actors. failure_actors: HashMap, - control_stream_handle: ControlStreamHandle, + epoch_result_sender: HashMap>>, pub(super) actor_manager: Arc, @@ -296,7 +228,7 @@ impl LocalBarrierWorker { actor_manager.env.state_store(), actor_manager.streaming_metrics.clone(), ), - control_stream_handle: ControlStreamHandle::empty(), + epoch_result_sender: HashMap::default(), actor_manager, actor_manager_state: StreamActorManagerState::new(), current_shared_context: shared_context, @@ -314,8 +246,7 @@ impl LocalBarrierWorker { self.handle_actor_created(sender, create_actors_result); } completed_epoch = self.state.next_completed_epoch() => { - let result = self.on_epoch_completed(completed_epoch); - self.control_stream_handle.inspect_result(result); + self.on_epoch_completed(completed_epoch); }, // Note: it's important to select in a biased way to ensure that // barrier event is handled before actor_op, because we must ensure @@ -330,13 +261,10 @@ impl LocalBarrierWorker { actor_op = actor_op_rx.recv() => { if let Some(actor_op) = actor_op { match actor_op { - LocalActorOperation::NewControlStream { handle, init_request } => { - self.control_stream_handle.reset_stream_with_err(Status::internal("control stream has been reset to a new one")); - self.reset(init_request.prev_epoch).await; - self.control_stream_handle = handle; - self.control_stream_handle.send_response(StreamingControlStreamResponse { - response: Some(streaming_control_stream_response::Response::Init(InitResponse {})) - }); + LocalActorOperation::Reset { + result_sender, prev_epoch} => { + self.reset(prev_epoch).await; + let _ = result_sender.send(()); } actor_op => { self.handle_actor_op(actor_op); @@ -346,11 +274,7 @@ impl LocalBarrierWorker { else { break; } - }, - request = self.control_stream_handle.next_request() => { - let result = self.handle_streaming_control_request(request); - self.control_stream_handle.inspect_result(result); - }, + } } } } @@ -367,26 +291,6 @@ impl LocalBarrierWorker { let _ = sender.send(result); } - fn handle_streaming_control_request( - &mut self, - request: StreamingControlStreamRequest, - ) -> StreamResult<()> { - match request.request.expect("should not be empty") { - Request::InjectBarrier(req) => { - let barrier = Barrier::from_protobuf(req.get_barrier().unwrap())?; - self.send_barrier( - &barrier, - req.actor_ids_to_send.into_iter().collect(), - req.actor_ids_to_collect.into_iter().collect(), - )?; - Ok(()) - } - Request::Init(_) => { - unreachable!() - } - } - } - fn handle_barrier_event(&mut self, event: LocalBarrierEvent) { match event { LocalBarrierEvent::RegisterSender { actor_id, sender } => { @@ -409,8 +313,26 @@ impl LocalBarrierWorker { fn handle_actor_op(&mut self, actor_op: LocalActorOperation) { match actor_op { - LocalActorOperation::NewControlStream { .. } => { - unreachable!("NewControlStream event should be handled separately in async context") + LocalActorOperation::InjectBarrier { + barrier, + actor_ids_to_send, + actor_ids_to_collect, + result_sender, + } => { + let result = self.send_barrier(&barrier, actor_ids_to_send, actor_ids_to_collect); + let _ = result_sender.send(result).inspect_err(|e| { + warn!(err=?e, "fail to send inject barrier result"); + }); + } + LocalActorOperation::Reset { .. } => { + unreachable!("Reset event should be handled separately in async context") + } + + LocalActorOperation::AwaitEpochCompleted { + epoch, + result_sender, + } => { + self.await_epoch_completed(epoch, result_sender); } LocalActorOperation::DropActors { actors, @@ -449,55 +371,17 @@ impl LocalBarrierWorker { // event handler impl LocalBarrierWorker { - fn on_epoch_completed(&mut self, epoch: u64) -> StreamResult<()> { - let result = self - .state - .pop_completed_epoch(epoch) - .expect("should exist") - .expect("should have completed")?; - - let BarrierCompleteResult { - create_mview_progress, - sync_result, - } = result; - - let (synced_sstables, table_watermarks) = sync_result - .map(|sync_result| (sync_result.uncommitted_ssts, sync_result.table_watermarks)) - .unwrap_or_default(); - - let result = StreamingControlStreamResponse { - response: Some( - streaming_control_stream_response::Response::CompleteBarrier( - BarrierCompleteResponse { - request_id: "todo".to_string(), - status: None, - create_mview_progress, - synced_sstables: synced_sstables - .into_iter() - .map( - |LocalSstableInfo { - compaction_group_id, - sst_info, - table_stats, - }| GroupedSstableInfo { - compaction_group_id, - sst: Some(sst_info), - table_stats_map: to_prost_table_stats_map(table_stats), - }, - ) - .collect_vec(), - worker_id: self.actor_manager.env.worker_id(), - table_watermarks: table_watermarks - .into_iter() - .map(|(key, value)| (key.table_id, value.to_protobuf())) - .collect(), - }, - ), - ), - }; - - self.control_stream_handle.send_response(result); - Ok(()) + fn on_epoch_completed(&mut self, epoch: u64) { + if let Some(sender) = self.epoch_result_sender.remove(&epoch) { + let result = self + .state + .pop_completed_epoch(epoch) + .expect("should exist") + .expect("should have completed"); + if sender.send(result).is_err() { + warn!(epoch, "fail to send epoch complete result"); + } + } } /// Register sender for source actors, used to send barriers. @@ -523,6 +407,7 @@ impl LocalBarrierWorker { ) -> StreamResult<()> { #[cfg(not(test))] { + use itertools::Itertools; // The barrier might be outdated and been injected after recovery in some certain extreme // scenarios. So some newly creating actors in the barrier are possibly not rebuilt during // recovery. Check it here and return an error here if some actors are not found to @@ -607,6 +492,36 @@ impl LocalBarrierWorker { Ok(()) } + /// Use `prev_epoch` to remove collect rx and return rx. + fn await_epoch_completed( + &mut self, + prev_epoch: u64, + result_sender: oneshot::Sender>, + ) { + match self.state.pop_completed_epoch(prev_epoch) { + Err(e) => { + let _ = result_sender.send(Err(e)); + } + Ok(Some(result)) => { + if result_sender.send(result).is_err() { + warn!(prev_epoch, "failed to send completed epoch result"); + } + } + Ok(None) => { + if let Some(prev_sender) = + self.epoch_result_sender.insert(prev_epoch, result_sender) + { + warn!(?prev_epoch, "duplicate await_collect_barrier on epoch"); + let _ = prev_sender.send(Err(anyhow!( + "duplicate await_collect_barrier on epoch {}", + prev_epoch + ) + .into())); + } + } + } + } + /// Reset all internal states. pub(super) fn reset_state(&mut self) { *self = Self::new(self.actor_manager.clone()); @@ -623,14 +538,12 @@ impl LocalBarrierWorker { async fn notify_failure(&mut self, actor_id: ActorId, err: StreamError) { self.add_failure(actor_id, err.clone()); let root_err = self.try_find_root_failure(err).await; - let failed_epochs = self.state.epochs_await_on_actor(actor_id).collect_vec(); - if !failed_epochs.is_empty() { - self.control_stream_handle - .reset_stream_with_err(Status::internal(format!( - "failed to collect barrier. epoch: {:?}, err: {:?}", - failed_epochs, - root_err.as_report() - ))); + for fail_epoch in self.state.epochs_await_on_actor(actor_id) { + if let Some(result_sender) = self.epoch_result_sender.remove(&fail_epoch) { + if result_sender.send(Err(root_err.clone())).is_err() { + warn!(fail_epoch, actor_id, err = %root_err.as_report(), "fail to notify actor failure"); + } + } } } @@ -735,7 +648,40 @@ impl LocalBarrierManager { pub fn register_sender(&self, actor_id: ActorId, sender: UnboundedSender) { self.send_event(LocalBarrierEvent::RegisterSender { actor_id, sender }); } +} +impl EventSender { + /// Broadcast a barrier to all senders. Save a receiver which will get notified when this + /// barrier is finished, in managed mode. + pub(super) async fn send_barrier( + &self, + barrier: Barrier, + actor_ids_to_send: impl IntoIterator, + actor_ids_to_collect: impl IntoIterator, + ) -> StreamResult<()> { + self.send_and_await(move |result_sender| LocalActorOperation::InjectBarrier { + barrier, + actor_ids_to_send: actor_ids_to_send.into_iter().collect(), + actor_ids_to_collect: actor_ids_to_collect.into_iter().collect(), + result_sender, + }) + .await? + } + + /// Use `prev_epoch` to remove collect rx and return rx. + pub(super) async fn await_epoch_completed( + &self, + prev_epoch: u64, + ) -> StreamResult { + self.send_and_await(|result_sender| LocalActorOperation::AwaitEpochCompleted { + epoch: prev_epoch, + result_sender, + }) + .await? + } +} + +impl LocalBarrierManager { /// When a [`crate::executor::StreamConsumer`] (typically [`crate::executor::DispatchExecutor`]) get a barrier, it should report /// and collect this barrier with its own `actor_id` using this function. pub fn collect(&self, actor_id: ActorId, barrier: &Barrier) { @@ -781,7 +727,7 @@ pub fn try_find_root_actor_failure<'a>( #[cfg(test)] impl LocalBarrierManager { - pub(super) fn spawn_for_test() -> EventSender { + pub(super) async fn spawn_for_test() -> (EventSender, Self) { use std::sync::atomic::AtomicU64; let (tx, rx) = unbounded_channel(); let _join_handle = LocalBarrierWorker::spawn( @@ -791,7 +737,13 @@ impl LocalBarrierManager { Arc::new(AtomicU64::new(0)), rx, ); - EventSender(tx) + let sender = EventSender(tx); + let context = sender + .send_and_await(LocalActorOperation::GetCurrentSharedContext) + .await + .unwrap(); + + (sender, context.local_barrier_manager.clone()) } pub fn for_test() -> Self { diff --git a/src/stream/src/task/barrier_manager/tests.rs b/src/stream/src/task/barrier_manager/tests.rs index 2ec421661a14a..ae248facc0e5f 100644 --- a/src/stream/src/task/barrier_manager/tests.rs +++ b/src/stream/src/task/barrier_manager/tests.rs @@ -17,43 +17,15 @@ use std::iter::once; use std::pin::pin; use std::task::Poll; -use assert_matches::assert_matches; -use futures::future::join_all; -use futures::FutureExt; use itertools::Itertools; use risingwave_common::util::epoch::test_epoch; -use risingwave_pb::stream_service::{streaming_control_stream_request, InjectBarrierRequest}; use tokio::sync::mpsc::unbounded_channel; -use tokio_stream::wrappers::UnboundedReceiverStream; use super::*; #[tokio::test] async fn test_managed_barrier_collection() -> StreamResult<()> { - let actor_op_tx = LocalBarrierManager::spawn_for_test(); - - let (request_tx, request_rx) = unbounded_channel(); - let (response_tx, mut response_rx) = unbounded_channel(); - - actor_op_tx.send_event(LocalActorOperation::NewControlStream { - handle: ControlStreamHandle::new( - response_tx, - UnboundedReceiverStream::new(request_rx).boxed(), - ), - init_request: InitRequest { prev_epoch: 0 }, - }); - - assert_matches!( - response_rx.recv().await.unwrap().unwrap().response.unwrap(), - streaming_control_stream_response::Response::Init(_) - ); - - let context = actor_op_tx - .send_and_await(LocalActorOperation::GetCurrentSharedContext) - .await - .unwrap(); - - let manager = &context.local_barrier_manager; + let (actor_op_tx, manager) = LocalBarrierManager::spawn_for_test().await; let register_sender = |actor_id: u32| { let (barrier_tx, barrier_rx) = unbounded_channel(); @@ -75,35 +47,21 @@ async fn test_managed_barrier_collection() -> StreamResult<()> { let barrier = Barrier::new_test_barrier(curr_epoch); let epoch = barrier.epoch.prev; - request_tx - .send(Ok(StreamingControlStreamRequest { - request: Some(streaming_control_stream_request::Request::InjectBarrier( - InjectBarrierRequest { - request_id: "".to_string(), - barrier: Some(barrier.to_protobuf()), - actor_ids_to_send: actor_ids.clone(), - actor_ids_to_collect: actor_ids, - }, - )), - })) + actor_op_tx + .send_barrier(barrier.clone(), actor_ids.clone(), actor_ids) + .await .unwrap(); - // Collect barriers from actors - let collected_barriers = join_all(rxs.iter_mut().map(|(actor_id, rx)| async move { - let barrier = rx.recv().await.unwrap(); - assert_eq!(barrier.epoch.prev, epoch); - (*actor_id, barrier) - })) - .await; - - let mut await_epoch_future = pin!(response_rx.recv().map(|result| { - let resp: StreamingControlStreamResponse = result.unwrap().unwrap(); - let resp = resp.response.unwrap(); - match resp { - streaming_control_stream_response::Response::CompleteBarrier(_complete_barrier) => {} - _ => unreachable!(), - } - })); + let collected_barriers = rxs + .iter_mut() + .map(|(actor_id, rx)| { + let barrier = rx.try_recv().unwrap(); + assert_eq!(barrier.epoch.prev, epoch); + (*actor_id, barrier) + }) + .collect_vec(); + + let mut await_epoch_future = pin!(actor_op_tx.await_epoch_completed(epoch)); // Report to local barrier manager for (i, (actor_id, barrier)) in collected_barriers.into_iter().enumerate() { @@ -119,30 +77,7 @@ async fn test_managed_barrier_collection() -> StreamResult<()> { #[tokio::test] async fn test_managed_barrier_collection_before_send_request() -> StreamResult<()> { - let actor_op_tx = LocalBarrierManager::spawn_for_test(); - - let (request_tx, request_rx) = unbounded_channel(); - let (response_tx, mut response_rx) = unbounded_channel(); - - actor_op_tx.send_event(LocalActorOperation::NewControlStream { - handle: ControlStreamHandle::new( - response_tx, - UnboundedReceiverStream::new(request_rx).boxed(), - ), - init_request: InitRequest { prev_epoch: 0 }, - }); - - assert_matches!( - response_rx.recv().await.unwrap().unwrap().response.unwrap(), - streaming_control_stream_response::Response::Init(_) - ); - - let context = actor_op_tx - .send_and_await(LocalActorOperation::GetCurrentSharedContext) - .await - .unwrap(); - - let manager = &context.local_barrier_manager; + let (actor_op_tx, manager) = LocalBarrierManager::spawn_for_test().await; let register_sender = |actor_id: u32| { let (barrier_tx, barrier_rx) = unbounded_channel(); @@ -174,35 +109,23 @@ async fn test_managed_barrier_collection_before_send_request() -> StreamResult<( // Collect a barrier before sending manager.collect(extra_actor_id, &barrier); - request_tx - .send(Ok(StreamingControlStreamRequest { - request: Some(streaming_control_stream_request::Request::InjectBarrier( - InjectBarrierRequest { - request_id: "".to_string(), - barrier: Some(barrier.to_protobuf()), - actor_ids_to_send, - actor_ids_to_collect, - }, - )), - })) + // Send the barrier to all actors + actor_op_tx + .send_barrier(barrier.clone(), actor_ids_to_send, actor_ids_to_collect) + .await .unwrap(); // Collect barriers from actors - let collected_barriers = join_all(rxs.iter_mut().map(|(actor_id, rx)| async move { - let barrier = rx.recv().await.unwrap(); - assert_eq!(barrier.epoch.prev, epoch); - (*actor_id, barrier) - })) - .await; - - let mut await_epoch_future = pin!(response_rx.recv().map(|result| { - let resp: StreamingControlStreamResponse = result.unwrap().unwrap(); - let resp = resp.response.unwrap(); - match resp { - streaming_control_stream_response::Response::CompleteBarrier(_complete_barrier) => {} - _ => unreachable!(), - } - })); + let collected_barriers = rxs + .iter_mut() + .map(|(actor_id, rx)| { + let barrier = rx.try_recv().unwrap(); + assert_eq!(barrier.epoch.prev, epoch); + (*actor_id, barrier) + }) + .collect_vec(); + + let mut await_epoch_future = pin!(actor_op_tx.await_epoch_completed(epoch)); // Report to local barrier manager for (i, (actor_id, barrier)) in collected_barriers.into_iter().enumerate() { diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index 9045adc1263ce..602a6f49e2385 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -22,7 +22,6 @@ use std::sync::Arc; use anyhow::anyhow; use async_recursion::async_recursion; -use futures::stream::BoxStream; use futures::FutureExt; use itertools::Itertools; use parking_lot::Mutex; @@ -34,32 +33,25 @@ use risingwave_pb::common::ActorInfo; use risingwave_pb::stream_plan; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{StreamActor, StreamNode}; -use risingwave_pb::stream_service::streaming_control_stream_request::InitRequest; -use risingwave_pb::stream_service::{ - StreamingControlStreamRequest, StreamingControlStreamResponse, -}; use risingwave_storage::monitor::HummockTraceFutureExt; use risingwave_storage::{dispatch_state_store, StateStore}; use rw_futures_util::AttachedFuture; use thiserror_ext::AsReport; -use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; +use tokio::sync::mpsc::unbounded_channel; use tokio::sync::oneshot; use tokio::task::JoinHandle; -use tonic::Status; -use super::{unique_executor_id, unique_operator_id}; +use super::{unique_executor_id, unique_operator_id, BarrierCompleteResult}; use crate::error::StreamResult; use crate::executor::exchange::permit::Receiver; use crate::executor::monitor::StreamingMetrics; use crate::executor::subtask::SubtaskHandle; use crate::executor::{ - Actor, ActorContext, ActorContextRef, DispatchExecutor, DispatcherImpl, Executor, ExecutorInfo, - WrapperExecutor, + Actor, ActorContext, ActorContextRef, Barrier, DispatchExecutor, DispatcherImpl, Executor, + ExecutorInfo, WrapperExecutor, }; use crate::from_proto::create_executor; -use crate::task::barrier_manager::{ - ControlStreamHandle, EventSender, LocalActorOperation, LocalBarrierWorker, -}; +use crate::task::barrier_manager::{EventSender, LocalActorOperation, LocalBarrierWorker}; use crate::task::{ ActorId, FragmentId, LocalBarrierManager, SharedContext, StreamActorManager, StreamActorManagerState, StreamEnvironment, UpDownActorIds, @@ -207,19 +199,22 @@ impl LocalStreamManager { } } - /// Receive a new control stream request from meta. Notify the barrier worker to reset the CN and use the new control stream - /// to receive control message from meta - pub fn handle_new_control_stream( + /// Broadcast a barrier to all senders. Save a receiver in barrier manager + pub async fn send_barrier( &self, - sender: UnboundedSender>, - request_stream: BoxStream<'static, Result>, - init_request: InitRequest, - ) { + barrier: Barrier, + actor_ids_to_send: impl IntoIterator, + actor_ids_to_collect: impl IntoIterator, + ) -> StreamResult<()> { self.actor_op_tx - .send_event(LocalActorOperation::NewControlStream { - handle: ControlStreamHandle::new(sender, request_stream), - init_request, - }) + .send_barrier(barrier, actor_ids_to_send, actor_ids_to_collect) + .await + } + + /// Use `epoch` to find collect rx. And wait for all actor to be collected before + /// returning. + pub async fn collect_barrier(&self, prev_epoch: u64) -> StreamResult { + self.actor_op_tx.await_epoch_completed(prev_epoch).await } /// Drop the resources of the given actors. @@ -232,6 +227,17 @@ impl LocalStreamManager { .await } + /// Force stop all actors on this worker, and then drop their resources. + pub async fn reset(&self, prev_epoch: u64) { + self.actor_op_tx + .send_and_await(|result_sender| LocalActorOperation::Reset { + result_sender, + prev_epoch, + }) + .await + .expect("should receive reset") + } + pub async fn update_actors(&self, actors: Vec) -> StreamResult<()> { self.actor_op_tx .send_and_await(|result_sender| LocalActorOperation::UpdateActors { diff --git a/src/tests/simulation/src/slt.rs b/src/tests/simulation/src/slt.rs index 847c7c60c7cd2..d44d69219bbaf 100644 --- a/src/tests/simulation/src/slt.rs +++ b/src/tests/simulation/src/slt.rs @@ -26,9 +26,6 @@ use crate::client::RisingWave; use crate::cluster::{Cluster, KillOpts}; use crate::utils::TimedExt; -// retry a maximum times until it succeed -const MAX_RETRY: usize = 5; - fn is_create_table_as(sql: &str) -> bool { let parts: Vec = sql.split_whitespace().map(|s| s.to_lowercase()).collect(); @@ -274,13 +271,10 @@ pub async fn run_slt_task( }) .await { - let err_string = err.to_string(); // cluster could be still under recovering if killed before, retry if // meets `no reader for dml in table with id {}`. - let should_retry = (err_string.contains("no reader for dml in table") - || err_string - .contains("error reading a body from connection: broken pipe")) - || err_string.contains("failed to inject barrier") && i < MAX_RETRY; + let should_retry = + err.to_string().contains("no reader for dml in table") && i < 5; if !should_retry { panic!("{}", err); } @@ -308,6 +302,8 @@ pub async fn run_slt_task( None }; + // retry up to 5 times until it succeed + let max_retry = 5; for i in 0usize.. { tracing::debug!(iteration = i, "retry count"); let delay = Duration::from_secs(1 << i); @@ -352,7 +348,7 @@ pub async fn run_slt_task( ?err, "failed to wait for background mv to finish creating" ); - if i >= MAX_RETRY { + if i >= max_retry { panic!("failed to run test after retry {i} times, error={err:#?}"); } continue; @@ -383,8 +379,8 @@ pub async fn run_slt_task( break } - // Keep i >= MAX_RETRY for other errors. Since these errors indicate that the MV might not yet be created. - _ if i >= MAX_RETRY => { + // Keep i >= max_retry for other errors. Since these errors indicate that the MV might not yet be created. + _ if i >= max_retry => { panic!("failed to run test after retry {i} times: {e}") } SqlCmd::CreateMaterializedView { ref name } @@ -408,7 +404,7 @@ pub async fn run_slt_task( ?err, "failed to wait for background mv to finish creating" ); - if i >= MAX_RETRY { + if i >= max_retry { panic!("failed to run test after retry {i} times, error={err:#?}"); } continue;