From da46c4dd2be4c69d77f8347fbe18b72d6653cfaf Mon Sep 17 00:00:00 2001
From: William Wen <44139337+wenym1@users.noreply.github.com>
Date: Wed, 21 Aug 2024 19:52:12 +0800
Subject: [PATCH] refactor: register actor barrier sender asynchronously
 (#18104)

---
 proto/stream_service.proto                    |   1 -
 src/meta/src/barrier/info.rs                  |  18 --
 src/meta/src/barrier/rpc.rs                   |   7 -
 src/stream/src/executor/dispatch.rs           |   8 +-
 src/stream/src/executor/integration_tests.rs  |   6 +-
 src/stream/src/executor/merge.rs              |  12 +-
 src/stream/src/executor/receiver.rs           |   2 +-
 src/stream/src/from_proto/barrier_recv.rs     |   9 +-
 src/stream/src/from_proto/now.rs              |   9 +-
 .../src/from_proto/source/trad_source.rs      |   9 +-
 src/stream/src/from_proto/stream_scan.rs      |   9 +-
 src/stream/src/from_proto/values.rs           |   9 +-
 src/stream/src/task/barrier_manager.rs        | 223 ++++--------------
 .../src/task/barrier_manager/managed_state.rs | 130 +++++++---
 src/stream/src/task/barrier_manager/tests.rs  | 143 ++++++++---
 src/stream/src/task/stream_manager.rs         |  73 ++----
 16 files changed, 310 insertions(+), 358 deletions(-)

diff --git a/proto/stream_service.proto b/proto/stream_service.proto
index ab9a70a55c1a..419a61f113ed 100644
--- a/proto/stream_service.proto
+++ b/proto/stream_service.proto
@@ -55,7 +55,6 @@ message DropActorsResponse {
 message InjectBarrierRequest {
   string request_id = 1;
   stream_plan.Barrier barrier = 2;
-  repeated uint32 actor_ids_to_send = 3;
   repeated uint32 actor_ids_to_collect = 4;
   repeated uint32 table_ids_to_sync = 5;
   uint32 partial_graph_id = 6;
diff --git a/src/meta/src/barrier/info.rs b/src/meta/src/barrier/info.rs
index 914b9d207bfd..06a390380abf 100644
--- a/src/meta/src/barrier/info.rs
+++ b/src/meta/src/barrier/info.rs
@@ -259,24 +259,6 @@ impl InflightGraphInfo {
         })
     }
 
-    /// Returns actor list to send in the target worker node.
-    pub fn actor_ids_to_send(&self, node_id: WorkerId) -> impl Iterator<Item = ActorId> + '_ {
-        self.fragment_infos
-            .values()
-            .filter(|info| info.is_injectable)
-            .flat_map(move |info| {
-                info.actors
-                    .iter()
-                    .filter_map(move |(actor_id, actor_node_id)| {
-                        if *actor_node_id == node_id {
-                            Some(*actor_id)
-                        } else {
-                            None
-                        }
-                    })
-            })
-    }
-
     pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
         self.fragment_infos
             .values()
diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs
index f091a09bd522..1b4ab6207db9 100644
--- a/src/meta/src/barrier/rpc.rs
+++ b/src/meta/src/barrier/rpc.rs
@@ -302,15 +302,9 @@ impl ControlStreamManager {
         self.nodes
             .iter_mut()
             .map(|(node_id, node)| {
-                let actor_ids_to_send: Vec<_> =
-                    pre_applied_graph_info.actor_ids_to_send(*node_id).collect();
                 let actor_ids_to_collect: Vec<_> = pre_applied_graph_info
                     .actor_ids_to_collect(*node_id)
                     .collect();
-                if actor_ids_to_collect.is_empty() {
-                    // No need to send or collect barrier for this node.
-                    assert!(actor_ids_to_send.is_empty());
-                }
                 let table_ids_to_sync = if let Some(graph_info) = applied_graph_info {
                     graph_info
                         .existing_table_ids()
@@ -340,7 +334,6 @@ impl ControlStreamManager {
                         InjectBarrierRequest {
                             request_id: StreamRpcManager::new_request_id(),
                             barrier: Some(barrier),
-                            actor_ids_to_send,
                             actor_ids_to_collect,
                             table_ids_to_sync,
                             partial_graph_id,
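With `actor_ids_to_send` gone from both the proto and the in-flight graph info above, whether a worker participates in a barrier is now derived from its collect set alone; the deleted assertion had merely cross-checked that an empty collect set implied an empty send set. A small, self-contained illustration of the remaining bookkeeping — stand-in types and an invented placement list, not the patch's real `InflightGraphInfo`:

type WorkerId = u32;
type ActorId = u32;

// Which actors a given worker must collect the barrier from, derived from
// actor placement. The send-side list this patch deletes was always a subset
// of the same mapping (the injectable actors), hence redundant.
fn actor_ids_to_collect(placement: &[(ActorId, WorkerId)], node_id: WorkerId) -> Vec<ActorId> {
    placement
        .iter()
        .filter(|(_, worker)| *worker == node_id)
        .map(|(actor, _)| *actor)
        .collect()
}

fn main() {
    let placement = [(1, 10), (2, 10), (3, 11)];
    assert_eq!(actor_ids_to_collect(&placement, 10), vec![1, 2]);
    // A worker with nothing to collect simply gets an empty list; the old
    // `assert!(actor_ids_to_send.is_empty())` cross-check has no counterpart.
    assert!(actor_ids_to_collect(&placement, 12).is_empty());
}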
diff --git a/src/stream/src/executor/dispatch.rs b/src/stream/src/executor/dispatch.rs
index 9f452dc1863b..39a189616531 100644
--- a/src/stream/src/executor/dispatch.rs
+++ b/src/stream/src/executor/dispatch.rs
@@ -1271,7 +1271,7 @@ mod tests {
                 actor_new_dispatchers: Default::default(),
             },
         ));
-        barrier_test_env.inject_barrier(&b1, [], [actor_id]);
+        barrier_test_env.inject_barrier(&b1, [actor_id]);
         tx.send(Message::Barrier(b1.clone().into_dispatcher()))
             .await
             .unwrap();
@@ -1291,7 +1291,7 @@ mod tests {
 
         // 6. Send another barrier.
         let b2 = Barrier::new_test_barrier(test_epoch(2));
-        barrier_test_env.inject_barrier(&b2, [], [actor_id]);
+        barrier_test_env.inject_barrier(&b2, [actor_id]);
         tx.send(Message::Barrier(b2.into_dispatcher()))
             .await
             .unwrap();
@@ -1330,7 +1330,7 @@ mod tests {
                 actor_new_dispatchers: Default::default(),
             },
         ));
-        barrier_test_env.inject_barrier(&b3, [], [actor_id]);
+        barrier_test_env.inject_barrier(&b3, [actor_id]);
         tx.send(Message::Barrier(b3.into_dispatcher()))
             .await
             .unwrap();
@@ -1344,7 +1344,7 @@ mod tests {
 
         // 11. Send another barrier.
         let b4 = Barrier::new_test_barrier(test_epoch(4));
-        barrier_test_env.inject_barrier(&b4, [], [actor_id]);
+        barrier_test_env.inject_barrier(&b4, [actor_id]);
         tx.send(Message::Barrier(b4.into_dispatcher()))
             .await
             .unwrap();
diff --git a/src/stream/src/executor/integration_tests.rs b/src/stream/src/executor/integration_tests.rs
index 0b7415adac38..b03189e932a8 100644
--- a/src/stream/src/executor/integration_tests.rs
+++ b/src/stream/src/executor/integration_tests.rs
@@ -228,7 +228,7 @@ async fn test_merger_sum_aggr() {
 
     let mut epoch = test_epoch(1);
     let b1 = Barrier::new_test_barrier(epoch);
-    barrier_test_env.inject_barrier(&b1, [], actors.clone());
+    barrier_test_env.inject_barrier(&b1, actors.clone());
     input
         .send(Message::Barrier(b1.into_dispatcher()))
         .await
@@ -244,7 +244,7 @@ async fn test_merger_sum_aggr() {
             input.send(Message::Chunk(chunk)).await.unwrap();
         }
         let b = Barrier::new_test_barrier(epoch);
-        barrier_test_env.inject_barrier(&b, [], actors.clone());
+        barrier_test_env.inject_barrier(&b, actors.clone());
         input
             .send(Message::Barrier(b.into_dispatcher()))
             .await
@@ -253,7 +253,7 @@ async fn test_merger_sum_aggr() {
     }
     let b = Barrier::new_test_barrier(epoch)
         .with_mutation(Mutation::Stop(actors.clone().into_iter().collect()));
-    barrier_test_env.inject_barrier(&b, [], actors);
+    barrier_test_env.inject_barrier(&b, actors);
    input
        .send(Message::Barrier(b.into_dispatcher()))
        .await
diff --git a/src/stream/src/executor/merge.rs b/src/stream/src/executor/merge.rs
index 8254dfeadc68..b58ea1f44c51 100644
--- a/src/stream/src/executor/merge.rs
+++ b/src/stream/src/executor/merge.rs
@@ -524,13 +524,13 @@ mod tests {
             .map(|(_, epoch)| {
                 let barrier = Barrier::with_prev_epoch_for_test(*epoch, *prev_epoch);
                 *prev_epoch = *epoch;
-                barrier_test_env.inject_barrier(&barrier, [], [actor_id]);
+                barrier_test_env.inject_barrier(&barrier, [actor_id]);
                 (*epoch, barrier)
             })
             .collect();
         let b2 = Barrier::with_prev_epoch_for_test(test_epoch(1000), *prev_epoch)
             .with_mutation(Mutation::Stop(HashSet::default()));
-        barrier_test_env.inject_barrier(&b2, [], [actor_id]);
+        barrier_test_env.inject_barrier(&b2, [actor_id]);
 
         for (tx_id, tx) in txs.into_iter().enumerate() {
             let epochs = epochs.clone();
@@ -703,7 +703,7 @@ mod tests {
                 actor_new_dispatchers: Default::default(),
             },
         ));
-        barrier_test_env.inject_barrier(&b1, [], [actor_id]);
+        barrier_test_env.inject_barrier(&b1, [actor_id]);
         send!(
             [untouched, old],
             Message::Barrier(b1.clone().into_dispatcher())
@@ -820,11 +820,7 @@ mod tests {
             )
         };
 
-        test_env.inject_barrier(
-            &exchange_client_test_barrier(),
-            [],
-            [remote_input.actor_id()],
-        );
+        test_env.inject_barrier(&exchange_client_test_barrier(), [remote_input.actor_id()]);
 
         pin_mut!(remote_input);
 
diff --git a/src/stream/src/executor/receiver.rs b/src/stream/src/executor/receiver.rs
index 58700d2a1350..6cabb7938833 100644
--- a/src/stream/src/executor/receiver.rs
+++ b/src/stream/src/executor/receiver.rs
@@ -319,7 +319,7 @@ mod tests {
             },
         ));
 
-        barrier_test_env.inject_barrier(&b1, [], [actor_id]);
+        barrier_test_env.inject_barrier(&b1, [actor_id]);
 
         send!([new], Message::Barrier(b1.clone().into_dispatcher()));
         assert_recv_pending!(); // We should not receive the barrier, as new is not the upstream.
diff --git a/src/stream/src/from_proto/barrier_recv.rs b/src/stream/src/from_proto/barrier_recv.rs
index fddc9c4fd253..21bbdece8008 100644
--- a/src/stream/src/from_proto/barrier_recv.rs
+++ b/src/stream/src/from_proto/barrier_recv.rs
@@ -13,7 +13,6 @@
 // limitations under the License.
 
 use risingwave_pb::stream_plan::BarrierRecvNode;
-use tokio::sync::mpsc::unbounded_channel;
 
 use super::*;
 use crate::executor::BarrierRecvExecutor;
@@ -33,10 +32,10 @@ impl ExecutorBuilder for BarrierRecvExecutorBuilder {
             "barrier receiver should not have input"
         );
 
-        let (sender, barrier_receiver) = unbounded_channel();
-        params
-            .create_actor_context
-            .register_sender(params.actor_context.id, sender);
+        let barrier_receiver = params
+            .shared_context
+            .local_barrier_manager
+            .subscribe_barrier(params.actor_context.id);
 
         let exec = BarrierRecvExecutor::new(params.actor_context, barrier_receiver);
         Ok((params.info, exec).into())
diff --git a/src/stream/src/from_proto/now.rs b/src/stream/src/from_proto/now.rs
index 9eac7caa1355..d3cc35215029 100644
--- a/src/stream/src/from_proto/now.rs
+++ b/src/stream/src/from_proto/now.rs
@@ -18,7 +18,6 @@ use risingwave_common::util::value_encoding::DatumFromProtoExt;
 use risingwave_pb::stream_plan::now_node::PbMode as PbNowMode;
 use risingwave_pb::stream_plan::{NowNode, PbNowModeGenerateSeries};
 use risingwave_storage::StateStore;
-use tokio::sync::mpsc::unbounded_channel;
 
 use super::ExecutorBuilder;
 use crate::common::table::state_table::StateTable;
@@ -36,10 +35,10 @@ impl ExecutorBuilder for NowExecutorBuilder {
         node: &NowNode,
         store: impl StateStore,
     ) -> StreamResult<Executor> {
-        let (sender, barrier_receiver) = unbounded_channel();
-        params
-            .create_actor_context
-            .register_sender(params.actor_context.id, sender);
+        let barrier_receiver = params
+            .shared_context
+            .local_barrier_manager
+            .subscribe_barrier(params.actor_context.id);
 
         let mode = if let Ok(pb_mode) = node.get_mode() {
             match pb_mode {
diff --git a/src/stream/src/from_proto/source/trad_source.rs b/src/stream/src/from_proto/source/trad_source.rs
index 39e3293933e4..98746a672e43 100644
--- a/src/stream/src/from_proto/source/trad_source.rs
+++ b/src/stream/src/from_proto/source/trad_source.rs
@@ -27,7 +27,6 @@ use risingwave_pb::plan_common::{
 };
 use risingwave_pb::stream_plan::SourceNode;
 use risingwave_storage::panic_store::PanicStateStore;
-use tokio::sync::mpsc::unbounded_channel;
 
 use super::*;
 use crate::executor::source::{
@@ -141,10 +140,10 @@ impl ExecutorBuilder for SourceExecutorBuilder {
         node: &Self::Node,
         store: impl StateStore,
     ) -> StreamResult<Executor> {
-        let (sender, barrier_receiver) = unbounded_channel();
-        params
-            .create_actor_context
-            .register_sender(params.actor_context.id, sender);
+        let barrier_receiver = params
+            .shared_context
+            .local_barrier_manager
+            .subscribe_barrier(params.actor_context.id);
 
         let system_params = params.env.system_params_manager_ref().get_params();
 
         if let Some(source) = &node.source_inner {
diff --git a/src/stream/src/from_proto/stream_scan.rs b/src/stream/src/from_proto/stream_scan.rs
index 33badbb01288..eaa436752455 100644
--- a/src/stream/src/from_proto/stream_scan.rs
+++ b/src/stream/src/from_proto/stream_scan.rs
@@ -20,7 +20,6 @@ use risingwave_common::util::value_encoding::BasicSerde;
 use risingwave_pb::plan_common::StorageTableDesc;
 use risingwave_pb::stream_plan::{StreamScanNode, StreamScanType};
 use risingwave_storage::table::batch_table::storage_table::StorageTable;
-use tokio::sync::mpsc;
 
 use super::*;
 use crate::common::table::state_table::{ReplicatedStateTable, StateTable};
@@ -154,10 +153,10 @@ impl ExecutorBuilder for StreamScanExecutorBuilder {
                     .collect_vec();
                 let vnodes = params.vnode_bitmap.map(Arc::new);
 
-                let (barrier_tx, barrier_rx) = mpsc::unbounded_channel();
-                params
-                    .create_actor_context
-                    .register_sender(params.actor_context.id, barrier_tx);
+                let barrier_rx = params
+                    .shared_context
+                    .local_barrier_manager
+                    .subscribe_barrier(params.actor_context.id);
 
                 let upstream_table =
                     StorageTable::new_partial(state_store.clone(), column_ids, vnodes, table_desc);
diff --git a/src/stream/src/from_proto/values.rs b/src/stream/src/from_proto/values.rs
index 96cf002488e8..10654c5f75b6 100644
--- a/src/stream/src/from_proto/values.rs
+++ b/src/stream/src/from_proto/values.rs
@@ -16,7 +16,6 @@ use itertools::Itertools;
 use risingwave_expr::expr::build_non_strict_from_prost;
 use risingwave_pb::stream_plan::ValuesNode;
 use risingwave_storage::StateStore;
-use tokio::sync::mpsc::unbounded_channel;
 
 use super::ExecutorBuilder;
 use crate::error::StreamResult;
@@ -35,10 +34,10 @@ impl ExecutorBuilder for ValuesExecutorBuilder {
         node: &ValuesNode,
         _store: impl StateStore,
     ) -> StreamResult<Executor> {
-        let (sender, barrier_receiver) = unbounded_channel();
-        params
-            .create_actor_context
-            .register_sender(params.actor_context.id, sender);
+        let barrier_receiver = params
+            .shared_context
+            .local_barrier_manager
+            .subscribe_barrier(params.actor_context.id);
         let progress = params
             .local_barrier_manager
             .register_create_mview_progress(params.actor_context.id);
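All five builders above switch from pushing a sender into `CreateActorContext` to pulling a receiver out of the `LocalBarrierManager`. A self-contained, runnable model of what `subscribe_barrier` does underneath — invented stub names (`Event`, `LocalBarrierManagerStub`) and a bare epoch number standing in for a real `Barrier`:

use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};

enum Event {
    RegisterBarrierSender {
        actor_id: u32,
        barrier_sender: UnboundedSender<u64>,
    },
}

#[derive(Clone)]
struct LocalBarrierManagerStub {
    event_tx: UnboundedSender<Event>,
}

impl LocalBarrierManagerStub {
    // The caller gets its receiver back immediately; the sender half travels
    // to the worker loop as an event, so no lock-guarded map of pending
    // senders has to be kept at actor-build time.
    fn subscribe_barrier(&self, actor_id: u32) -> UnboundedReceiver<u64> {
        let (tx, rx) = unbounded_channel();
        let _ = self.event_tx.send(Event::RegisterBarrierSender {
            actor_id,
            barrier_sender: tx,
        });
        rx
    }
}

#[tokio::main]
async fn main() {
    let (event_tx, mut event_rx) = unbounded_channel();
    let manager = LocalBarrierManagerStub { event_tx };

    let mut barrier_rx = manager.subscribe_barrier(42);

    // Worker-loop side: pick up the registration and deliver one "barrier".
    if let Some(Event::RegisterBarrierSender { barrier_sender, .. }) = event_rx.recv().await {
        barrier_sender.send(100).unwrap();
    }
    assert_eq!(barrier_rx.recv().await, Some(100));
}

This is what makes registration asynchronous: the two-phase "register into a shared map, then collect all senders after the batch is built" handoff disappears, as the barrier_manager.rs diff below shows.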
diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs
index 950974b8acb5..b6cf8c525a5e 100644
--- a/src/stream/src/task/barrier_manager.rs
+++ b/src/stream/src/task/barrier_manager.rs
@@ -18,17 +18,15 @@ use std::future::pending;
 use std::sync::Arc;
 use std::time::Duration;
 
-use anyhow::{anyhow, Context};
-use futures::stream::{BoxStream, FuturesUnordered};
+use anyhow::anyhow;
+use futures::stream::BoxStream;
 use futures::StreamExt;
 use itertools::Itertools;
-use parking_lot::Mutex;
 use risingwave_common::error::tonic::extra::Score;
 use risingwave_pb::stream_service::barrier_complete_response::{
     GroupedSstableInfo, PbCreateMviewProgress,
 };
 use risingwave_rpc_client::error::{ToTonicStatus, TonicStatusWrapper};
-use rw_futures_util::{pending_on_none, AttachedFuture};
 use thiserror_ext::AsReport;
 use tokio::select;
 use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
@@ -67,10 +65,7 @@ use risingwave_pb::stream_service::{
 
 use crate::executor::exchange::permit::Receiver;
 use crate::executor::monitor::StreamingMetrics;
-use crate::executor::{
-    Actor, Barrier, BarrierInner, DispatchExecutor, DispatcherBarrier, Mutation,
-    StreamExecutorError,
-};
+use crate::executor::{Barrier, BarrierInner, DispatcherBarrier, Mutation, StreamExecutorError};
 use crate::task::barrier_manager::managed_state::ManagedBarrierStateDebugInfo;
 use crate::task::barrier_manager::progress::BackfillState;
@@ -185,39 +180,6 @@ impl ControlStreamHandle {
     }
 }
 
-#[derive(Clone)]
-pub struct CreateActorContext {
-    #[expect(clippy::type_complexity)]
-    barrier_sender: Arc<Mutex<Option<HashMap<ActorId, Vec<UnboundedSender<Barrier>>>>>>,
-}
-
-impl Default for CreateActorContext {
-    fn default() -> Self {
-        Self {
-            barrier_sender: Arc::new(Mutex::new(Some(HashMap::new()))),
-        }
-    }
-}
-
-impl CreateActorContext {
-    pub fn register_sender(&self, actor_id: ActorId, sender: UnboundedSender<Barrier>) {
-        self.barrier_sender
-            .lock()
-            .as_mut()
-            .expect("should not register after collecting sender")
-            .entry(actor_id)
-            .or_default()
-            .push(sender)
-    }
-
-    pub(super) fn collect_senders(&self) -> HashMap<ActorId, Vec<UnboundedSender<Barrier>>> {
-        self.barrier_sender
-            .lock()
-            .take()
-            .expect("should not collect senders for twice")
-    }
-}
-
 pub(crate) type SubscribeMutationItem = (u64, Option<Arc<Mutation>>);
 
 pub(super) enum LocalBarrierEvent {
@@ -235,6 +197,10 @@ pub(super) enum LocalBarrierEvent {
         epoch: EpochPair,
         mutation_sender: mpsc::UnboundedSender<SubscribeMutationItem>,
     },
+    RegisterBarrierSender {
+        actor_id: ActorId,
+        barrier_sender: mpsc::UnboundedSender<Barrier>,
+    },
     #[cfg(test)]
     Flush(oneshot::Sender<()>),
 }
@@ -267,12 +233,6 @@ pub(super) enum LocalActorOperation {
     },
     #[cfg(test)]
     GetCurrentSharedContext(oneshot::Sender<Arc<SharedContext>>),
-    #[cfg(test)]
-    RegisterSenders {
-        actor_id: ActorId,
-        senders: Vec<UnboundedSender<Barrier>>,
-        result_sender: oneshot::Sender<()>,
-    },
     InspectState {
         result_sender: oneshot::Sender<String>,
     },
@@ -281,11 +241,6 @@ pub(super) enum LocalActorOperation {
     },
 }
 
-pub(super) struct CreateActorOutput {
-    pub(super) actors: Vec<Actor<DispatchExecutor>>,
-    pub(super) senders: HashMap<ActorId, Vec<UnboundedSender<Barrier>>>,
-}
-
 pub(crate) struct StreamActorManagerState {
     /// Each processor runs in a future. Upon receiving a `Terminate` message, they will exit.
     /// `handles` store join handles of these futures, and therefore we could wait their
 
     /// Stores all actor tokio runtime monitoring tasks.
     pub(super) actor_monitor_tasks: HashMap,
-
-    #[expect(clippy::type_complexity)]
-    pub(super) creating_actors: FuturesUnordered<
-        AttachedFuture<
-            JoinHandle<StreamResult<CreateActorOutput>>,
-            (BTreeSet<ActorId>, oneshot::Sender<StreamResult<()>>),
-        >,
-    >,
 }
 
 impl StreamActorManagerState {
     fn new() -> Self {
         Self {
             handles: HashMap::new(),
             actors: HashMap::new(),
             actor_monitor_tasks: HashMap::new(),
-            creating_actors: FuturesUnordered::new(),
         }
     }
-
-    async fn next_created_actors(
-        &mut self,
-    ) -> (
-        oneshot::Sender<StreamResult<()>>,
-        StreamResult<CreateActorOutput>,
-    ) {
-        let (join_result, (_, sender)) = pending_on_none(self.creating_actors.next()).await;
-        (
-            sender,
-            try { join_result.context("failed to join creating actors futures")?? },
-        )
-    }
-}
 
 pub(crate) struct StreamActorManager {
@@ -346,9 +279,7 @@ pub(crate) struct StreamActorManager {
 }
 
 pub(super) struct LocalBarrierWorkerDebugInfo<'a> {
-    actor_to_send: BTreeSet<ActorId>,
     running_actors: BTreeSet<ActorId>,
-    creating_actors: Vec<BTreeSet<ActorId>>,
     managed_barrier_state: ManagedBarrierStateDebugInfo<'a>,
     has_control_stream_connected: bool,
 }
@@ -360,18 +291,6 @@ impl Display for LocalBarrierWorkerDebugInfo<'_> {
             write!(f, "{}, ", actor_id)?;
         }
 
-        write!(f, "\nactor_to_send: ")?;
-        for actor_id in &self.actor_to_send {
-            write!(f, "{}, ", actor_id)?;
-        }
-
-        write!(f, "\ncreating_actors: ")?;
-        for actors in &self.creating_actors {
-            for actor_id in actors {
-                write!(f, "{}, ", actor_id)?;
-            }
-        }
-
         writeln!(
             f,
             "\nhas_control_stream_connected: {}",
@@ -387,9 +306,6 @@
 /// Specifically, [`LocalBarrierWorker`] serve barrier injection from meta server, send the
 /// barriers to and collect them from all actors, and finally report the progress.
 pub(super) struct LocalBarrierWorker {
-    /// Stores all streaming job source sender.
-    barrier_senders: HashMap<ActorId, Vec<UnboundedSender<Barrier>>>,
-
     /// Current barrier collection state.
     state: ManagedBarrierState,
 
@@ -424,7 +340,6 @@ impl LocalBarrierWorker {
             },
         ));
         Self {
-            barrier_senders: HashMap::new(),
             failure_actors: HashMap::default(),
             state: ManagedBarrierState::new(
                 actor_manager.env.state_store(),
@@ -443,14 +358,7 @@ impl LocalBarrierWorker {
 
     fn to_debug_info(&self) -> LocalBarrierWorkerDebugInfo<'_> {
         LocalBarrierWorkerDebugInfo {
-            actor_to_send: self.barrier_senders.keys().cloned().collect(),
             running_actors: self.actor_manager_state.handles.keys().cloned().collect(),
-            creating_actors: self
-                .actor_manager_state
-                .creating_actors
-                .iter()
-                .map(|fut| fut.item().0.clone())
-                .collect(),
             managed_barrier_state: self.state.to_debug_info(),
             has_control_stream_connected: self.control_stream_handle.connected(),
         }
@@ -460,9 +368,6 @@ impl LocalBarrierWorker {
         loop {
             select! {
                 biased;
-                (sender, create_actors_result) = self.actor_manager_state.next_created_actors() => {
-                    self.handle_actor_created(sender, create_actors_result);
-                }
                 (partial_graph_id, completed_epoch) = self.state.next_completed_epoch() => {
                     let result = self.on_epoch_completed(partial_graph_id, completed_epoch);
                     if let Err(err) = result {
                     }
                 },
                 event = self.barrier_event_rx.recv() => {
-                    self.handle_barrier_event(event.expect("should not be none"));
+                    // event should not be None because the LocalBarrierManager holds a copy of tx
+                    let result = self.handle_barrier_event(event.expect("should not be none"));
+                    if let Err((actor_id, err)) = result {
+                        self.notify_actor_failure(actor_id, err, "failed to handle barrier event").await;
+                    }
                 },
                 failure = self.actor_failure_rx.recv() => {
                     let (actor_id, err) = failure.unwrap();
-                    self.notify_actor_failure(actor_id, err).await;
+                    self.notify_actor_failure(actor_id, err, "recv actor failure").await;
                 },
                 actor_op = actor_op_rx.recv() => {
                     if let Some(actor_op) = actor_op {
@@ -515,21 +424,6 @@ impl LocalBarrierWorker {
         }
     }
 
-    fn handle_actor_created(
-        &mut self,
-        sender: oneshot::Sender<StreamResult<()>>,
-        create_actor_result: StreamResult<CreateActorOutput>,
-    ) {
-        let result = create_actor_result.map(|output| {
-            for (actor_id, senders) in output.senders {
-                self.register_sender(actor_id, senders);
-            }
-            self.spawn_actors(output.actors);
-        });
-
-        let _ = sender.send(result);
-    }
-
     fn handle_streaming_control_request(
         &mut self,
         request: StreamingControlStreamRequest,
@@ -539,7 +433,6 @@ impl LocalBarrierWorker {
                 let barrier = Barrier::from_protobuf(req.get_barrier().unwrap())?;
                 self.send_barrier(
                     &barrier,
-                    req.actor_ids_to_send.into_iter().collect(),
                     req.actor_ids_to_collect.into_iter().collect(),
                     req.table_ids_to_sync
                         .into_iter()
@@ -564,7 +457,10 @@ impl LocalBarrierWorker {
         }
     }
 
-    fn handle_barrier_event(&mut self, event: LocalBarrierEvent) {
+    fn handle_barrier_event(
+        &mut self,
+        event: LocalBarrierEvent,
+    ) -> Result<(), (ActorId, StreamError)> {
         match event {
             LocalBarrierEvent::ReportActorCollected { actor_id, epoch } => {
                 self.collect(actor_id, epoch)
@@ -584,9 +480,18 @@ impl LocalBarrierWorker {
                 self.state
                     .subscribe_actor_mutation(actor_id, epoch.prev, mutation_sender);
             }
+            LocalBarrierEvent::RegisterBarrierSender {
+                actor_id,
+                barrier_sender,
+            } => {
+                self.state
+                    .register_barrier_sender(actor_id, barrier_sender)
+                    .map_err(|e| (actor_id, e))?;
+            }
             #[cfg(test)]
             LocalBarrierEvent::Flush(sender) => sender.send(()).unwrap(),
         }
+        Ok(())
     }
 
     fn handle_actor_op(&mut self, actor_op: LocalActorOperation) {
@@ -625,15 +530,6 @@ impl LocalBarrierWorker {
             LocalActorOperation::GetCurrentSharedContext(sender) => {
                 let _ = sender.send(self.current_shared_context.clone());
             }
-            #[cfg(test)]
-            LocalActorOperation::RegisterSenders {
-                actor_id,
-                senders,
-                result_sender,
-            } => {
-                self.register_sender(actor_id, senders);
-                let _ = result_sender.send(());
-            }
             LocalActorOperation::InspectState { result_sender } => {
                 let debug_info = self.to_debug_info();
                 let _ = result_sender.send(debug_info.to_string());
@@ -713,19 +609,6 @@ impl LocalBarrierWorker {
         Ok(())
     }
 
-    /// Register sender for source actors, used to send barriers.
-    fn register_sender(&mut self, actor_id: ActorId, senders: Vec<UnboundedSender<Barrier>>) {
-        tracing::debug!(
-            target: "events::stream::barrier::manager",
-            actor_id = actor_id,
-            "register sender"
-        );
-        self.barrier_senders
-            .entry(actor_id)
-            .or_default()
-            .extend(senders);
-    }
-
     /// Broadcast a barrier to all senders. Save a receiver which will get notified when this
     /// barrier is finished, in managed mode.
     ///
     fn send_barrier(
         &mut self,
         barrier: &Barrier,
-        to_send: HashSet<ActorId>,
         to_collect: HashSet<ActorId>,
         table_ids: HashSet<TableId>,
         partial_graph_id: PartialGraphId,
@@ -767,9 +649,8 @@ impl LocalBarrierWorker {
         }
         debug!(
             target: "events::stream::barrier::manager::send",
-            "send barrier {:?}, senders = {:?}, actor_ids_to_collect = {:?}",
+            "send barrier {:?}, actor_ids_to_collect = {:?}",
             barrier,
-            to_send,
             to_collect
         );
 
@@ -792,31 +673,7 @@ impl LocalBarrierWorker {
             table_ids,
             partial_graph_id,
             actor_ids_to_pre_sync_barrier,
-        );
-
-        for actor_id in to_send {
-            match self.barrier_senders.get(&actor_id) {
-                Some(senders) => {
-                    for sender in senders {
-                        if let Err(_err) = sender.send(barrier.clone()) {
-                            // return err to trigger recovery.
-                            return Err(StreamError::barrier_send(
-                                barrier.clone(),
-                                actor_id,
-                                "channel closed",
-                            ));
-                        }
-                    }
-                }
-                None => {
-                    return Err(StreamError::barrier_send(
-                        barrier.clone(),
-                        actor_id,
-                        "sender not found",
-                    ));
-                }
-            }
-        }
+        )?;
 
         // Actors to stop should still accept this barrier, but won't be sent barriers afterwards.
         if let Some(actors) = barrier.all_stop_actors() {
@@ -825,9 +682,6 @@ impl LocalBarrierWorker {
                 "remove actors {:?} from senders",
                 actors
             );
-            for actor in actors {
-                self.barrier_senders.remove(actor);
-            }
         }
         Ok(())
     }
@@ -862,7 +716,12 @@ impl LocalBarrierWorker {
 
     /// When an actor exits unexpectedly, the error is reported using this function. The control stream
     /// will be reset and the meta service will then trigger recovery.
-    async fn notify_actor_failure(&mut self, actor_id: ActorId, err: StreamError) {
+    async fn notify_actor_failure(
+        &mut self,
+        actor_id: ActorId,
+        err: StreamError,
+        err_context: &'static str,
+    ) {
         self.add_failure(actor_id, err.clone());
         let root_err = self.try_find_root_failure().await.unwrap(); // always `Some` because we just added one
 
@@ -871,7 +730,7 @@ impl LocalBarrierWorker {
         {
             self.control_stream_handle.reset_stream_with_err(
                 anyhow!(root_err)
-                    .context("failed to collect barrier")
+                    .context(err_context)
                    .to_status_unnamed(Code::Internal),
            );
        }
@@ -1033,6 +892,15 @@ impl LocalBarrierManager {
         });
         rx
     }
+
+    pub fn subscribe_barrier(&self, actor_id: ActorId) -> UnboundedReceiver<Barrier> {
+        let (tx, rx) = mpsc::unbounded_channel();
+        self.send_event(LocalBarrierEvent::RegisterBarrierSender {
+            actor_id,
+            barrier_sender: tx,
+        });
+        rx
+    }
 }
 
 /// A [`StreamError`] with a score, used to find the root cause of actor failures.
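Alongside the new `RegisterBarrierSender` event, `handle_barrier_event` above now returns `Result<(), (ActorId, StreamError)>`, and the select loop funnels any failure into `notify_actor_failure` together with a per-call-site context string. A minimal sketch of that routing, with an invented stub type in place of the real `StreamError`:

#[derive(Debug)]
struct StreamErrorStub(&'static str);

type ActorId = u32;

// Handler errors come back as (actor, error) pairs instead of panicking...
fn handle_barrier_event(fail: bool) -> Result<(), (ActorId, StreamErrorStub)> {
    if fail {
        Err((7, StreamErrorStub("channel closed")))
    } else {
        Ok(())
    }
}

// ...and land in the same failure path used for errors reported by actors
// themselves, each tagged with the context of where it surfaced.
fn notify_actor_failure(actor_id: ActorId, err: StreamErrorStub, err_context: &'static str) {
    eprintln!("actor {actor_id} failed: {err:?} (while: {err_context})");
}

fn main() {
    if let Err((actor_id, err)) = handle_barrier_event(true) {
        notify_actor_failure(actor_id, err, "failed to handle barrier event");
    }
}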
@@ -1170,6 +1038,7 @@ pub(crate) mod barrier_test_utils {
     pub(crate) struct LocalBarrierTestEnv {
         pub shared_context: Arc<SharedContext>,
+        #[expect(dead_code)]
         pub(super) actor_op_tx: EventSender,
         pub request_tx: UnboundedSender<Result<StreamingControlStreamRequest, Status>>,
         pub response_rx: UnboundedReceiver<Result<StreamingControlStreamResponse, Status>>,
@@ -1211,7 +1080,6 @@ pub(crate) mod barrier_test_utils {
         pub(crate) fn inject_barrier(
             &self,
             barrier: &Barrier,
-            actor_to_send: impl IntoIterator<Item = ActorId>,
             actor_to_collect: impl IntoIterator<Item = ActorId>,
         ) {
             self.request_tx
                 .send(Ok(StreamingControlStreamRequest {
                     request: Some(streaming_control_stream_request::Request::InjectBarrier(
                         InjectBarrierRequest {
                             request_id: "".to_string(),
                             barrier: Some(barrier.to_protobuf()),
-                            actor_ids_to_send: actor_to_send.into_iter().collect(),
                             actor_ids_to_collect: actor_to_collect.into_iter().collect(),
                             table_ids_to_sync: vec![],
                             partial_graph_id: u32::MAX,
diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs
index c23c3da9fb10..2bc7fbf88566 100644
--- a/src/stream/src/task/barrier_manager/managed_state.rs
+++ b/src/stream/src/task/barrier_manager/managed_state.rs
@@ -38,7 +38,7 @@ use tokio::sync::mpsc;
 
 use super::progress::BackfillState;
 use super::{BarrierCompleteResult, SubscribeMutationItem};
-use crate::error::StreamResult;
+use crate::error::{StreamError, StreamResult};
 use crate::executor::monitor::StreamingMetrics;
 use crate::executor::{Barrier, Mutation};
 use crate::task::{await_tree_key, ActorId, PartialGraphId};
@@ -194,43 +194,47 @@ impl Display for &'_ PartialGraphManagedBarrierState {
 enum InflightActorStatus {
     /// The actor is just spawned and not issued any barrier yet
     NotStarted,
-    /// The actor has been issued some barriers, and not issued any stop barrier yet
+    /// The actor has been issued some barriers, but has not collected the first barrier
+    IssuedFirst(Vec<Barrier>),
+    /// The actor has been issued some barriers, and has collected the first barrier
     Running(u64),
-    /// The actor has been issued a stop barrier
-    Stopping(u64),
 }
 
 impl InflightActorStatus {
-    pub(super) fn is_stopping(&self) -> bool {
-        matches!(self, InflightActorStatus::Stopping(_))
-    }
-
     fn max_issued_epoch(&self) -> Option<u64> {
         match self {
             InflightActorStatus::NotStarted => None,
-            InflightActorStatus::Running(epoch) | InflightActorStatus::Stopping(epoch) => {
-                Some(*epoch)
+            InflightActorStatus::Running(epoch) => Some(*epoch),
+            InflightActorStatus::IssuedFirst(issued_barriers) => {
+                Some(issued_barriers.last().expect("non-empty").epoch.prev)
             }
         }
     }
 }
 
 pub(crate) struct InflightActorState {
+    actor_id: ActorId,
     pending_subscribers: BTreeMap<u64, Vec<mpsc::UnboundedSender<SubscribeMutationItem>>>,
+    barrier_senders: Vec<mpsc::UnboundedSender<Barrier>>,
     /// `prev_epoch` -> partial graph id
     pub(super) inflight_barriers: BTreeMap<u64, PartialGraphId>,
     /// `prev_epoch` -> (`mutation`, `curr_epoch`)
     barrier_mutations: BTreeMap<u64, (Option<Arc<Mutation>>, u64)>,
     status: InflightActorStatus,
+    /// Whether the actor has been issued a stop barrier
+    is_stopping: bool,
 }
 
 impl InflightActorState {
-    pub(super) fn not_started() -> Self {
+    pub(super) fn not_started(actor_id: ActorId) -> Self {
         Self {
+            actor_id,
             pending_subscribers: Default::default(),
+            barrier_senders: vec![],
             inflight_barriers: BTreeMap::default(),
             barrier_mutations: Default::default(),
             status: InflightActorStatus::NotStarted,
+            is_stopping: false,
         }
     }
 
@@ -258,7 +262,7 @@ impl InflightActorState {
         partial_graph_id: PartialGraphId,
         barrier: &Barrier,
         is_stop: bool,
-    ) {
+    ) -> StreamResult<()> {
         if let Some(max_issued_epoch) = self.status.max_issued_epoch() {
             assert!(barrier.epoch.prev > max_issued_epoch);
         }
@@ -285,6 +289,16 @@ impl InflightActorState {
             }
         }
+        for barrier_sender in &self.barrier_senders {
+            barrier_sender.send(barrier.clone()).map_err(|_| {
+                StreamError::barrier_send(
+                    barrier.clone(),
+                    self.actor_id,
+                    "failed to send to registered sender",
+                )
+            })?;
+        }
+
         assert!(self
             .inflight_barriers
             .insert(barrier.epoch.prev, partial_graph_id)
@@ -297,16 +311,24 @@ impl InflightActorState {
             assert_eq!(curr_epoch, barrier.epoch.curr);
         }
 
+        match &mut self.status {
+            InflightActorStatus::NotStarted => {
+                self.status = InflightActorStatus::IssuedFirst(vec![barrier.clone()]);
+            }
+            InflightActorStatus::IssuedFirst(pending_barriers) => {
+                pending_barriers.push(barrier.clone());
+            }
+            InflightActorStatus::Running(prev_epoch) => {
+                *prev_epoch = barrier.epoch.prev;
+            }
+        };
+
         if is_stop {
             assert!(self.pending_subscribers.is_empty());
-            assert!(
-                !self.status.is_stopping(),
-                "stopped actor should not issue barrier"
-            );
-            self.status = InflightActorStatus::Stopping(barrier.epoch.prev);
-        } else {
-            self.status = InflightActorStatus::Running(barrier.epoch.prev);
+            assert!(!self.is_stopping, "stopped actor should not issue barrier");
+            self.is_stopping = true;
         }
+        Ok(())
     }
 
     pub(super) fn collect(&mut self, epoch: EpochPair) -> (PartialGraphId, bool) {
@@ -315,9 +337,24 @@ impl InflightActorState {
         assert_eq!(prev_epoch, epoch.prev);
         let (min_mutation_epoch, _) = self.barrier_mutations.pop_first().expect("should exist");
         assert_eq!(min_mutation_epoch, epoch.prev);
+        match &self.status {
+            InflightActorStatus::NotStarted => {
+                unreachable!("should have issued a barrier when collect")
+            }
+            InflightActorStatus::IssuedFirst(pending_barriers) => {
+                assert_eq!(
+                    prev_epoch,
+                    pending_barriers.first().expect("non-empty").epoch.prev
+                );
+                self.status = InflightActorStatus::Running(
+                    pending_barriers.last().expect("non-empty").epoch.prev,
+                );
+            }
+            InflightActorStatus::Running(_) => {}
+        }
         (
             prev_partial_graph_id,
-            self.inflight_barriers.is_empty() && self.status.is_stopping(),
+            self.inflight_barriers.is_empty() && self.is_stopping,
         )
     }
 
@@ -446,7 +483,7 @@ impl InflightActorState {
                     break;
                 }
             }
-            if !self.status.is_stopping() {
+            if !self.is_stopping {
                 // Only add the subscribers when the actor is not stopped yet.
                 self.pending_subscribers
                     .entry(prev_epoch)
@@ -463,7 +500,7 @@ impl InflightActorState {
                     start_prev_epoch
                 );
             } else {
-                assert!(!self.status.is_stopping(), "actor has been stopped and has not inflight barrier. unlikely to get further barrier");
+                assert!(!self.is_stopping, "actor has been stopped and has not inflight barrier. unlikely to get further barrier");
             }
             self.pending_subscribers
                 .entry(start_prev_epoch)
                 .or_default()
                 .push(tx);
         }
     }
+
+    pub(super) fn register_barrier_sender(
+        &mut self,
+        tx: mpsc::UnboundedSender<Barrier>,
+    ) -> StreamResult<()> {
+        match &self.status {
+            InflightActorStatus::NotStarted => {
+                self.barrier_senders.push(tx);
+            }
+            InflightActorStatus::IssuedFirst(pending_barriers) => {
+                for barrier in pending_barriers {
+                    tx.send(barrier.clone()).map_err(|_| {
+                        StreamError::barrier_send(
+                            barrier.clone(),
+                            self.actor_id,
+                            "failed to send pending barriers to newly registered sender",
+                        )
+                    })?;
+                }
+                self.barrier_senders.push(tx);
+            }
+            InflightActorStatus::Running(_) => {
+                unreachable!("should not register barrier sender when entering Running status")
+            }
+        }
+        Ok(())
+    }
 }
 
 impl ManagedBarrierState {
@@ -482,10 +546,21 @@ impl ManagedBarrierState {
     ) {
         self.actor_states
             .entry(actor_id)
-            .or_insert_with(InflightActorState::not_started)
+            .or_insert_with(|| InflightActorState::not_started(actor_id))
             .subscribe_actor_mutation(start_prev_epoch, tx);
     }
 
+    pub(super) fn register_barrier_sender(
+        &mut self,
+        actor_id: ActorId,
+        tx: mpsc::UnboundedSender<Barrier>,
+    ) -> StreamResult<()> {
+        self.actor_states
+            .entry(actor_id)
+            .or_insert_with(|| InflightActorState::not_started(actor_id))
+            .register_barrier_sender(tx)
+    }
+
     pub(super) fn transform_to_issued(
         &mut self,
         barrier: &Barrier,
         actor_ids_to_collect: HashSet<ActorId>,
         table_ids: HashSet<TableId>,
         partial_graph_id: PartialGraphId,
         actor_ids_to_pre_sync_barrier: HashSet<ActorId>,
-    ) {
+    ) -> StreamResult<()> {
         let actor_to_stop = barrier.all_stop_actors();
         let graph_state = self
             .graph_states
@@ -514,26 +589,27 @@ impl ManagedBarrierState {
         for actor_id in actor_ids_to_collect {
             self.actor_states
                 .entry(actor_id)
-                .or_insert_with(InflightActorState::not_started)
+                .or_insert_with(|| InflightActorState::not_started(actor_id))
                 .issue_barrier(
                     partial_graph_id,
                     barrier,
                     actor_to_stop
                         .map(|actors| actors.contains(&actor_id))
                        .unwrap_or(false),
-                );
+                )?;
         }
 
         if partial_graph_id.is_global_graph() {
             for actor_id in actor_ids_to_pre_sync_barrier {
                 self.actor_states
                     .entry(actor_id)
-                    .or_insert_with(InflightActorState::not_started)
+                    .or_insert_with(|| InflightActorState::not_started(actor_id))
                     .sync_barrier(barrier);
             }
         } else {
             assert!(actor_ids_to_pre_sync_barrier.is_empty());
         }
+        Ok(())
     }
 
     pub(super) fn next_completed_epoch(
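The heart of the late-registration support above is the new three-state lifecycle: `NotStarted` → `IssuedFirst(Vec<Barrier>)` → `Running(u64)`. Barriers issued before the first collection are buffered, so a sender that registers late gets them replayed; after the first collection the state collapses to a cheap epoch and registration becomes a bug. A standalone, runnable model of that behavior — simplified on purpose: epochs stand in for barriers, `collect_first` stands in for the first `collect`, and all names are invented for this sketch:

use std::sync::mpsc::{channel, Sender};

enum Status {
    NotStarted,
    IssuedFirst(Vec<u64>), // epochs issued but not yet collected once
    Running(u64),          // prev epoch of the latest issued barrier
}

struct ActorStateStub {
    status: Status,
    senders: Vec<Sender<u64>>,
}

impl ActorStateStub {
    fn issue_barrier(&mut self, epoch: u64) {
        for tx in &self.senders {
            tx.send(epoch).unwrap();
        }
        match &mut self.status {
            Status::NotStarted => self.status = Status::IssuedFirst(vec![epoch]),
            Status::IssuedFirst(pending) => pending.push(epoch),
            Status::Running(prev) => *prev = epoch,
        }
    }

    // Mirrors `collect` above: the first collection flips IssuedFirst to Running.
    fn collect_first(&mut self) {
        if let Status::IssuedFirst(pending) = &self.status {
            let last = *pending.last().unwrap();
            self.status = Status::Running(last);
        }
    }

    fn register_barrier_sender(&mut self, tx: Sender<u64>) {
        match &self.status {
            Status::NotStarted => {}
            // Replay everything issued so far, so a late subscriber misses nothing.
            Status::IssuedFirst(pending) => {
                for epoch in pending {
                    tx.send(*epoch).unwrap();
                }
            }
            Status::Running(_) => unreachable!("must register before the first collection"),
        }
        self.senders.push(tx);
    }
}

fn main() {
    let mut state = ActorStateStub { status: Status::NotStarted, senders: vec![] };
    state.issue_barrier(1);
    state.issue_barrier(2);
    let (tx, rx) = channel();
    state.register_barrier_sender(tx); // late registration: 1 and 2 are replayed
    assert_eq!(rx.try_recv().unwrap(), 1);
    assert_eq!(rx.try_recv().unwrap(), 2);
    state.collect_first();
    assert!(matches!(state.status, Status::Running(2)));
}

The replay is exactly what the new test below exercises: barriers are injected first and the sender subscribes afterwards.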
diff --git a/src/stream/src/task/barrier_manager/tests.rs b/src/stream/src/task/barrier_manager/tests.rs
index 0d1bb159ea5f..d6a8256aebb6 100644
--- a/src/stream/src/task/barrier_manager/tests.rs
+++ b/src/stream/src/task/barrier_manager/tests.rs
@@ -19,7 +19,7 @@ use std::task::Poll;
 
 use futures::future::join_all;
 use futures::FutureExt;
-use risingwave_common::util::epoch::test_epoch;
+use risingwave_common::util::epoch::{test_epoch, EpochExt};
 
 use super::*;
 use crate::task::barrier_test_utils::LocalBarrierTestEnv;
@@ -31,32 +31,28 @@ async fn test_managed_barrier_collection() -> StreamResult<()> {
     let manager = &test_env.shared_context.local_barrier_manager;
 
     let register_sender = |actor_id: u32| {
-        let actor_op_tx = &test_env.actor_op_tx;
-        async move {
-            let (barrier_tx, barrier_rx) = unbounded_channel();
-            actor_op_tx
-                .send_and_await(move |result_sender| LocalActorOperation::RegisterSenders {
-                    result_sender,
-                    actor_id,
-                    senders: vec![barrier_tx],
-                })
-                .await
-                .unwrap();
-            (actor_id, barrier_rx)
-        }
+        let barrier_rx = test_env
+            .shared_context
+            .local_barrier_manager
+            .subscribe_barrier(actor_id);
+        (actor_id, barrier_rx)
     };
 
     // Register actors
     let actor_ids = vec![233, 234, 235];
     let count = actor_ids.len();
-    let mut rxs = join_all(actor_ids.clone().into_iter().map(register_sender)).await;
+    let mut rxs = actor_ids
+        .clone()
+        .into_iter()
+        .map(register_sender)
+        .collect_vec();
 
     // Send a barrier to all actors
     let curr_epoch = test_epoch(2);
     let barrier = Barrier::new_test_barrier(curr_epoch);
     let epoch = barrier.epoch.prev;
 
-    test_env.inject_barrier(&barrier, actor_ids.clone(), actor_ids);
+    test_env.inject_barrier(&barrier, actor_ids);
 
     // Collect barriers from actors
     let collected_barriers = join_all(rxs.iter_mut().map(|(actor_id, rx)| async move {
@@ -94,19 +90,11 @@ async fn test_managed_barrier_collection_separately() -> StreamResult<()> {
     let manager = &test_env.shared_context.local_barrier_manager;
 
     let register_sender = |actor_id: u32| {
-        let actor_op_tx = &test_env.actor_op_tx;
-        async move {
-            let (barrier_tx, barrier_rx) = unbounded_channel();
-            actor_op_tx
-                .send_and_await(move |result_sender| LocalActorOperation::RegisterSenders {
-                    result_sender,
-                    actor_id,
-                    senders: vec![barrier_tx],
-                })
-                .await
-                .unwrap();
-            (actor_id, barrier_rx)
-        }
+        let barrier_rx = test_env
+            .shared_context
+            .local_barrier_manager
+            .subscribe_barrier(actor_id);
+        (actor_id, barrier_rx)
     };
 
     let actor_ids_to_send = vec![233, 234, 235];
@@ -119,7 +107,11 @@ async fn test_managed_barrier_collection_separately() -> StreamResult<()> {
 
     // Register actors
     let count = actor_ids_to_send.len();
-    let mut rxs = join_all(actor_ids_to_send.clone().into_iter().map(register_sender)).await;
+    let mut rxs = actor_ids_to_send
+        .clone()
+        .into_iter()
+        .map(register_sender)
+        .collect_vec();
 
     // Prepare the barrier
     let curr_epoch = test_epoch(2);
@@ -132,7 +124,7 @@ async fn test_managed_barrier_collection_separately() -> StreamResult<()> {
     let mut mutation_reader = pin!(mutation_subscriber.recv());
     assert!(poll_fn(|cx| Poll::Ready(mutation_reader.as_mut().poll(cx).is_pending())).await);
 
-    test_env.inject_barrier(&barrier, actor_ids_to_send, actor_ids_to_collect);
+    test_env.inject_barrier(&barrier, actor_ids_to_collect);
 
     let (epoch, mutation) = mutation_reader.await.unwrap();
     assert_eq!((epoch, &mutation), (barrier.epoch.prev, &barrier.mutation));
@@ -168,3 +160,92 @@ async fn test_managed_barrier_collection_separately() -> StreamResult<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn test_late_register_barrier_sender() -> StreamResult<()> {
+    let mut test_env = LocalBarrierTestEnv::for_test().await;
+
+    let manager = &test_env.shared_context.local_barrier_manager;
+
+    let register_sender = |actor_id: u32| {
+        let barrier_rx = test_env
+            .shared_context
+            .local_barrier_manager
+            .subscribe_barrier(actor_id);
+        (actor_id, barrier_rx)
+    };
+
+    let actor_ids_to_send = vec![233, 234, 235];
+    let extra_actor_id = 666;
+    let actor_ids_to_collect = actor_ids_to_send
+        .iter()
+        .cloned()
+        .chain(once(extra_actor_id))
+        .collect_vec();
+
+    // Register actors
+    let count = actor_ids_to_send.len();
+
+    // Prepare the barrier
+    let epoch1 = test_epoch(2);
+    let barrier1 = Barrier::new_test_barrier(epoch1);
+
+    let epoch2 = epoch1.next_epoch();
+    let barrier2 = Barrier::new_test_barrier(epoch2).with_stop();
+
+    test_env.inject_barrier(&barrier1, actor_ids_to_collect.clone());
+    test_env.inject_barrier(&barrier2, actor_ids_to_collect.clone());
+
+    // register sender after inject barrier
+    let mut rxs = actor_ids_to_send
+        .clone()
+        .into_iter()
+        .map(register_sender)
+        .collect_vec();
+
+    // Collect barriers from actors
+    let collected_barriers = join_all(rxs.iter_mut().map(|(actor_id, rx)| async move {
+        let barrier1 = rx.recv().await.unwrap();
+        assert_eq!(barrier1.epoch.curr, epoch1);
+        let barrier2 = rx.recv().await.unwrap();
+        assert_eq!(barrier2.epoch.curr, epoch2);
+        manager.collect(*actor_id, &barrier1);
+        (*actor_id, barrier2)
+    }))
+    .await;
+
+    // Collect a barrier before sending
+    manager.collect(extra_actor_id, &barrier1);
+
+    let resp = test_env.response_rx.recv().await.unwrap().unwrap();
+    match resp.response.unwrap() {
+        streaming_control_stream_response::Response::CompleteBarrier(complete_barrier) => {
+            assert_eq!(complete_barrier.epoch, barrier1.epoch.prev);
+        }
+        _ => unreachable!(),
+    }
+
+    manager.collect(extra_actor_id, &barrier2);
+
+    let mut await_epoch_future = pin!(test_env.response_rx.recv().map(|result| {
+        let resp: StreamingControlStreamResponse = result.unwrap().unwrap();
+        let resp = resp.response.unwrap();
+        match resp {
+            streaming_control_stream_response::Response::CompleteBarrier(complete_barrier) => {
+                assert_eq!(complete_barrier.epoch, barrier2.epoch.prev);
+            }
+            _ => unreachable!(),
+        }
+    }));
+
+    // Report to local barrier manager
+    for (i, (actor_id, barrier)) in collected_barriers.into_iter().enumerate() {
+        manager.collect(actor_id, &barrier);
+        manager.flush_all_events().await;
+        let notified =
+            poll_fn(|cx| Poll::Ready(await_epoch_future.as_mut().poll(cx).is_ready())).await;
+        assert_eq!(notified, i == count - 1);
+    }
+
+    Ok(())
+}
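The new test drives completion notification through a one-shot poll probe (`poll_fn(|cx| Poll::Ready(fut.as_mut().poll(cx).is_ready()))`) rather than awaiting, so it can assert that the barrier is *not yet* complete. That pattern, extracted into a runnable miniature (names invented; `tokio` is only needed for the async main):

use std::future::{pending, poll_fn, Future};
use std::pin::pin;
use std::task::Poll;

#[tokio::main]
async fn main() {
    // A future that never completes: the probe reports "not ready"
    // without blocking the test.
    let mut never = pin!(pending::<()>());
    let ready = poll_fn(|cx| Poll::Ready(never.as_mut().poll(cx).is_ready())).await;
    assert!(!ready);

    // An immediately-ready future reports "ready" on the first poll.
    let mut done = pin!(async { 7 });
    let ready = poll_fn(|cx| Poll::Ready(done.as_mut().poll(cx).is_ready())).await;
    assert!(ready);
}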
diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs
index dac104156f15..a083cc3974da 100644
--- a/src/stream/src/task/stream_manager.rs
+++ b/src/stream/src/task/stream_manager.rs
@@ -15,7 +15,6 @@
 use core::time::Duration;
 use std::collections::HashSet;
 use std::fmt::Debug;
-use std::mem::take;
 use std::sync::atomic::AtomicU64;
 use std::sync::Arc;
 use std::time::Instant;
@@ -24,7 +23,7 @@ use anyhow::anyhow;
 use async_recursion::async_recursion;
 use await_tree::InstrumentAwait;
 use futures::stream::BoxStream;
-use futures::FutureExt;
+use futures::{FutureExt, TryFutureExt};
 use itertools::Itertools;
 use risingwave_common::bail;
 use risingwave_common::bitmap::Bitmap;
@@ -41,7 +40,6 @@ use risingwave_pb::stream_service::{
 };
 use risingwave_storage::monitor::HummockTraceFutureExt;
 use risingwave_storage::{dispatch_state_store, StateStore};
-use rw_futures_util::AttachedFuture;
 use thiserror_ext::AsReport;
 use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
 use tokio::sync::oneshot;
@@ -59,11 +57,11 @@ use crate::executor::{
 };
 use crate::from_proto::create_executor;
 use crate::task::barrier_manager::{
-    ControlStreamHandle, CreateActorOutput, EventSender, LocalActorOperation, LocalBarrierWorker,
+    ControlStreamHandle, EventSender, LocalActorOperation, LocalBarrierWorker,
 };
 use crate::task::{
-    ActorId, CreateActorContext, FragmentId, LocalBarrierManager, SharedContext,
-    StreamActorManager, StreamActorManagerState, StreamEnvironment, UpDownActorIds,
+    ActorId, FragmentId, LocalBarrierManager, SharedContext, StreamActorManager,
+    StreamActorManagerState, StreamEnvironment, UpDownActorIds,
 };
 
 #[cfg(test)]
@@ -151,8 +149,6 @@ pub struct ExecutorParams {
 
     pub shared_context: Arc<SharedContext>,
 
     pub local_barrier_manager: LocalBarrierManager,
-
-    pub create_actor_context: CreateActorContext,
 }
 
 impl Debug for ExecutorParams {
@@ -309,15 +305,6 @@ impl LocalBarrierWorker {
             let result = handle.await;
             assert!(result.is_ok() || result.unwrap_err().is_cancelled());
         }
-        // Clear the join handle of creating actors
-        for handle in take(&mut self.actor_manager_state.creating_actors)
-            .into_iter()
-            .map(|attached_future| attached_future.into_inner().0)
-        {
-            handle.abort();
-            let result = handle.await;
-            assert!(result.is_ok() || result.err().unwrap().is_cancelled());
-        }
         self.actor_manager_state.clear_state();
         if let Some(m) = self.actor_manager.await_tree_reg.as_ref() {
             m.clear();
@@ -362,19 +349,8 @@ impl LocalBarrierWorker {
                     }
                 }
             };
-            let actor_ids = actors
-                .iter()
-                .map(|actor| actor.actor.as_ref().unwrap().actor_id)
-                .collect();
-            let actor_manager = self.actor_manager.clone();
-            let create_actors_fut = crate::CONFIG.scope(
-                self.actor_manager.env.config().clone(),
-                actor_manager.create_actors(actors, self.current_shared_context.clone()),
-            );
-            let join_handle = self.actor_manager.runtime.spawn(create_actors_fut);
-            self.actor_manager_state
-                .creating_actors
-                .push(AttachedFuture::new(join_handle, (actor_ids, result_sender)));
+            self.spawn_actors(actors);
+            let _ = result_sender.send(Ok(()));
         }
     }
 
@@ -419,7 +395,6 @@ impl StreamActorManager {
         has_stateful: bool,
         subtasks: &mut Vec<SubtaskHandle>,
         shared_context: &Arc<SharedContext>,
-        create_actor_context: &CreateActorContext,
     ) -> StreamResult<Executor> {
         // The "stateful" here means that the executor may issue read operations to the state store
         // massively and continuously. Used to decide whether to apply the optimization of subtasks.
@@ -453,7 +428,6 @@ impl StreamActorManager {
                     has_stateful || is_stateful,
                     subtasks,
                     shared_context,
-                    create_actor_context,
                 )
                 .await?,
             );
@@ -501,7 +475,6 @@ impl StreamActorManager {
             watermark_epoch: self.watermark_epoch.clone(),
             shared_context: shared_context.clone(),
             local_barrier_manager: shared_context.local_barrier_manager.clone(),
-            create_actor_context: create_actor_context.clone(),
         };
 
         let executor = create_executor(executor_params, node, store).await?;
@@ -531,7 +504,6 @@ impl StreamActorManager {
     }
 
     /// Create a chain(tree) of nodes and return the head executor.
-    #[expect(clippy::too_many_arguments)]
    async fn create_nodes(
        &self,
        fragment_id: FragmentId,
@@ -540,7 +512,6 @@ impl StreamActorManager {
         actor_context: &ActorContextRef,
         vnode_bitmap: Option<Bitmap>,
         shared_context: &Arc<SharedContext>,
-        create_actor_context: &CreateActorContext,
     ) -> StreamResult<(Executor, Vec<SubtaskHandle>)> {
         let mut subtasks = vec![];
 
@@ -555,7 +526,6 @@ impl StreamActorManager {
                 false,
                 &mut subtasks,
                 shared_context,
-                create_actor_context,
             )
             .await
         })?;
 
         Ok((executor, subtasks))
     }
 
-    async fn create_actors(
+    async fn create_actor(
         self: Arc<Self>,
-        actors: Vec<BuildActorInfo>,
+        actor: BuildActorInfo,
         shared_context: Arc<SharedContext>,
-    ) -> StreamResult<CreateActorOutput> {
-        let mut ret = Vec::with_capacity(actors.len());
-        let create_actor_context = CreateActorContext::default();
-        for actor in actors {
+    ) -> StreamResult<Actor<DispatchExecutor>> {
+        {
             let BuildActorInfo {
                 actor,
                 related_subscriptions,
             } = actor;
@@ -606,7 +574,6 @@ impl StreamActorManager {
                     &actor_context,
                     vnode_bitmap,
                     &shared_context,
-                    &create_actor_context,
                 )
                 // If hummock tracing is not enabled, it directly returns wrapped future.
                .may_trace_hummock()
                .await?;
 
             let actor = Actor::new(
                 executor,
                 subtasks,
                 self.streaming_metrics.clone(),
                 actor_context.clone(),
                 expr_context,
                 shared_context.local_barrier_manager.clone(),
             );
-
-            ret.push(actor);
+            Ok(actor)
         }
-        Ok(CreateActorOutput {
-            actors: ret,
-            senders: create_actor_context.collect_senders(),
-        })
     }
 }
 
 impl LocalBarrierWorker {
-    pub(super) fn spawn_actors(&mut self, actors: Vec<Actor<DispatchExecutor>>) {
+    pub(super) fn spawn_actors(&mut self, actors: Vec<BuildActorInfo>) {
         for actor in actors {
             let monitor = tokio_metrics::TaskMonitor::new();
-            let actor_context = actor.actor_context.clone();
-            let actor_id = actor_context.id;
-
+            let stream_actor_ref = actor.actor.as_ref().unwrap();
+            let actor_id = stream_actor_ref.actor_id;
             let handle = {
-                let trace_span = format!("Actor {actor_id}: `{}`", actor_context.mview_definition);
+                let trace_span =
+                    format!("Actor {actor_id}: `{}`", stream_actor_ref.mview_definition);
                 let barrier_manager = self.current_shared_context.local_barrier_manager.clone();
-                let actor = actor.run().map(move |result| {
+                // wrap the future of `create_actor` with `boxed` to avoid stack overflow
+                let actor = self.actor_manager.clone().create_actor(actor, self.current_shared_context.clone()).boxed().and_then(|actor| actor.run()).map(move |result| {
                     if let Err(err) = result {
                         // TODO: check error type and panic if it's unexpected.
                         // Intentionally use `?` on the report to also include the backtrace.
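The tail of the patch (truncated above) rewires spawning: `spawn_actors` now takes `BuildActorInfo` and performs actor creation on the spawned task itself, boxing the recursive creation future before chaining it into `run`, per the added comment about stack overflow. A runnable miniature of that composition, with invented stub types standing in for the real actor and error:

use futures::{FutureExt, TryFutureExt};

struct ActorStub;

impl ActorStub {
    async fn run(self) -> Result<(), String> {
        Ok(())
    }
}

// Stand-in for the recursive `create_actor` call; boxing its future keeps the
// (potentially deep) executor-tree construction off the caller's stack frame.
async fn create_actor(_info: u32) -> Result<ActorStub, String> {
    Ok(ActorStub)
}

#[tokio::main]
async fn main() {
    let fut = create_actor(1)
        .boxed() // BoxFuture<'static, Result<ActorStub, String>>
        .and_then(|actor| actor.run()); // runs only if creation succeeded
    tokio::spawn(fut).await.unwrap().unwrap();
}

Because creation and running are one chained future on the actor's own task, the control plane can acknowledge the build request immediately (`result_sender.send(Ok(()))` above) instead of tracking a separate `creating_actors` set of attached futures.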