diff --git a/proto/stream_service.proto b/proto/stream_service.proto index 54ffc3d5ff79..ce727ba9cc55 100644 --- a/proto/stream_service.proto +++ b/proto/stream_service.proto @@ -17,16 +17,6 @@ message BuildActorInfo { map related_subscriptions = 2; } -message DropActorsRequest { - string request_id = 1; - repeated uint32 actor_ids = 2; -} - -message DropActorsResponse { - string request_id = 1; - common.Status status = 2; -} - message InjectBarrierRequest { string request_id = 1; stream_plan.Barrier barrier = 2; @@ -109,7 +99,6 @@ message StreamingControlStreamResponse { } service StreamService { - rpc DropActors(DropActorsRequest) returns (DropActorsResponse); rpc WaitEpochCommit(WaitEpochCommitRequest) returns (WaitEpochCommitResponse); rpc StreamingControlStream(stream StreamingControlStreamRequest) returns (stream StreamingControlStreamResponse); } diff --git a/src/compute/src/rpc/service/stream_service.rs b/src/compute/src/rpc/service/stream_service.rs index eb055a174b3e..6253cfe74c73 100644 --- a/src/compute/src/rpc/service/stream_service.rs +++ b/src/compute/src/rpc/service/stream_service.rs @@ -40,20 +40,6 @@ impl StreamService for StreamServiceImpl { type StreamingControlStreamStream = impl Stream>; - #[cfg_attr(coverage, coverage(off))] - async fn drop_actors( - &self, - request: Request, - ) -> std::result::Result, Status> { - let req = request.into_inner(); - let actors = req.actor_ids; - self.mgr.drop_actors(actors).await?; - Ok(Response::new(DropActorsResponse { - request_id: req.request_id, - status: None, - })) - } - #[cfg_attr(coverage, coverage(off))] async fn wait_epoch_commit( &self, diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index 1f0f7f6a3fe8..73d3089eacd9 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -27,7 +27,6 @@ use risingwave_common::telemetry::manager::TelemetryManager; use risingwave_common::telemetry::{report_scarf_enabled, report_to_scarf, telemetry_env_enabled}; use risingwave_common::util::tokio_util::sync::CancellationToken; use risingwave_common_service::{MetricsManager, TracingExtractLayer}; -use risingwave_meta::barrier::StreamRpcManager; use risingwave_meta::controller::catalog::CatalogController; use risingwave_meta::controller::cluster::ClusterController; use risingwave_meta::manager::{ @@ -550,12 +549,9 @@ pub async fn start_service_as_election_leader( // TODO(shutdown): remove this as there's no need to gracefully shutdown some of these sub-tasks. 
let mut sub_tasks = vec![shutdown_handle]; - let stream_rpc_manager = StreamRpcManager::new(env.clone()); - let scale_controller = Arc::new(ScaleController::new( &metadata_manager, source_manager.clone(), - stream_rpc_manager.clone(), env.clone(), )); @@ -567,7 +563,6 @@ pub async fn start_service_as_election_leader( source_manager.clone(), sink_manager.clone(), meta_metrics.clone(), - stream_rpc_manager.clone(), scale_controller.clone(), ) .await; @@ -585,7 +580,6 @@ pub async fn start_service_as_election_leader( metadata_manager.clone(), barrier_scheduler.clone(), source_manager.clone(), - stream_rpc_manager, scale_controller.clone(), ) .unwrap(), diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 6e4ebe40b93b..2e315ed96b14 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -16,7 +16,6 @@ use std::collections::{HashMap, HashSet}; use std::fmt::Formatter; use futures::future::try_join_all; -use itertools::Itertools; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::TableId; use risingwave_common::hash::ActorMapping; @@ -959,19 +958,6 @@ impl Command { } impl CommandContext { - /// Clean up actors in CNs if needed, used by drop, cancel and reschedule commands. - async fn clean_up(&self, actors: Vec) -> MetaResult<()> { - self.barrier_manager_context - .stream_rpc_manager - .drop_actors( - &self.node_map, - self.node_map - .keys() - .map(|worker_id| (*worker_id, actors.clone())), - ) - .await - } - pub async fn wait_epoch_commit(&self, epoch: HummockEpoch) -> MetaResult<()> { let futures = self.node_map.values().map(|worker_node| async { let client = self @@ -1021,13 +1007,9 @@ impl CommandContext { } Command::DropStreamingJobs { - actors, unregistered_state_table_ids, .. } => { - // Tell compute nodes to drop actors. - self.clean_up(actors.clone()).await?; - self.barrier_manager_context .hummock_manager .unregister_table_ids(unregistered_state_table_ids.iter().cloned()) @@ -1036,7 +1018,6 @@ impl CommandContext { Command::CancelStreamingJob(table_fragments) => { tracing::debug!(id = ?table_fragments.table_id(), "cancelling stream job"); - self.clean_up(table_fragments.actor_ids()).await?; // NOTE(kwannoel): At this point, meta has already registered the table ids. // We should unregister them. @@ -1136,8 +1117,6 @@ impl CommandContext { .. }) = job_type { - self.clean_up(old_table_fragments.actor_ids()).await?; - // Drop fragment info in meta store. mgr.fragment_manager .post_replace_table( @@ -1164,13 +1143,9 @@ impl CommandContext { new_table_fragments, dispatchers, init_split_assignment, - old_table_fragments, .. }) = job_type { - // Tell compute nodes to drop actors. - self.clean_up(old_table_fragments.actor_ids()).await?; - mgr.catalog_controller .post_collect_table_fragments( new_table_fragments.table_id().table_id as _, @@ -1201,11 +1176,6 @@ impl CommandContext { table_parallelism, .. } => { - let removed_actors = reschedules - .values() - .flat_map(|reschedule| reschedule.removed_actors.clone().into_iter()) - .collect_vec(); - self.clean_up(removed_actors).await?; self.barrier_manager_context .scale_controller .post_apply_reschedule(reschedules, table_parallelism) @@ -1220,8 +1190,6 @@ impl CommandContext { init_split_assignment, .. }) => { - self.clean_up(old_table_fragments.actor_ids()).await?; - match &self.barrier_manager_context.metadata_manager { MetadataManager::V1(mgr) => { // Drop fragment info in meta store. 
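With the explicit `clean_up` RPC removed from `CommandContext`, dropped actors are reclaimed on the compute node itself once the barrier that stops them has been collected; the actual wiring lives in the `managed_state.rs` and `task/mod.rs` hunks further down. A minimal sketch of that idea, with illustrative stand-in types rather than the real `SharedContext`/`Barrier` API:

```rust
use std::collections::{HashMap, HashSet};

/// Illustrative stand-in for the barrier carried over the control stream.
struct Barrier {
    prev_epoch: u64,
    stop_actors: Option<HashSet<u32>>,
}

/// Illustrative stand-in for the worker-local shared context.
struct SharedContext {
    /// (upstream actor, downstream actor) -> exchange channel
    channels: HashMap<(u32, u32), ()>,
}

impl SharedContext {
    fn drop_actors(&mut self, actors: &HashSet<u32>) {
        // Prune every channel owned by a stopped actor.
        self.channels.retain(|(up_id, _), _| !actors.contains(up_id));
    }
}

/// Called when a barrier has been fully collected: instead of waiting for a
/// meta-initiated `DropActors` RPC, local resources of stopped actors are
/// released right here.
fn on_barrier_completed(ctx: &mut SharedContext, barrier: &Barrier) -> u64 {
    if let Some(actors_to_stop) = &barrier.stop_actors {
        ctx.drop_actors(actors_to_stop);
    }
    barrier.prev_epoch
}
```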
diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index daa82306bff6..e4b9cdb8b3a9 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -86,7 +86,6 @@ pub use self::command::{ Reschedule, SnapshotBackfillInfo, }; pub use self::info::InflightSubscriptionInfo; -pub use self::rpc::StreamRpcManager; pub use self::schedule::BarrierScheduler; pub use self::trace::TracedEpoch; @@ -172,8 +171,6 @@ pub struct GlobalBarrierManagerContext { pub(super) metrics: Arc, - stream_rpc_manager: StreamRpcManager, - env: MetaSrvEnv, } @@ -596,7 +593,6 @@ impl GlobalBarrierManager { source_manager: SourceManagerRef, sink_manager: SinkCoordinatorManager, metrics: Arc, - stream_rpc_manager: StreamRpcManager, scale_controller: ScaleControllerRef, ) -> Self { let enable_recovery = env.opts.enable_recovery; @@ -624,7 +620,6 @@ impl GlobalBarrierManager { scale_controller, sink_manager, metrics, - stream_rpc_manager, env: env.clone(), }; diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 25fe1fd2ceff..63cd4c16d9aa 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -1121,6 +1121,14 @@ impl GlobalBarrierManagerContext { return Err(anyhow!("actors dropped during update").into()); } + { + for (node_id, actors) in &info.actor_map { + if !actors.is_empty() && !all_node_actors.contains_key(node_id) { + return Err(anyhow!("streaming job dropped during update").into()); + } + } + } + Ok(all_node_actors) } } diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index 14ee8b0c15f7..1e7d9b5dfa75 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -21,7 +21,7 @@ use anyhow::anyhow; use fail::fail_point; use futures::future::try_join_all; use futures::stream::{BoxStream, FuturesUnordered}; -use futures::{pin_mut, FutureExt, StreamExt}; +use futures::{FutureExt, StreamExt}; use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_common::hash::ActorId; @@ -34,11 +34,9 @@ use risingwave_pb::stream_service::build_actor_info::SubscriptionIds; use risingwave_pb::stream_service::streaming_control_stream_request::RemovePartialGraphRequest; use risingwave_pb::stream_service::{ streaming_control_stream_request, streaming_control_stream_response, BarrierCompleteResponse, - BuildActorInfo, DropActorsRequest, InjectBarrierRequest, StreamingControlStreamRequest, + BuildActorInfo, InjectBarrierRequest, StreamingControlStreamRequest, StreamingControlStreamResponse, }; -use risingwave_rpc_client::error::RpcError; -use risingwave_rpc_client::StreamClient; use rw_futures_util::pending_on_none; use thiserror_ext::AsReport; use tokio::sync::mpsc::UnboundedSender; @@ -50,7 +48,7 @@ use uuid::Uuid; use super::command::CommandContext; use super::{BarrierKind, GlobalBarrierManagerContext, TracedEpoch}; use crate::barrier::info::InflightGraphInfo; -use crate::manager::{MetaSrvEnv, WorkerId}; +use crate::manager::WorkerId; use crate::{MetaError, MetaResult}; const COLLECT_ERROR_TIMEOUT: Duration = Duration::from_secs(3); @@ -393,7 +391,7 @@ impl ControlStreamManager { request: Some( streaming_control_stream_request::Request::InjectBarrier( InjectBarrierRequest { - request_id: StreamRpcManager::new_request_id(), + request_id: Uuid::new_v4().to_string(), barrier: Some(barrier), actor_ids_to_collect, table_ids_to_sync, @@ -512,95 +510,6 @@ impl GlobalBarrierManagerContext { } } -#[derive(Clone)] -pub struct StreamRpcManager { - env: MetaSrvEnv, -} - -impl StreamRpcManager { - 
pub fn new(env: MetaSrvEnv) -> Self { - Self { env } - } - - async fn make_request> + 'static>( - &self, - request: impl Iterator, - f: impl Fn(StreamClient, REQ) -> Fut, - ) -> MetaResult> { - let pool = self.env.stream_client_pool(); - let f = &f; - let iters = request.map(|(node, input)| async move { - let client = pool.get(node).await.map_err(|e| (node.id, e))?; - f(client, input).await.map_err(|e| (node.id, e)) - }); - let result = try_join_all_with_error_timeout(iters, COLLECT_ERROR_TIMEOUT).await; - result.map_err(|results_err| merge_node_rpc_errors("merged RPC Error", results_err)) - } - - fn new_request_id() -> String { - Uuid::new_v4().to_string() - } - - pub async fn drop_actors( - &self, - node_map: &HashMap, - node_actors: impl Iterator)>, - ) -> MetaResult<()> { - self.make_request( - node_actors - .map(|(worker_id, actor_ids)| (node_map.get(&worker_id).unwrap(), actor_ids)), - |client, actor_ids| async move { - client - .drop_actors(DropActorsRequest { - request_id: Self::new_request_id(), - actor_ids, - }) - .await - }, - ) - .await?; - Ok(()) - } -} - -/// This function is similar to `try_join_all`, but it attempts to collect as many error as possible within `error_timeout`. -async fn try_join_all_with_error_timeout( - iters: I, - error_timeout: Duration, -) -> Result, Vec> -where - I: IntoIterator, - F: Future>, -{ - let stream = FuturesUnordered::from_iter(iters); - pin_mut!(stream); - let mut results_ok = vec![]; - let mut results_err = vec![]; - while let Some(result) = stream.next().await { - match result { - Ok(rsp) => { - results_ok.push(rsp); - } - Err(err) => { - results_err.push(err); - break; - } - } - } - if results_err.is_empty() { - return Ok(results_ok); - } - let _ = timeout(error_timeout, async { - while let Some(result) = stream.next().await { - if let Err(err) = result { - results_err.push(err); - } - } - }) - .await; - Err(results_err) -} - pub(super) fn merge_node_rpc_errors( message: &str, errors: impl IntoIterator, diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index d10fa83710d8..2431e9f0f3a5 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -49,7 +49,7 @@ use tokio::sync::{oneshot, RwLock, RwLockReadGuard, RwLockWriteGuard}; use tokio::task::JoinHandle; use tokio::time::{Instant, MissedTickBehavior}; -use crate::barrier::{Command, Reschedule, StreamRpcManager}; +use crate::barrier::{Command, Reschedule}; use crate::controller::scale::RescheduleWorkingSet; use crate::manager::{ IdCategory, IdGenManagerImpl, LocalNotification, MetaSrvEnv, MetadataManager, @@ -437,8 +437,6 @@ pub struct ScaleController { pub source_manager: SourceManagerRef, - pub stream_rpc_manager: StreamRpcManager, - pub env: MetaSrvEnv, pub reschedule_lock: RwLock<()>, @@ -448,11 +446,9 @@ impl ScaleController { pub fn new( metadata_manager: &MetadataManager, source_manager: SourceManagerRef, - stream_rpc_manager: StreamRpcManager, env: MetaSrvEnv, ) -> Self { Self { - stream_rpc_manager, metadata_manager: metadata_manager.clone(), source_manager, env, diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index a8e8bc47752a..55ddbd04e721 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -31,7 +31,7 @@ use tracing::Instrument; use super::{Locations, RescheduleOptions, ScaleControllerRef, TableResizePolicy}; use crate::barrier::{ BarrierScheduler, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType, - ReplaceTablePlan, 
SnapshotBackfillInfo, StreamRpcManager, + ReplaceTablePlan, SnapshotBackfillInfo, }; use crate::manager::{DdlType, MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob}; use crate::model::{ActorId, FragmentId, MetadataModel, TableFragments, TableParallelism}; @@ -203,8 +203,6 @@ pub struct GlobalStreamManager { creating_job_info: CreatingStreamingJobInfoRef, pub scale_controller: ScaleControllerRef, - - pub stream_rpc_manager: StreamRpcManager, } impl GlobalStreamManager { @@ -213,7 +211,6 @@ impl GlobalStreamManager { metadata_manager: MetadataManager, barrier_scheduler: BarrierScheduler, source_manager: SourceManagerRef, - stream_rpc_manager: StreamRpcManager, scale_controller: ScaleControllerRef, ) -> MetaResult { Ok(Self { @@ -223,7 +220,6 @@ impl GlobalStreamManager { source_manager, creating_job_info: Arc::new(CreatingStreamingJobInfo::default()), scale_controller, - stream_rpc_manager, }) } @@ -816,13 +812,6 @@ mod tests { type StreamingControlStreamStream = impl Stream>; - async fn drop_actors( - &self, - _request: Request, - ) -> std::result::Result, Status> { - Ok(Response::new(DropActorsResponse::default())) - } - async fn streaming_control_stream( &self, request: Request>, @@ -989,11 +978,9 @@ mod tests { let (sink_manager, _) = SinkCoordinatorManager::start_worker(); - let stream_rpc_manager = StreamRpcManager::new(env.clone()); let scale_controller = Arc::new(ScaleController::new( &metadata_manager, source_manager.clone(), - stream_rpc_manager.clone(), env.clone(), )); @@ -1005,7 +992,6 @@ mod tests { source_manager.clone(), sink_manager, meta_metrics.clone(), - stream_rpc_manager.clone(), scale_controller.clone(), ) .await; @@ -1015,7 +1001,6 @@ mod tests { metadata_manager, barrier_scheduler.clone(), source_manager.clone(), - stream_rpc_manager, scale_controller.clone(), )?; diff --git a/src/rpc_client/src/stream_client.rs b/src/rpc_client/src/stream_client.rs index 920b6f0777f3..40a6d48dacb3 100644 --- a/src/rpc_client/src/stream_client.rs +++ b/src/rpc_client/src/stream_client.rs @@ -70,8 +70,7 @@ pub type StreamClientPoolRef = Arc; macro_rules! for_all_stream_rpc { ($macro:ident) => { $macro! { - { 0, drop_actors, DropActorsRequest, DropActorsResponse } - ,{ 0, wait_epoch_commit, WaitEpochCommitRequest, WaitEpochCommitResponse } + { 0, wait_epoch_commit, WaitEpochCommitRequest, WaitEpochCommitResponse } } }; } diff --git a/src/stream/src/executor/dispatch.rs b/src/stream/src/executor/dispatch.rs index 82d11db49513..4e8cbca23816 100644 --- a/src/stream/src/executor/dispatch.rs +++ b/src/stream/src/executor/dispatch.rs @@ -1225,6 +1225,32 @@ mod tests { ) .unwrap(); + let dispatcher_updates = maplit::hashmap! { + actor_id => vec![PbDispatcherUpdate { + actor_id, + dispatcher_id: broadcast_dispatcher_id, + added_downstream_actor_id: vec![new], + removed_downstream_actor_id: vec![old], + hash_mapping: Default::default(), + }] + }; + let b1 = Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Update( + UpdateMutation { + dispatchers: dispatcher_updates, + merges: Default::default(), + vnode_bitmaps: Default::default(), + dropped_actors: Default::default(), + actor_splits: Default::default(), + actor_new_dispatchers: Default::default(), + }, + )); + barrier_test_env.inject_barrier(&b1, [actor_id]); + barrier_test_env + .shared_context + .local_barrier_manager + .flush_all_events() + .await; + let executor = Box::new(DispatchExecutor::new( input, vec![broadcast_dispatcher, simple_dispatcher], @@ -1253,27 +1279,6 @@ mod tests { .await .unwrap(); - // 4. 
Send a configuration change barrier for broadcast dispatcher. - let dispatcher_updates = maplit::hashmap! { - actor_id => vec![PbDispatcherUpdate { - actor_id, - dispatcher_id: broadcast_dispatcher_id, - added_downstream_actor_id: vec![new], - removed_downstream_actor_id: vec![old], - hash_mapping: Default::default(), - }] - }; - let b1 = Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Update( - UpdateMutation { - dispatchers: dispatcher_updates, - merges: Default::default(), - vnode_bitmaps: Default::default(), - dropped_actors: Default::default(), - actor_splits: Default::default(), - actor_new_dispatchers: Default::default(), - }, - )); - barrier_test_env.inject_barrier(&b1, [actor_id]); tx.send(Message::Barrier(b1.clone().into_dispatcher())) .await .unwrap(); diff --git a/src/stream/src/executor/integration_tests.rs b/src/stream/src/executor/integration_tests.rs index d65abc5a5ce5..13e9a67d1c52 100644 --- a/src/stream/src/executor/integration_tests.rs +++ b/src/stream/src/executor/integration_tests.rs @@ -14,6 +14,8 @@ use std::sync::Mutex; +use futures::future::BoxFuture; +use futures::FutureExt; use futures_async_stream::try_stream; use multimap::MultiMap; use risingwave_common::array::*; @@ -100,7 +102,7 @@ async fn test_merger_sum_aggr() { }; // join handles of all actors - let mut handles = vec![]; + let mut actor_futures: Vec> = vec![]; // input and output channels of the local aggregation actors let mut inputs = vec![]; @@ -113,7 +115,7 @@ async fn test_merger_sum_aggr() { let (tx, rx) = channel_for_test(); let (actor, channel) = make_actor(rx); outputs.push(channel); - handles.push(tokio::spawn(actor.run())); + actor_futures.push(actor.run().boxed()); inputs.push(Box::new(LocalOutput::new(233, tx)) as BoxedOutput); } @@ -154,7 +156,7 @@ async fn test_merger_sum_aggr() { .local_barrier_manager .clone(), ); - handles.push(tokio::spawn(actor.run())); + actor_futures.push(actor.run().boxed()); let actor_ctx = ActorContext::for_test(gen_next_actor_id()); @@ -225,11 +227,21 @@ async fn test_merger_sum_aggr() { .local_barrier_manager .clone(), ); - handles.push(tokio::spawn(actor.run())); + actor_futures.push(actor.run().boxed()); let mut epoch = test_epoch(1); let b1 = Barrier::new_test_barrier(epoch); barrier_test_env.inject_barrier(&b1, actors.clone()); + barrier_test_env + .shared_context + .local_barrier_manager + .flush_all_events() + .await; + let handles = actor_futures + .into_iter() + .map(|actor_future| tokio::spawn(actor_future)) + .collect_vec(); + input .send(Message::Barrier(b1.into_dispatcher())) .await diff --git a/src/stream/src/executor/merge.rs b/src/stream/src/executor/merge.rs index 393b80089515..d45d75604fa5 100644 --- a/src/stream/src/executor/merge.rs +++ b/src/stream/src/executor/merge.rs @@ -531,6 +531,11 @@ mod tests { let b2 = Barrier::with_prev_epoch_for_test(test_epoch(1000), *prev_epoch) .with_mutation(Mutation::Stop(HashSet::default())); barrier_test_env.inject_barrier(&b2, [actor_id]); + barrier_test_env + .shared_context + .local_barrier_manager + .flush_all_events() + .await; for (tx_id, tx) in txs.into_iter().enumerate() { let epochs = epochs.clone(); @@ -634,6 +639,33 @@ mod tests { .try_collect() .unwrap(); + let merge_updates = maplit::hashmap! 
{ + (actor_id, upstream_fragment_id) => MergeUpdate { + actor_id, + upstream_fragment_id, + new_upstream_fragment_id: None, + added_upstream_actor_id: vec![new], + removed_upstream_actor_id: vec![old], + } + }; + + let b1 = Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Update( + UpdateMutation { + dispatchers: Default::default(), + merges: merge_updates, + vnode_bitmaps: Default::default(), + dropped_actors: Default::default(), + actor_splits: Default::default(), + actor_new_dispatchers: Default::default(), + }, + )); + barrier_test_env.inject_barrier(&b1, [actor_id]); + barrier_test_env + .shared_context + .local_barrier_manager + .flush_all_events() + .await; + let mut merge = MergeExecutor::new( ActorContext::for_test(actor_id), fragment_id, @@ -682,28 +714,6 @@ mod tests { recv!().unwrap().as_chunk().unwrap(); assert_recv_pending!(); - // 4. Send a configuration change barrier. - let merge_updates = maplit::hashmap! { - (actor_id, upstream_fragment_id) => MergeUpdate { - actor_id, - upstream_fragment_id, - new_upstream_fragment_id: None, - added_upstream_actor_id: vec![new], - removed_upstream_actor_id: vec![old], - } - }; - - let b1 = Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Update( - UpdateMutation { - dispatchers: Default::default(), - merges: merge_updates, - vnode_bitmaps: Default::default(), - dropped_actors: Default::default(), - actor_splits: Default::default(), - actor_new_dispatchers: Default::default(), - }, - )); - barrier_test_env.inject_barrier(&b1, [actor_id]); send!( [untouched, old], Message::Barrier(b1.clone().into_dispatcher()) diff --git a/src/stream/src/executor/receiver.rs b/src/stream/src/executor/receiver.rs index 6cabb7938833..9a99e59214bd 100644 --- a/src/stream/src/executor/receiver.rs +++ b/src/stream/src/executor/receiver.rs @@ -231,6 +231,35 @@ mod tests { let (upstream_fragment_id, fragment_id) = (10, 18); + // 4. Send a configuration change barrier. + let merge_updates = maplit::hashmap! { + (actor_id, upstream_fragment_id) => MergeUpdate { + actor_id, + upstream_fragment_id, + new_upstream_fragment_id: None, + added_upstream_actor_id: vec![new], + removed_upstream_actor_id: vec![old], + } + }; + + let b1 = Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Update( + UpdateMutation { + dispatchers: Default::default(), + merges: merge_updates, + vnode_bitmaps: Default::default(), + dropped_actors: Default::default(), + actor_splits: Default::default(), + actor_new_dispatchers: Default::default(), + }, + )); + + barrier_test_env.inject_barrier(&b1, [actor_id]); + barrier_test_env + .shared_context + .local_barrier_manager + .flush_all_events() + .await; + let input = new_input( &ctx, metrics.clone(), @@ -297,30 +326,6 @@ mod tests { recv!().unwrap().as_chunk().unwrap(); // We should be able to receive the chunk. assert_recv_pending!(); - // 4. Send a configuration change barrier. - let merge_updates = maplit::hashmap! 
{ - (actor_id, upstream_fragment_id) => MergeUpdate { - actor_id, - upstream_fragment_id, - new_upstream_fragment_id: None, - added_upstream_actor_id: vec![new], - removed_upstream_actor_id: vec![old], - } - }; - - let b1 = Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Update( - UpdateMutation { - dispatchers: Default::default(), - merges: merge_updates, - vnode_bitmaps: Default::default(), - dropped_actors: Default::default(), - actor_splits: Default::default(), - actor_new_dispatchers: Default::default(), - }, - )); - - barrier_test_env.inject_barrier(&b1, [actor_id]); - send!([new], Message::Barrier(b1.clone().into_dispatcher())); assert_recv_pending!(); // We should not receive the barrier, as new is not the upstream. diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index 88e86a599875..67fac31ed8f6 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -37,8 +37,7 @@ use tonic::{Code, Status}; use self::managed_state::ManagedBarrierState; use crate::error::{IntoUnexpectedExit, StreamError, StreamResult}; use crate::task::{ - ActorHandle, ActorId, AtomicU64Ref, PartialGraphId, SharedContext, StreamEnvironment, - UpDownActorIds, + ActorId, AtomicU64Ref, PartialGraphId, SharedContext, StreamEnvironment, UpDownActorIds, }; mod managed_state; @@ -210,10 +209,6 @@ pub(super) enum LocalActorOperation { handle: ControlStreamHandle, init_request: InitRequest, }, - DropActors { - actors: Vec, - result_sender: oneshot::Sender<()>, - }, TakeReceiver { ids: UpDownActorIds, result_sender: oneshot::Sender>, @@ -228,29 +223,6 @@ pub(super) enum LocalActorOperation { }, } -pub(crate) struct StreamActorManagerState { - /// Each processor runs in a future. Upon receiving a `Terminate` message, they will exit. - /// `handles` store join handles of these futures, and therefore we could wait their - /// termination. - pub(super) handles: HashMap, - - /// Stores all actor information, taken after actor built. - pub(super) actors: HashMap, - - /// Stores all actor tokio runtime monitoring tasks. - pub(super) actor_monitor_tasks: HashMap, -} - -impl StreamActorManagerState { - fn new() -> Self { - Self { - handles: HashMap::new(), - actors: HashMap::new(), - actor_monitor_tasks: HashMap::new(), - } - } -} - pub(crate) struct StreamActorManager { pub(super) env: StreamEnvironment, pub(super) streaming_metrics: Arc, @@ -294,7 +266,7 @@ impl Display for LocalBarrierWorkerDebugInfo<'_> { /// barriers to and collect them from all actors, and finally report the progress. pub(super) struct LocalBarrierWorker { /// Current barrier collection state. - state: ManagedBarrierState, + pub(super) state: ManagedBarrierState, /// Record all unexpected exited actors. 
failure_actors: HashMap, @@ -303,8 +275,6 @@ pub(super) struct LocalBarrierWorker { pub(super) actor_manager: Arc, - pub(super) actor_manager_state: StreamActorManagerState, - pub(super) current_shared_context: Arc, barrier_event_rx: UnboundedReceiver, @@ -328,14 +298,9 @@ impl LocalBarrierWorker { )); Self { failure_actors: HashMap::default(), - state: ManagedBarrierState::new( - actor_manager.env.state_store(), - actor_manager.streaming_metrics.clone(), - actor_manager.await_tree_reg.clone(), - ), + state: ManagedBarrierState::new(actor_manager.clone(), shared_context.clone()), control_stream_handle: ControlStreamHandle::empty(), actor_manager, - actor_manager_state: StreamActorManagerState::new(), current_shared_context: shared_context, barrier_event_rx: event_rx, actor_failure_rx: failure_rx, @@ -345,7 +310,7 @@ impl LocalBarrierWorker { fn to_debug_info(&self) -> LocalBarrierWorkerDebugInfo<'_> { LocalBarrierWorkerDebugInfo { - running_actors: self.actor_manager_state.handles.keys().cloned().collect(), + running_actors: self.state.actor_states.keys().cloned().collect(), managed_barrier_state: self.state.to_debug_info(), has_control_stream_connected: self.control_stream_handle.connected(), } @@ -384,7 +349,7 @@ impl LocalBarrierWorker { }); } LocalActorOperation::Shutdown { result_sender } => { - if !self.actor_manager_state.handles.is_empty() { + if !self.state.actor_states.is_empty() { tracing::warn!( "shutdown with running actors, scaling or migration will be triggered" ); @@ -419,15 +384,9 @@ impl LocalBarrierWorker { Request::InjectBarrier(req) => { let barrier = Barrier::from_protobuf(req.get_barrier().unwrap())?; self.update_actor_info(req.broadcast_info)?; - let actors = req - .actors_to_build - .iter() - .map(|actor| actor.actor.as_ref().unwrap().actor_id) - .collect_vec(); - self.update_actors(req.actors_to_build)?; - self.start_create_actors(&actors)?; self.send_barrier( &barrier, + req.actors_to_build, req.actor_ids_to_collect.into_iter().collect(), req.table_ids_to_sync .into_iter() @@ -484,7 +443,13 @@ impl LocalBarrierWorker { .map_err(|e| (actor_id, e))?; } #[cfg(test)] - LocalBarrierEvent::Flush(sender) => sender.send(()).unwrap(), + LocalBarrierEvent::Flush(sender) => { + use futures::FutureExt; + while let Some(request) = self.control_stream_handle.next_request().now_or_never() { + self.handle_streaming_control_request(request).unwrap(); + } + sender.send(()).unwrap() + } } Ok(()) } @@ -494,13 +459,6 @@ impl LocalBarrierWorker { LocalActorOperation::NewControlStream { .. } | LocalActorOperation::Shutdown { .. } => { unreachable!("event {actor_op} should be handled separately in async context") } - LocalActorOperation::DropActors { - actors, - result_sender, - } => { - self.drop_actors(&actors); - let _ = result_sender.send(()); - } LocalActorOperation::TakeReceiver { ids, result_sender } => { let _ = result_sender.send(self.current_shared_context.take_receiver(ids)); } @@ -596,30 +554,12 @@ impl LocalBarrierWorker { fn send_barrier( &mut self, barrier: &Barrier, + to_build: Vec, to_collect: HashSet, table_ids: HashSet, partial_graph_id: PartialGraphId, actor_ids_to_pre_sync_barrier: HashSet, ) -> StreamResult<()> { - if !cfg!(test) { - // The barrier might be outdated and been injected after recovery in some certain extreme - // scenarios. So some newly creating actors in the barrier are possibly not rebuilt during - // recovery. Check it here and return an error here if some actors are not found to - // avoid collection hang. 
We need some refine in meta side to remove this workaround since - // it will cause another round of unnecessary recovery. - let missing_actor_ids = to_collect - .iter() - .filter(|id| !self.actor_manager_state.handles.contains_key(id)) - .collect_vec(); - if !missing_actor_ids.is_empty() { - tracing::warn!( - "to collect actors not found, they should be cleaned when recovering: {:?}", - missing_actor_ids - ); - return Err(anyhow!("to collect actors not found: {:?}", to_collect).into()); - } - } - if barrier.kind == BarrierKind::Initial { self.actor_manager .watermark_epoch @@ -647,20 +587,12 @@ impl LocalBarrierWorker { self.state.transform_to_issued( barrier, + to_build, to_collect, table_ids, partial_graph_id, actor_ids_to_pre_sync_barrier, )?; - - // Actors to stop should still accept this barrier, but won't get sent to in next times. - if let Some(actors) = barrier.all_stop_actors() { - debug!( - target: "events::stream::barrier::manager", - "remove actors {:?} from senders", - actors - ); - } Ok(()) } diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs index 5ccde5004801..766a6a594328 100644 --- a/src/stream/src/task/barrier_manager/managed_state.rs +++ b/src/stream/src/task/barrier_manager/managed_state.rs @@ -15,7 +15,7 @@ use std::assert_matches::assert_matches; use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use std::fmt::{Debug, Display, Formatter}; -use std::future::{poll_fn, Future}; +use std::future::{pending, poll_fn, Future}; use std::mem::replace; use std::sync::Arc; use std::task::{ready, Context, Poll}; @@ -32,16 +32,18 @@ use risingwave_common::util::epoch::EpochPair; use risingwave_hummock_sdk::SyncResult; use risingwave_pb::stream_plan::barrier::BarrierKind; use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress; +use risingwave_pb::stream_service::BuildActorInfo; use risingwave_storage::{dispatch_state_store, StateStore, StateStoreImpl}; use thiserror_ext::AsReport; use tokio::sync::mpsc; +use tokio::task::JoinHandle; use super::progress::BackfillState; use super::{BarrierCompleteResult, SubscribeMutationItem}; use crate::error::{StreamError, StreamResult}; use crate::executor::monitor::StreamingMetrics; use crate::executor::{Barrier, Mutation}; -use crate::task::{await_tree_key, ActorId, PartialGraphId}; +use crate::task::{await_tree_key, ActorId, PartialGraphId, SharedContext, StreamActorManager}; struct IssuedState { pub mutation: Option>, @@ -83,12 +85,12 @@ enum ManagedBarrierStateInner { #[derive(Debug)] pub(super) struct BarrierState { - curr_epoch: u64, + barrier: Barrier, inner: ManagedBarrierStateInner, } type AwaitEpochCompletedFuture = - impl Future)> + 'static; + impl Future)> + 'static; fn sync_epoch( state_store: &S, @@ -192,8 +194,6 @@ impl Display for &'_ PartialGraphManagedBarrierState { } enum InflightActorStatus { - /// The actor is just spawned and not issued any barrier yet - NotStarted, /// The actor has been issued some barriers, but has not collected the first barrier IssuedFirst(Vec), /// The actor has been issued some barriers, and has collected the first barrier @@ -201,12 +201,11 @@ enum InflightActorStatus { } impl InflightActorStatus { - fn max_issued_epoch(&self) -> Option { + fn max_issued_epoch(&self) -> u64 { match self { - InflightActorStatus::NotStarted => None, - InflightActorStatus::Running(epoch) => Some(*epoch), + InflightActorStatus::Running(epoch) => *epoch, InflightActorStatus::IssuedFirst(issued_barriers) => { - 
Some(issued_barriers.last().expect("non-empty").epoch.prev) + issued_barriers.last().expect("non-empty").epoch.prev } } } @@ -223,18 +222,35 @@ pub(crate) struct InflightActorState { status: InflightActorStatus, /// Whether the actor has been issued a stop barrier is_stopping: bool, + + join_handle: JoinHandle<()>, + monitor_task_handle: Option>, } impl InflightActorState { - pub(super) fn not_started(actor_id: ActorId) -> Self { + pub(super) fn start( + actor_id: ActorId, + initial_partial_graph_id: PartialGraphId, + initial_barrier: &Barrier, + join_handle: JoinHandle<()>, + monitor_task_handle: Option>, + ) -> Self { Self { actor_id, pending_subscribers: Default::default(), barrier_senders: vec![], - inflight_barriers: BTreeMap::default(), - barrier_mutations: Default::default(), - status: InflightActorStatus::NotStarted, + inflight_barriers: BTreeMap::from_iter([( + initial_barrier.epoch.prev, + initial_partial_graph_id, + )]), + barrier_mutations: BTreeMap::from_iter([( + initial_barrier.epoch.prev, + (initial_barrier.mutation.clone(), initial_barrier.epoch.curr), + )]), + status: InflightActorStatus::IssuedFirst(vec![initial_barrier.clone()]), is_stopping: false, + join_handle, + monitor_task_handle, } } @@ -263,9 +279,7 @@ impl InflightActorState { barrier: &Barrier, is_stop: bool, ) -> StreamResult<()> { - if let Some(max_issued_epoch) = self.status.max_issued_epoch() { - assert!(barrier.epoch.prev > max_issued_epoch); - } + assert!(barrier.epoch.prev > self.status.max_issued_epoch()); if let Some((first_epoch, _)) = self.pending_subscribers.first_key_value() { assert!( @@ -312,9 +326,6 @@ impl InflightActorState { } match &mut self.status { - InflightActorStatus::NotStarted => { - self.status = InflightActorStatus::IssuedFirst(vec![barrier.clone()]); - } InflightActorStatus::IssuedFirst(pending_barriers) => { pending_barriers.push(barrier.clone()); } @@ -338,9 +349,6 @@ impl InflightActorState { let (min_mutation_epoch, _) = self.barrier_mutations.pop_first().expect("should exist"); assert_eq!(min_mutation_epoch, epoch.prev); match &self.status { - InflightActorStatus::NotStarted => { - unreachable!("should have issued a barrier when collect") - } InflightActorStatus::IssuedFirst(pending_barriers) => { assert_eq!( prev_epoch, @@ -416,32 +424,27 @@ impl PartialGraphManagedBarrierState { } } -pub(super) struct ManagedBarrierState { +pub(crate) struct ManagedBarrierState { pub(super) actor_states: HashMap, pub(super) graph_states: HashMap, - pub(super) state_store: StateStoreImpl, - - pub(super) streaming_metrics: Arc, + actor_manager: Arc, - /// Manages the await-trees of all barriers. - barrier_await_tree_reg: Option, + current_shared_context: Arc, } impl ManagedBarrierState { /// Create a barrier manager state. This will be called only once. 
pub(super) fn new( - state_store: StateStoreImpl, - streaming_metrics: Arc, - barrier_await_tree_reg: Option, + actor_manager: Arc, + current_shared_context: Arc, ) -> Self { Self { actor_states: Default::default(), graph_states: Default::default(), - state_store, - streaming_metrics, - barrier_await_tree_reg, + actor_manager, + current_shared_context, } } @@ -450,6 +453,21 @@ impl ManagedBarrierState { graph_states: &self.graph_states, } } + + pub(crate) async fn abort_actors(&mut self) { + for (actor_id, state) in &self.actor_states { + tracing::debug!("force stopping actor {}", actor_id); + state.join_handle.abort(); + if let Some(monitor_task_handle) = &state.monitor_task_handle { + monitor_task_handle.abort(); + } + } + for (actor_id, state) in self.actor_states.drain() { + tracing::debug!("join actor {}", actor_id); + let result = state.join_handle.await; + assert!(result.is_ok() || result.unwrap_err().is_cancelled()); + } + } } impl InflightActorState { @@ -485,17 +503,13 @@ impl InflightActorState { .push(tx); } } else { - // Barrier has not issued yet. Store the pending tx - if let Some(max_issued_epoch) = self.status.max_issued_epoch() { - assert!( - max_issued_epoch < start_prev_epoch, - "later barrier {} has been issued, but skip the start epoch {:?}", - max_issued_epoch, - start_prev_epoch - ); - } else { - assert!(!self.is_stopping, "actor has been stopped and has not inflight barrier. unlikely to get further barrier"); - } + let max_issued_epoch = self.status.max_issued_epoch(); + assert!( + max_issued_epoch < start_prev_epoch, + "later barrier {} has been issued, but skip the start epoch {:?}", + max_issued_epoch, + start_prev_epoch + ); self.pending_subscribers .entry(start_prev_epoch) .or_default() @@ -508,9 +522,6 @@ impl InflightActorState { tx: mpsc::UnboundedSender, ) -> StreamResult<()> { match &self.status { - InflightActorStatus::NotStarted => { - self.barrier_senders.push(tx); - } InflightActorStatus::IssuedFirst(pending_barriers) => { for barrier in pending_barriers { tx.send(barrier.clone()).map_err(|_| { @@ -539,8 +550,8 @@ impl ManagedBarrierState { tx: mpsc::UnboundedSender, ) { self.actor_states - .entry(actor_id) - .or_insert_with(|| InflightActorState::not_started(actor_id)) + .get_mut(&actor_id) + .expect("should exist") .subscribe_actor_mutation(start_prev_epoch, tx); } @@ -550,53 +561,105 @@ impl ManagedBarrierState { tx: mpsc::UnboundedSender, ) -> StreamResult<()> { self.actor_states - .entry(actor_id) - .or_insert_with(|| InflightActorState::not_started(actor_id)) + .get_mut(&actor_id) + .expect("should exist") .register_barrier_sender(tx) } pub(super) fn transform_to_issued( &mut self, barrier: &Barrier, + actors_to_build: Vec, actor_ids_to_collect: HashSet, table_ids: HashSet, partial_graph_id: PartialGraphId, actor_ids_to_pre_sync_barrier: HashSet, ) -> StreamResult<()> { let actor_to_stop = barrier.all_stop_actors(); + let is_stop_actor = |actor_id| { + actor_to_stop + .map(|actors| actors.contains(&actor_id)) + .unwrap_or(false) + }; let graph_state = self .graph_states .entry(partial_graph_id) .or_insert_with(|| { PartialGraphManagedBarrierState::new( - self.state_store.clone(), - self.streaming_metrics.clone(), - self.barrier_await_tree_reg.clone(), + self.actor_manager.env.state_store(), + self.actor_manager.streaming_metrics.clone(), + self.actor_manager.await_tree_reg.clone(), ) }); graph_state.transform_to_issued(barrier, actor_ids_to_collect.clone(), table_ids); + let mut new_actors = HashSet::new(); + for actor in actors_to_build { + let 
actor_id = actor.actor.as_ref().unwrap().actor_id; + assert!(!is_stop_actor(actor_id)); + assert!(new_actors.insert(actor_id)); + assert!(actor_ids_to_collect.contains(&actor_id)); + let (join_handle, monitor_join_handle) = self + .actor_manager + .spawn_actor(actor, self.current_shared_context.clone()); + assert!(self + .actor_states + .try_insert( + actor_id, + InflightActorState::start( + actor_id, + partial_graph_id, + barrier, + join_handle, + monitor_join_handle + ) + ) + .is_ok()); + } + + // Spawn a trivial join handle to be compatible with the unit test + if cfg!(test) { + for actor_id in &actor_ids_to_collect { + if !self.actor_states.contains_key(actor_id) { + let join_handle = self.actor_manager.runtime.spawn(async { pending().await }); + assert!(self + .actor_states + .try_insert( + *actor_id, + InflightActorState::start( + *actor_id, + partial_graph_id, + barrier, + join_handle, + None, + ) + ) + .is_ok()); + new_actors.insert(*actor_id); + } + } + } + // Note: it's important to issue barrier to actor after issuing to graph to ensure that // we call `start_epoch` on the graph before the actors receive the barrier - for actor_id in actor_ids_to_collect { + for actor_id in &actor_ids_to_collect { + if new_actors.contains(actor_id) { + continue; + } self.actor_states - .entry(actor_id) - .or_insert_with(|| InflightActorState::not_started(actor_id)) - .issue_barrier( - partial_graph_id, - barrier, - actor_to_stop - .map(|actors| actors.contains(&actor_id)) - .unwrap_or(false), - )?; + .get_mut(actor_id) + .unwrap_or_else(|| { + panic!("should exist: {} {:?}", actor_id, actor_ids_to_collect); + }) + .issue_barrier(partial_graph_id, barrier, is_stop_actor(*actor_id))?; } if partial_graph_id.is_global_graph() { for actor_id in actor_ids_to_pre_sync_barrier { self.actor_states - .entry(actor_id) - .or_insert_with(|| InflightActorState::not_started(actor_id)) + .get_mut(&actor_id) + .expect("should exist") .sync_barrier(barrier); } } else { @@ -610,9 +673,12 @@ impl ManagedBarrierState { ) -> impl Future + '_ { poll_fn(|cx| { for (partial_graph_id, graph_state) in &mut self.graph_states { - if let Poll::Ready(epoch) = graph_state.poll_next_completed_epoch(cx) { + if let Poll::Ready(barrier) = graph_state.poll_next_completed_barrier(cx) { + if let Some(actors_to_stop) = barrier.all_stop_actors() { + self.current_shared_context.drop_actors(actors_to_stop); + } let partial_graph_id = *partial_graph_id; - return Poll::Ready((partial_graph_id, epoch)); + return Poll::Ready((partial_graph_id, barrier.epoch.prev)); } } Poll::Pending @@ -626,7 +692,10 @@ impl ManagedBarrierState { .expect("should exist") .collect(epoch); if is_finished { - self.actor_states.remove(&actor_id); + let state = self.actor_states.remove(&actor_id).expect("should exist"); + if let Some(monitor_task_handle) = state.monitor_task_handle { + monitor_task_handle.abort(); + } } let prev_graph_state = self .graph_states @@ -677,7 +746,7 @@ impl PartialGraphManagedBarrierState { let create_mview_progress = self .create_mview_progress - .remove(&barrier_state.curr_epoch) + .remove(&barrier_state.barrier.epoch.curr) .unwrap_or_default() .into_iter() .map(|(actor, state)| CreateMviewProgress { @@ -685,7 +754,7 @@ impl PartialGraphManagedBarrierState { done: matches!(state, BackfillState::Done(_)), consumed_epoch: match state { BackfillState::ConsumingUpstream(consumed_epoch, _) => consumed_epoch, - BackfillState::Done(_) => barrier_state.curr_epoch, + BackfillState::Done(_) => barrier_state.barrier.epoch.curr, }, 
consumed_rows: match state { BackfillState::ConsumingUpstream(_, consumed_rows) => consumed_rows, @@ -724,6 +793,8 @@ impl PartialGraphManagedBarrierState { } }; + let barrier = barrier_state.barrier.clone(); + self.await_epoch_completed_futures.push_back({ let future = async move { if let Some(future) = complete_barrier_future { @@ -735,7 +806,7 @@ impl PartialGraphManagedBarrierState { } .map(move |result| { ( - prev_epoch, + barrier, result.map(|sync_result| BarrierCompleteResult { sync_result, create_mview_progress, @@ -775,7 +846,7 @@ impl PartialGraphManagedBarrierState { ) } Some(&mut BarrierState { - curr_epoch, + ref barrier, inner: ManagedBarrierStateInner::Issued(IssuedState { ref mut remaining_actors, @@ -789,7 +860,7 @@ impl PartialGraphManagedBarrierState { "the actor doesn't exist. actor_id: {:?}, curr_epoch: {:?}", actor_id, epoch.curr ); - assert_eq!(curr_epoch, epoch.curr); + assert_eq!(barrier.epoch.curr, epoch.curr); self.may_have_collected_all(epoch.prev); } Some(BarrierState { inner, .. }) => { @@ -871,7 +942,7 @@ impl PartialGraphManagedBarrierState { self.epoch_barrier_state_map.insert( barrier.epoch.prev, BarrierState { - curr_epoch: barrier.epoch.curr, + barrier: barrier.clone(), inner: ManagedBarrierStateInner::Issued(IssuedState { remaining_actors: BTreeSet::from_iter(actor_ids_to_collect), mutation: barrier.mutation.clone(), @@ -885,17 +956,17 @@ impl PartialGraphManagedBarrierState { } /// Return a future that yields the next completed epoch. The future is cancellation safe. - pub(crate) fn poll_next_completed_epoch(&mut self, cx: &mut Context<'_>) -> Poll { + pub(crate) fn poll_next_completed_barrier(&mut self, cx: &mut Context<'_>) -> Poll { ready!(self.await_epoch_completed_futures.next().poll_unpin(cx)) - .map(|(prev_epoch, result)| { + .map(|(barrier, result)| { let state = self .epoch_barrier_state_map - .get_mut(&prev_epoch) + .get_mut(&barrier.epoch.prev) .expect("should exist"); // sanity check on barrier state assert_matches!(&state.inner, ManagedBarrierStateInner::AllCollected); state.inner = ManagedBarrierStateInner::Completed(result); - prev_epoch + barrier }) .map(Poll::Ready) .unwrap_or(Poll::Pending) @@ -941,9 +1012,12 @@ impl PartialGraphManagedBarrierState { #[cfg(test)] async fn pop_next_completed_epoch(&mut self) -> u64 { - let epoch = poll_fn(|cx| self.poll_next_completed_epoch(cx)).await; - let _ = self.pop_completed_epoch(epoch).unwrap().unwrap(); - epoch + let barrier = poll_fn(|cx| self.poll_next_completed_barrier(cx)).await; + let _ = self + .pop_completed_epoch(barrier.epoch.prev) + .unwrap() + .unwrap(); + barrier.epoch.prev } } diff --git a/src/stream/src/task/barrier_manager/tests.rs b/src/stream/src/task/barrier_manager/tests.rs index d6a8256aebb6..112ee533d8e6 100644 --- a/src/stream/src/task/barrier_manager/tests.rs +++ b/src/stream/src/task/barrier_manager/tests.rs @@ -40,19 +40,22 @@ async fn test_managed_barrier_collection() -> StreamResult<()> { // Register actors let actor_ids = vec![233, 234, 235]; - let count = actor_ids.len(); - let mut rxs = actor_ids - .clone() - .into_iter() - .map(register_sender) - .collect_vec(); // Send a barrier to all actors let curr_epoch = test_epoch(2); let barrier = Barrier::new_test_barrier(curr_epoch); let epoch = barrier.epoch.prev; - test_env.inject_barrier(&barrier, actor_ids); + test_env.inject_barrier(&barrier, actor_ids.clone()); + + manager.flush_all_events().await; + + let count = actor_ids.len(); + let mut rxs = actor_ids + .clone() + .into_iter() + .map(register_sender) + 
.collect_vec(); // Collect barriers from actors let collected_barriers = join_all(rxs.iter_mut().map(|(actor_id, rx)| async move { @@ -105,6 +108,14 @@ async fn test_managed_barrier_collection_separately() -> StreamResult<()> { .chain(once(extra_actor_id)) .collect_vec(); + // Prepare the barrier + let curr_epoch = test_epoch(2); + let barrier = Barrier::new_test_barrier(curr_epoch).with_stop(); + + test_env.inject_barrier(&barrier, actor_ids_to_collect.clone()); + + manager.flush_all_events().await; + // Register actors let count = actor_ids_to_send.len(); let mut rxs = actor_ids_to_send @@ -113,10 +124,6 @@ async fn test_managed_barrier_collection_separately() -> StreamResult<()> { .map(register_sender) .collect_vec(); - // Prepare the barrier - let curr_epoch = test_epoch(2); - let barrier = Barrier::new_test_barrier(curr_epoch).with_stop(); - let mut mutation_subscriber = manager.subscribe_barrier_mutation(extra_actor_id, &barrier.clone().into_dispatcher()); @@ -124,8 +131,6 @@ async fn test_managed_barrier_collection_separately() -> StreamResult<()> { let mut mutation_reader = pin!(mutation_subscriber.recv()); assert!(poll_fn(|cx| Poll::Ready(mutation_reader.as_mut().poll(cx).is_pending())).await); - test_env.inject_barrier(&barrier, actor_ids_to_collect); - let (epoch, mutation) = mutation_reader.await.unwrap(); assert_eq!((epoch, &mutation), (barrier.epoch.prev, &barrier.mutation)); @@ -196,6 +201,8 @@ async fn test_late_register_barrier_sender() -> StreamResult<()> { test_env.inject_barrier(&barrier1, actor_ids_to_collect.clone()); test_env.inject_barrier(&barrier2, actor_ids_to_collect.clone()); + manager.flush_all_events().await; + // register sender after inject barrier let mut rxs = actor_ids_to_send .clone() diff --git a/src/stream/src/task/mod.rs b/src/stream/src/task/mod.rs index b5382b341805..59851fdf09ad 100644 --- a/src/stream/src/task/mod.rs +++ b/src/stream/src/task/mod.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use anyhow::anyhow; use parking_lot::{MappedMutexGuard, Mutex, MutexGuard, RwLock}; @@ -194,7 +194,7 @@ impl SharedContext { &self.config } - pub fn drop_actors(&self, actors: &[ActorId]) { + pub(super) fn drop_actors(&self, actors: &HashSet) { self.channel_map .lock() .retain(|(up_id, _), _| !actors.contains(up_id)); diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index 60b734137149..ba76e6fab791 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -19,7 +19,6 @@ use std::sync::atomic::AtomicU64; use std::sync::Arc; use std::time::Instant; -use anyhow::anyhow; use async_recursion::async_recursion; use await_tree::InstrumentAwait; use futures::stream::BoxStream; @@ -59,8 +58,8 @@ use crate::task::barrier_manager::{ ControlStreamHandle, EventSender, LocalActorOperation, LocalBarrierWorker, }; use crate::task::{ - ActorId, FragmentId, LocalBarrierManager, SharedContext, StreamActorManager, - StreamActorManagerState, StreamEnvironment, UpDownActorIds, + ActorId, FragmentId, LocalBarrierManager, SharedContext, StreamActorManager, StreamEnvironment, + UpDownActorIds, }; #[cfg(test)] @@ -214,16 +213,6 @@ impl LocalStreamManager { }) } - /// Drop the resources of the given actors. 
- pub async fn drop_actors(&self, actors: Vec) -> StreamResult<()> { - self.actor_op_tx - .send_and_await(|result_sender| LocalActorOperation::DropActors { - actors, - result_sender, - }) - .await - } - pub async fn take_receiver(&self, ids: UpDownActorIds) -> StreamResult { self.actor_op_tx .send_and_await(|result_sender| LocalActorOperation::TakeReceiver { @@ -256,28 +245,9 @@ impl LocalStreamManager { } impl LocalBarrierWorker { - /// Drop the resources of the given actors. - pub(super) fn drop_actors(&mut self, actors: &[ActorId]) { - self.current_shared_context.drop_actors(actors); - for &id in actors { - self.actor_manager_state.drop_actor(id); - } - tracing::debug!(actors = ?actors, "drop actors"); - } - /// Force stop all actors on this worker, and then drop their resources. pub(super) async fn reset(&mut self, version_id: HummockVersionId) { - let actor_handles = self.actor_manager_state.drain_actor_handles(); - for (actor_id, handle) in &actor_handles { - tracing::debug!("force stopping actor {}", actor_id); - handle.abort(); - } - for (actor_id, handle) in actor_handles { - tracing::debug!("join actor {}", actor_id); - let result = handle.await; - assert!(result.is_ok() || result.unwrap_err().is_cancelled()); - } - self.actor_manager_state.clear_state(); + self.state.abort_actors().await; if let Some(m) = self.actor_manager.await_tree_reg.as_ref() { m.clear(); } @@ -291,26 +261,6 @@ impl LocalBarrierWorker { self.reset_state(); self.actor_manager.env.dml_manager_ref().clear(); } - - pub(super) fn update_actors(&mut self, actors: Vec) -> StreamResult<()> { - self.actor_manager_state.update_actors(actors) - } - - /// This function could only be called once during the lifecycle of `LocalStreamManager` for - /// now. - pub(super) fn start_create_actors(&mut self, actors: &[ActorId]) -> StreamResult<()> { - let actors: Vec<_> = actors - .iter() - .map(|actor_id| { - self.actor_manager_state - .actors - .remove(actor_id) - .ok_or_else(|| anyhow!("No such actor with actor id:{}", actor_id)) - }) - .try_collect()?; - self.spawn_actors(actors); - Ok(()) - } } impl StreamActorManager { @@ -559,18 +509,22 @@ impl StreamActorManager { } } -impl LocalBarrierWorker { - pub(super) fn spawn_actors(&mut self, actors: Vec) { - for actor in actors { +impl StreamActorManager { + pub(super) fn spawn_actor( + self: &Arc, + actor: BuildActorInfo, + current_shared_context: Arc, + ) -> (JoinHandle<()>, Option>) { + { let monitor = tokio_metrics::TaskMonitor::new(); let stream_actor_ref = actor.actor.as_ref().unwrap(); let actor_id = stream_actor_ref.actor_id; let handle = { let trace_span = format!("Actor {actor_id}: `{}`", stream_actor_ref.mview_definition); - let barrier_manager = self.current_shared_context.local_barrier_manager.clone(); + let barrier_manager = current_shared_context.local_barrier_manager.clone(); // wrap the future of `create_actor` with `boxed` to avoid stack overflow - let actor = self.actor_manager.clone().create_actor(actor, self.current_shared_context.clone()).boxed().and_then(|actor| actor.run()).map(move |result| { + let actor = self.clone().create_actor(actor, current_shared_context).boxed().and_then(|actor| actor.run()).map(move |result| { if let Err(err) = result { // TODO: check error type and panic if it's unexpected. // Intentionally use `?` on the report to also include the backtrace. 
@@ -578,7 +532,7 @@ impl LocalBarrierWorker { barrier_manager.notify_failure(actor_id, err); } }); - let traced = match &self.actor_manager.await_tree_reg { + let traced = match &self.await_tree_reg { Some(m) => m .register(await_tree_key::Actor(actor_id), trace_span) .instrument(actor) @@ -586,24 +540,17 @@ impl LocalBarrierWorker { None => actor.right_future(), }; let instrumented = monitor.instrument(traced); - let with_config = - crate::CONFIG.scope(self.actor_manager.env.config().clone(), instrumented); + let with_config = crate::CONFIG.scope(self.env.config().clone(), instrumented); - self.actor_manager.runtime.spawn(with_config) + self.runtime.spawn(with_config) }; - self.actor_manager_state.handles.insert(actor_id, handle); - - if self.actor_manager.streaming_metrics.level >= MetricLevel::Debug - || self - .actor_manager - .env - .config() - .developer - .enable_actor_tokio_metrics + + let monitor_handle = if self.streaming_metrics.level >= MetricLevel::Debug + || self.env.config().developer.enable_actor_tokio_metrics { tracing::info!("Tokio metrics are enabled."); - let streaming_metrics = self.actor_manager.streaming_metrics.clone(); - let actor_monitor_task = self.actor_manager.runtime.spawn(async move { + let streaming_metrics = self.streaming_metrics.clone(); + let actor_monitor_task = self.runtime.spawn(async move { let metrics = streaming_metrics.new_actor_metrics(actor_id); loop { let task_metrics = monitor.cumulative(); @@ -643,10 +590,11 @@ impl LocalBarrierWorker { tokio::time::sleep(Duration::from_secs(1)).await; } }); - self.actor_manager_state - .actor_monitor_tasks - .insert(actor_id, actor_monitor_task); - } + Some(actor_monitor_task) + } else { + None + }; + (handle, monitor_handle) } } } @@ -671,44 +619,6 @@ impl LocalBarrierWorker { } } -impl StreamActorManagerState { - /// `drop_actor` is invoked by meta node via RPC once the stop barrier arrives at the - /// sink. All the actors in the actors should stop themselves before this method is invoked. - fn drop_actor(&mut self, actor_id: ActorId) { - self.actor_monitor_tasks - .remove(&actor_id) - .inspect(|handle| handle.abort()); - self.actors.remove(&actor_id); - - // Task should have already stopped when this method is invoked. There might be some - // clean-up work left (like dropping in-memory data structures), but we don't have to wait - // for them to finish, in order to make this request non-blocking. - self.handles.remove(&actor_id); - } - - fn drain_actor_handles(&mut self) -> Vec<(ActorId, ActorHandle)> { - self.handles.drain().collect() - } - - /// `stop_all_actors` is invoked by meta node via RPC for recovery purpose. Different from the - /// `drop_actor`, the execution of the actors will be aborted. - fn clear_state(&mut self) { - self.actors.clear(); - self.actor_monitor_tasks.clear(); - } - - fn update_actors(&mut self, actors: Vec) -> StreamResult<()> { - for actor in actors { - let actor_id = actor.actor.as_ref().unwrap().get_actor_id(); - self.actors - .try_insert(actor_id, actor) - .map_err(|_| anyhow!("duplicated actor {}", actor_id))?; - } - - Ok(()) - } -} - #[cfg(test)] pub mod test_utils { use risingwave_pb::common::HostAddress;
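The compute node no longer pre-registers actors through `update_actors`/`start_create_actors`; instead, `transform_to_issued` spawns whatever arrives in the barrier's `actors_to_build` and records that first barrier as already issued. A rough sketch of that registration step, with simplified types; the real code uses the nightly `HashMap::try_insert` where this sketch uses `insert` plus an assert:

```rust
use std::collections::{HashMap, HashSet};
use tokio::task::JoinHandle;

struct InflightActor {
    first_issued_epoch: u64,
    join_handle: JoinHandle<()>,
}

struct BarrierWorkerState {
    actor_states: HashMap<u32, InflightActor>,
}

impl BarrierWorkerState {
    fn handle_inject_barrier(
        &mut self,
        prev_epoch: u64,
        actors_to_build: Vec<u32>,
        actor_ids_to_collect: &HashSet<u32>,
    ) {
        for actor_id in actors_to_build {
            // A newly built actor must also be collected for this barrier.
            assert!(actor_ids_to_collect.contains(&actor_id));
            let join_handle = tokio::spawn(async move {
                // the actor body would run here
            });
            let duplicated = self
                .actor_states
                .insert(
                    actor_id,
                    InflightActor {
                        first_issued_epoch: prev_epoch,
                        join_handle,
                    },
                )
                .is_some();
            assert!(!duplicated, "duplicated actor {actor_id}");
        }
        // Every other actor to collect must already be running.
        for actor_id in actor_ids_to_collect {
            assert!(
                self.actor_states.contains_key(actor_id),
                "should exist: {actor_id}"
            );
        }
    }
}
```

Because every actor is created together with its first barrier, the `NotStarted` variant of `InflightActorStatus` disappears and `max_issued_epoch` can return a plain `u64`.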
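The per-actor `JoinHandle`s formerly tracked in `StreamActorManagerState` now live inside `InflightActorState`, so `reset` can abort and then join every actor through the barrier state alone. A minimal standalone sketch of that abort-then-join pattern (struct and field names are illustrative, mirroring `ManagedBarrierState::abort_actors`):

```rust
use std::collections::HashMap;
use tokio::task::JoinHandle;

struct ActorEntry {
    join_handle: JoinHandle<()>,
    monitor_task_handle: Option<JoinHandle<()>>,
}

#[derive(Default)]
struct ActorRegistry {
    actors: HashMap<u32, ActorEntry>,
}

impl ActorRegistry {
    /// Abort every actor first, then await each handle so that no task
    /// outlives the reset.
    async fn abort_actors(&mut self) {
        for (actor_id, entry) in &self.actors {
            println!("force stopping actor {actor_id}");
            entry.join_handle.abort();
            if let Some(monitor) = &entry.monitor_task_handle {
                monitor.abort();
            }
        }
        for (actor_id, entry) in self.actors.drain() {
            println!("join actor {actor_id}");
            let result = entry.join_handle.await;
            // A cancelled task is the expected outcome here.
            assert!(result.is_ok() || result.unwrap_err().is_cancelled());
        }
    }
}

#[tokio::main]
async fn main() {
    let mut registry = ActorRegistry::default();
    for actor_id in 0..3u32 {
        let join_handle = tokio::spawn(std::future::pending::<()>());
        registry.actors.insert(
            actor_id,
            ActorEntry { join_handle, monitor_task_handle: None },
        );
    }
    registry.abort_actors().await;
}
```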
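`poll_next_completed_epoch` is reworked into `poll_next_completed_barrier` so the worker can inspect the completed barrier itself (for example its stop actors) instead of just its epoch. A small sketch of the underlying `poll_fn`-over-queues pattern, assuming each partial graph keeps a `FuturesUnordered` of completion futures that resolve to the barrier's previous epoch:

```rust
use std::collections::HashMap;
use std::future::poll_fn;
use std::task::Poll;

use futures::future::BoxFuture;
use futures::stream::FuturesUnordered;
use futures::StreamExt;

type CompletionQueue = FuturesUnordered<BoxFuture<'static, u64>>;

/// Yield the first (graph_id, prev_epoch) whose completion future resolves.
/// Nothing is removed from any queue until it is actually ready, which keeps
/// the returned future cancellation safe.
async fn next_completed(graphs: &mut HashMap<u64, CompletionQueue>) -> (u64, u64) {
    poll_fn(|cx| {
        for (graph_id, queue) in graphs.iter_mut() {
            if let Poll::Ready(Some(prev_epoch)) = queue.poll_next_unpin(cx) {
                return Poll::Ready((*graph_id, prev_epoch));
            }
        }
        Poll::Pending
    })
    .await
}
```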
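Several tests above now call `flush_all_events()` right after `inject_barrier`, and the test-only `Flush` event drains the control stream with `now_or_never()` before acknowledging. A minimal sketch of that non-blocking drain, using an unbounded channel as a stand-in for the control-stream handle:

```rust
use futures::FutureExt;
use tokio::sync::mpsc;

/// Handle every request that is already queued without ever blocking:
/// `now_or_never()` polls the receive future exactly once.
fn drain_pending<T>(rx: &mut mpsc::UnboundedReceiver<T>, mut handle: impl FnMut(T)) {
    while let Some(Some(request)) = rx.recv().now_or_never() {
        handle(request);
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel();
    tx.send("inject barrier").unwrap();
    tx.send("remove partial graph").unwrap();
    drain_pending(&mut rx, |request| println!("handled: {request}"));
}
```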
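In `integration_tests.rs`, actor futures are collected as `BoxFuture`s and only handed to `tokio::spawn` after the first barrier has been injected and flushed, so no actor starts before its initial barrier is known. A stripped-down version of that ordering (the barrier-injection step is elided and only marked by a comment):

```rust
use futures::future::BoxFuture;
use futures::FutureExt;

#[tokio::main]
async fn main() {
    // 1. Build the actors, but do not run them yet.
    let mut actor_futures: Vec<BoxFuture<'static, ()>> = vec![];
    for actor_id in 0..3u32 {
        actor_futures.push(async move { println!("actor {actor_id} started") }.boxed());
    }

    // 2. Inject the initial barrier and flush barrier-manager events here
    //    (omitted: `inject_barrier(..)` and `flush_all_events().await`).

    // 3. Only now spawn the actors and wait for them.
    let handles: Vec<_> = actor_futures
        .into_iter()
        .map(|actor_future| tokio::spawn(actor_future))
        .collect();
    for handle in handles {
        handle.await.unwrap();
    }
}
```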