diff --git a/proto/stream_service.proto b/proto/stream_service.proto
index ab9a70a55c1a..5b213e8f26e1 100644
--- a/proto/stream_service.proto
+++ b/proto/stream_service.proto
@@ -55,7 +55,6 @@ message DropActorsResponse {
 message InjectBarrierRequest {
   string request_id = 1;
   stream_plan.Barrier barrier = 2;
-  repeated uint32 actor_ids_to_send = 3;
   repeated uint32 actor_ids_to_collect = 4;
   repeated uint32 table_ids_to_sync = 5;
   uint32 partial_graph_id = 6;
@@ -145,4 +144,4 @@ service StreamService {
   rpc StreamingControlStream(stream StreamingControlStreamRequest) returns (stream StreamingControlStreamResponse);
 }
 
-// TODO: Lifecycle management for actors.
+// TODO: Lifecycle management for actors.
\ No newline at end of file
diff --git a/src/meta/src/barrier/info.rs b/src/meta/src/barrier/info.rs
index 914b9d207bfd..06a390380abf 100644
--- a/src/meta/src/barrier/info.rs
+++ b/src/meta/src/barrier/info.rs
@@ -259,24 +259,6 @@ impl InflightGraphInfo {
         })
     }
 
-    /// Returns actor list to send in the target worker node.
-    pub fn actor_ids_to_send(&self, node_id: WorkerId) -> impl Iterator<Item = ActorId> + '_ {
-        self.fragment_infos
-            .values()
-            .filter(|info| info.is_injectable)
-            .flat_map(move |info| {
-                info.actors
-                    .iter()
-                    .filter_map(move |(actor_id, actor_node_id)| {
-                        if *actor_node_id == node_id {
-                            Some(*actor_id)
-                        } else {
-                            None
-                        }
-                    })
-            })
-    }
-
     pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
         self.fragment_infos
             .values()
diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs
index f091a09bd522..1b4ab6207db9 100644
--- a/src/meta/src/barrier/rpc.rs
+++ b/src/meta/src/barrier/rpc.rs
@@ -302,15 +302,9 @@
         self.nodes
             .iter_mut()
             .map(|(node_id, node)| {
-                let actor_ids_to_send: Vec<_> =
-                    pre_applied_graph_info.actor_ids_to_send(*node_id).collect();
                 let actor_ids_to_collect: Vec<_> = pre_applied_graph_info
                     .actor_ids_to_collect(*node_id)
                     .collect();
-                if actor_ids_to_collect.is_empty() {
-                    // No need to send or collect barrier for this node.
-                    assert!(actor_ids_to_send.is_empty());
-                }
                 let table_ids_to_sync = if let Some(graph_info) = applied_graph_info {
                     graph_info
                         .existing_table_ids()
@@ -340,7 +334,6 @@
                     InjectBarrierRequest {
                         request_id: StreamRpcManager::new_request_id(),
                         barrier: Some(barrier),
-                        actor_ids_to_send,
                         actor_ids_to_collect,
                         table_ids_to_sync,
                         partial_graph_id,
diff --git a/src/stream/src/executor/dispatch.rs b/src/stream/src/executor/dispatch.rs
index 9f452dc1863b..39a189616531 100644
--- a/src/stream/src/executor/dispatch.rs
+++ b/src/stream/src/executor/dispatch.rs
@@ -1271,7 +1271,7 @@ mod tests {
                 actor_new_dispatchers: Default::default(),
             },
         ));
-        barrier_test_env.inject_barrier(&b1, [], [actor_id]);
+        barrier_test_env.inject_barrier(&b1, [actor_id]);
         tx.send(Message::Barrier(b1.clone().into_dispatcher()))
             .await
             .unwrap();
@@ -1291,7 +1291,7 @@
 
         // 6. Send another barrier.
         let b2 = Barrier::new_test_barrier(test_epoch(2));
-        barrier_test_env.inject_barrier(&b2, [], [actor_id]);
+        barrier_test_env.inject_barrier(&b2, [actor_id]);
         tx.send(Message::Barrier(b2.into_dispatcher()))
             .await
             .unwrap();
@@ -1330,7 +1330,7 @@
                 actor_new_dispatchers: Default::default(),
             },
         ));
-        barrier_test_env.inject_barrier(&b3, [], [actor_id]);
+        barrier_test_env.inject_barrier(&b3, [actor_id]);
         tx.send(Message::Barrier(b3.into_dispatcher()))
             .await
             .unwrap();
@@ -1344,7 +1344,7 @@
 
         // 11. Send another barrier.
         let b4 = Barrier::new_test_barrier(test_epoch(4));
-        barrier_test_env.inject_barrier(&b4, [], [actor_id]);
+        barrier_test_env.inject_barrier(&b4, [actor_id]);
         tx.send(Message::Barrier(b4.into_dispatcher()))
             .await
             .unwrap();
diff --git a/src/stream/src/executor/integration_tests.rs b/src/stream/src/executor/integration_tests.rs
index 0b7415adac38..b03189e932a8 100644
--- a/src/stream/src/executor/integration_tests.rs
+++ b/src/stream/src/executor/integration_tests.rs
@@ -228,7 +228,7 @@ async fn test_merger_sum_aggr() {
 
     let mut epoch = test_epoch(1);
     let b1 = Barrier::new_test_barrier(epoch);
-    barrier_test_env.inject_barrier(&b1, [], actors.clone());
+    barrier_test_env.inject_barrier(&b1, actors.clone());
     input
         .send(Message::Barrier(b1.into_dispatcher()))
         .await
@@ -244,7 +244,7 @@
             input.send(Message::Chunk(chunk)).await.unwrap();
         }
         let b = Barrier::new_test_barrier(epoch);
-        barrier_test_env.inject_barrier(&b, [], actors.clone());
+        barrier_test_env.inject_barrier(&b, actors.clone());
         input
             .send(Message::Barrier(b.into_dispatcher()))
             .await
@@ -253,7 +253,7 @@
     }
     let b = Barrier::new_test_barrier(epoch)
         .with_mutation(Mutation::Stop(actors.clone().into_iter().collect()));
-    barrier_test_env.inject_barrier(&b, [], actors);
+    barrier_test_env.inject_barrier(&b, actors);
     input
         .send(Message::Barrier(b.into_dispatcher()))
         .await
diff --git a/src/stream/src/executor/merge.rs b/src/stream/src/executor/merge.rs
index 8254dfeadc68..b09351223c00 100644
--- a/src/stream/src/executor/merge.rs
+++ b/src/stream/src/executor/merge.rs
@@ -524,13 +524,13 @@
             .map(|(_, epoch)| {
                 let barrier = Barrier::with_prev_epoch_for_test(*epoch, *prev_epoch);
                 *prev_epoch = *epoch;
-                barrier_test_env.inject_barrier(&barrier, [], [actor_id]);
+                barrier_test_env.inject_barrier(&barrier, [actor_id]);
                 (*epoch, barrier)
             })
             .collect();
        let b2 = Barrier::with_prev_epoch_for_test(test_epoch(1000), *prev_epoch)
             .with_mutation(Mutation::Stop(HashSet::default()));
-        barrier_test_env.inject_barrier(&b2, [], [actor_id]);
+        barrier_test_env.inject_barrier(&b2, [actor_id]);
 
         for (tx_id, tx) in txs.into_iter().enumerate() {
             let epochs = epochs.clone();
@@ -703,7 +703,7 @@
                 actor_new_dispatchers: Default::default(),
             },
         ));
-        barrier_test_env.inject_barrier(&b1, [], [actor_id]);
+        barrier_test_env.inject_barrier(&b1, [actor_id]);
         send!(
             [untouched, old],
             Message::Barrier(b1.clone().into_dispatcher())
@@ -822,7 +822,6 @@
 
         test_env.inject_barrier(
             &exchange_client_test_barrier(),
-            [],
             [remote_input.actor_id()],
         );
 
diff --git a/src/stream/src/executor/receiver.rs b/src/stream/src/executor/receiver.rs
index 58700d2a1350..6cabb7938833 100644
--- a/src/stream/src/executor/receiver.rs
+++ b/src/stream/src/executor/receiver.rs
@@ -319,7 +319,7 @@
             },
         ));
 
-        barrier_test_env.inject_barrier(&b1, [], [actor_id]);
+        barrier_test_env.inject_barrier(&b1, [actor_id]);
 
         send!([new], Message::Barrier(b1.clone().into_dispatcher()));
         assert_recv_pending!(); // We should not receive the barrier, as new is not the upstream.
diff --git a/src/stream/src/from_proto/barrier_recv.rs b/src/stream/src/from_proto/barrier_recv.rs
index fddc9c4fd253..ce38cb6a49e8 100644
--- a/src/stream/src/from_proto/barrier_recv.rs
+++ b/src/stream/src/from_proto/barrier_recv.rs
@@ -35,7 +35,8 @@ impl ExecutorBuilder for BarrierRecvExecutorBuilder {
 
         let (sender, barrier_receiver) = unbounded_channel();
         params
-            .create_actor_context
+            .shared_context
+            .local_barrier_manager
             .register_sender(params.actor_context.id, sender);
 
         let exec = BarrierRecvExecutor::new(params.actor_context, barrier_receiver);
diff --git a/src/stream/src/from_proto/now.rs b/src/stream/src/from_proto/now.rs
index 9eac7caa1355..6b79eb81e6c8 100644
--- a/src/stream/src/from_proto/now.rs
+++ b/src/stream/src/from_proto/now.rs
@@ -38,7 +38,8 @@ impl ExecutorBuilder for NowExecutorBuilder {
     ) -> StreamResult<Executor> {
         let (sender, barrier_receiver) = unbounded_channel();
         params
-            .create_actor_context
+            .shared_context
+            .local_barrier_manager
             .register_sender(params.actor_context.id, sender);
 
         let mode = if let Ok(pb_mode) = node.get_mode() {
diff --git a/src/stream/src/from_proto/source/trad_source.rs b/src/stream/src/from_proto/source/trad_source.rs
index 39e3293933e4..919cf37e70e3 100644
--- a/src/stream/src/from_proto/source/trad_source.rs
+++ b/src/stream/src/from_proto/source/trad_source.rs
@@ -143,7 +143,8 @@ impl ExecutorBuilder for SourceExecutorBuilder {
     ) -> StreamResult<Executor> {
         let (sender, barrier_receiver) = unbounded_channel();
         params
-            .create_actor_context
+            .shared_context
+            .local_barrier_manager
             .register_sender(params.actor_context.id, sender);
 
         let system_params = params.env.system_params_manager_ref().get_params();
diff --git a/src/stream/src/from_proto/stream_scan.rs b/src/stream/src/from_proto/stream_scan.rs
index 33badbb01288..3a3811421a45 100644
--- a/src/stream/src/from_proto/stream_scan.rs
+++ b/src/stream/src/from_proto/stream_scan.rs
@@ -156,7 +156,8 @@ impl ExecutorBuilder for StreamScanExecutorBuilder {
         let vnodes = params.vnode_bitmap.map(Arc::new);
         let (barrier_tx, barrier_rx) = mpsc::unbounded_channel();
         params
-            .create_actor_context
+            .shared_context
+            .local_barrier_manager
             .register_sender(params.actor_context.id, barrier_tx);
 
         let upstream_table =
diff --git a/src/stream/src/from_proto/values.rs b/src/stream/src/from_proto/values.rs
index 96cf002488e8..42fae84d8024 100644
--- a/src/stream/src/from_proto/values.rs
+++ b/src/stream/src/from_proto/values.rs
@@ -37,7 +37,8 @@ impl ExecutorBuilder for ValuesExecutorBuilder {
     ) -> StreamResult<Executor> {
         let (sender, barrier_receiver) = unbounded_channel();
         params
-            .create_actor_context
+            .shared_context
+            .local_barrier_manager
             .register_sender(params.actor_context.id, sender);
         let progress = params
             .local_barrier_manager
diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs
index 950974b8acb5..ca37707c7728 100644
--- a/src/stream/src/task/barrier_manager.rs
+++ b/src/stream/src/task/barrier_manager.rs
@@ -22,7 +22,6 @@ use anyhow::{anyhow, Context};
 use futures::stream::{BoxStream, FuturesUnordered};
 use futures::StreamExt;
 use itertools::Itertools;
-use parking_lot::Mutex;
 use risingwave_common::error::tonic::extra::Score;
 use risingwave_pb::stream_service::barrier_complete_response::{
     GroupedSstableInfo, PbCreateMviewProgress,
@@ -185,38 +184,8 @@ impl ControlStreamHandle {
     }
 }
 
-#[derive(Clone)]
-pub struct CreateActorContext {
-    #[expect(clippy::type_complexity)]
-    barrier_sender: Arc<Mutex<Option<HashMap<ActorId, Vec<UnboundedSender<Barrier>>>>>>,
-}
-
-impl Default for CreateActorContext {
-    fn default() -> Self {
-        Self {
-            barrier_sender: Arc::new(Mutex::new(Some(HashMap::new()))),
-        }
-    }
-}
-
-impl CreateActorContext {
-    pub fn register_sender(&self, actor_id: ActorId, sender: UnboundedSender<Barrier>) {
-        self.barrier_sender
-            .lock()
-            .as_mut()
-            .expect("should not register after collecting sender")
-            .entry(actor_id)
-            .or_default()
-            .push(sender)
-    }
-
-    pub(super) fn collect_senders(&self) -> HashMap<ActorId, Vec<UnboundedSender<Barrier>>> {
-        self.barrier_sender
-            .lock()
-            .take()
-            .expect("should not collect senders for twice")
-    }
-}
+#[derive(Clone, Default)]
+pub struct CreateActorContext {}
 
 pub(crate) type SubscribeMutationItem = (u64, Option<Arc<Mutation>>);
 
@@ -235,6 +204,10 @@ pub(super) enum LocalBarrierEvent {
         epoch: EpochPair,
         mutation_sender: mpsc::UnboundedSender<SubscribeMutationItem>,
     },
+    RegisterBarrierSender {
+        actor_id: ActorId,
+        barrier_sender: mpsc::UnboundedSender<Barrier>,
+    },
     #[cfg(test)]
     Flush(oneshot::Sender<()>),
 }
@@ -267,12 +240,6 @@ pub(super) enum LocalActorOperation {
     },
     #[cfg(test)]
     GetCurrentSharedContext(oneshot::Sender<Arc<SharedContext>>),
-    #[cfg(test)]
-    RegisterSenders {
-        actor_id: ActorId,
-        senders: Vec<UnboundedSender<Barrier>>,
-        result_sender: oneshot::Sender<()>,
-    },
     InspectState {
         result_sender: oneshot::Sender<String>,
     },
@@ -283,7 +250,6 @@
 
 pub(super) struct CreateActorOutput {
     pub(super) actors: Vec<Actor<DispatchExecutor>>,
-    pub(super) senders: HashMap<ActorId, Vec<UnboundedSender<Barrier>>>,
 }
 
 pub(crate) struct StreamActorManagerState {
@@ -346,7 +312,6 @@ pub(crate) struct StreamActorManager {
 }
 
 pub(super) struct LocalBarrierWorkerDebugInfo<'a> {
-    actor_to_send: BTreeSet<ActorId>,
     running_actors: BTreeSet<ActorId>,
     creating_actors: Vec<BTreeSet<ActorId>>,
     managed_barrier_state: ManagedBarrierStateDebugInfo<'a>,
@@ -360,11 +325,6 @@ impl Display for LocalBarrierWorkerDebugInfo<'_> {
             write!(f, "{}, ", actor_id)?;
         }
 
-        write!(f, "\nactor_to_send: ")?;
-        for actor_id in &self.actor_to_send {
-            write!(f, "{}, ", actor_id)?;
-        }
-
         write!(f, "\ncreating_actors: ")?;
         for actors in &self.creating_actors {
             for actor_id in actors {
@@ -387,9 +347,6 @@
 /// Specifically, [`LocalBarrierWorker`] serve barrier injection from meta server, send the
 /// barriers to and collect them from all actors, and finally report the progress.
 pub(super) struct LocalBarrierWorker {
-    /// Stores all streaming job source sender.
-    barrier_senders: HashMap<ActorId, Vec<UnboundedSender<Barrier>>>,
-
     /// Current barrier collection state.
     state: ManagedBarrierState,
 
@@ -424,7 +381,6 @@ impl LocalBarrierWorker {
             },
         ));
         Self {
-            barrier_senders: HashMap::new(),
             failure_actors: HashMap::default(),
             state: ManagedBarrierState::new(
                 actor_manager.env.state_store(),
@@ -443,7 +399,6 @@ impl LocalBarrierWorker {
 
     fn to_debug_info(&self) -> LocalBarrierWorkerDebugInfo<'_> {
         LocalBarrierWorkerDebugInfo {
-            actor_to_send: self.barrier_senders.keys().cloned().collect(),
             running_actors: self.actor_manager_state.handles.keys().cloned().collect(),
             creating_actors: self
                 .actor_manager_state
@@ -521,9 +476,6 @@ impl LocalBarrierWorker {
         create_actor_result: StreamResult<CreateActorOutput>,
     ) {
         let result = create_actor_result.map(|output| {
-            for (actor_id, senders) in output.senders {
-                self.register_sender(actor_id, senders);
-            }
            self.spawn_actors(output.actors);
         });
 
@@ -539,7 +491,6 @@ impl LocalBarrierWorker {
         let barrier = Barrier::from_protobuf(req.get_barrier().unwrap())?;
         self.send_barrier(
             &barrier,
-            req.actor_ids_to_send.into_iter().collect(),
             req.actor_ids_to_collect.into_iter().collect(),
             req.table_ids_to_sync
                 .into_iter()
@@ -584,6 +535,12 @@ impl LocalBarrierWorker {
                 self.state
                     .subscribe_actor_mutation(actor_id, epoch.prev, mutation_sender);
             }
+            LocalBarrierEvent::RegisterBarrierSender {
+                actor_id,
+                barrier_sender,
+            } => {
+                self.state.register_barrier_sender(actor_id, barrier_sender);
+            }
             #[cfg(test)]
             LocalBarrierEvent::Flush(sender) => sender.send(()).unwrap(),
         }
@@ -625,15 +582,6 @@ impl LocalBarrierWorker {
             LocalActorOperation::GetCurrentSharedContext(sender) => {
                 let _ = sender.send(self.current_shared_context.clone());
             }
-            #[cfg(test)]
-            LocalActorOperation::RegisterSenders {
-                actor_id,
-                senders,
-                result_sender,
-            } => {
-                self.register_sender(actor_id, senders);
-                let _ = result_sender.send(());
-            }
             LocalActorOperation::InspectState { result_sender } => {
                 let debug_info = self.to_debug_info();
                 let _ = result_sender.send(debug_info.to_string());
@@ -713,19 +661,6 @@ impl LocalBarrierWorker {
         Ok(())
     }
 
-    /// Register sender for source actors, used to send barriers.
-    fn register_sender(&mut self, actor_id: ActorId, senders: Vec<UnboundedSender<Barrier>>) {
-        tracing::debug!(
-            target: "events::stream::barrier::manager",
-            actor_id = actor_id,
-            "register sender"
-        );
-        self.barrier_senders
-            .entry(actor_id)
-            .or_default()
-            .extend(senders);
-    }
-
     /// Broadcast a barrier to all senders. Save a receiver which will get notified when this
     /// barrier is finished, in managed mode.
     ///
@@ -735,7 +670,6 @@
     fn send_barrier(
         &mut self,
         barrier: &Barrier,
-        to_send: HashSet<ActorId>,
         to_collect: HashSet<ActorId>,
         table_ids: HashSet<TableId>,
         partial_graph_id: PartialGraphId,
@@ -767,9 +701,8 @@
         }
         debug!(
             target: "events::stream::barrier::manager::send",
-            "send barrier {:?}, senders = {:?}, actor_ids_to_collect = {:?}",
+            "send barrier {:?}, actor_ids_to_collect = {:?}",
             barrier,
-            to_send,
             to_collect
         );
 
@@ -792,31 +725,7 @@
             table_ids,
             partial_graph_id,
             actor_ids_to_pre_sync_barrier,
-        );
-
-        for actor_id in to_send {
-            match self.barrier_senders.get(&actor_id) {
-                Some(senders) => {
-                    for sender in senders {
-                        if let Err(_err) = sender.send(barrier.clone()) {
-                            // return err to trigger recovery.
-                            return Err(StreamError::barrier_send(
-                                barrier.clone(),
-                                actor_id,
-                                "channel closed",
-                            ));
-                        }
-                    }
-                }
-                None => {
-                    return Err(StreamError::barrier_send(
-                        barrier.clone(),
-                        actor_id,
-                        "sender not found",
-                    ));
-                }
-            }
-        }
+        )?;
 
         // Actors to stop should still accept this barrier, but won't get sent to in next times.
         if let Some(actors) = barrier.all_stop_actors() {
@@ -825,9 +734,6 @@
                 "remove actors {:?} from senders",
                 actors
             );
-            for actor in actors {
-                self.barrier_senders.remove(actor);
-            }
         }
         Ok(())
     }
@@ -1033,6 +939,13 @@
         });
         rx
     }
+
+    pub fn register_sender(&self, actor_id: ActorId, tx: UnboundedSender<Barrier>) {
+        self.send_event(LocalBarrierEvent::RegisterBarrierSender {
+            actor_id,
+            barrier_sender: tx,
+        })
+    }
 }
 
 /// A [`StreamError`] with a score, used to find the root cause of actor failures.
@@ -1170,6 +1083,7 @@ pub(crate) mod barrier_test_utils {
 
     pub(crate) struct LocalBarrierTestEnv {
         pub shared_context: Arc<SharedContext>,
+        #[expect(dead_code)]
         pub(super) actor_op_tx: EventSender<LocalActorOperation>,
         pub request_tx: UnboundedSender<Result<StreamingControlStreamRequest, Status>>,
         pub response_rx: UnboundedReceiver<Result<StreamingControlStreamResponse, Status>>,
@@ -1211,7 +1125,6 @@
         pub(crate) fn inject_barrier(
             &self,
             barrier: &Barrier,
-            actor_to_send: impl IntoIterator<Item = ActorId>,
             actor_to_collect: impl IntoIterator<Item = ActorId>,
         ) {
             self.request_tx
@@ -1220,7 +1133,6 @@
                     InjectBarrierRequest {
                         request_id: "".to_string(),
                         barrier: Some(barrier.to_protobuf()),
-                        actor_ids_to_send: actor_to_send.into_iter().collect(),
                         actor_ids_to_collect: actor_to_collect.into_iter().collect(),
                         table_ids_to_sync: vec![],
                         partial_graph_id: u32::MAX,
diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs
index c23c3da9fb10..de3ca4c54dd2 100644
--- a/src/stream/src/task/barrier_manager/managed_state.rs
+++ b/src/stream/src/task/barrier_manager/managed_state.rs
@@ -38,7 +38,7 @@ use tokio::sync::mpsc;
 
 use super::progress::BackfillState;
 use super::{BarrierCompleteResult, SubscribeMutationItem};
-use crate::error::StreamResult;
+use crate::error::{StreamError, StreamResult};
 use crate::executor::monitor::StreamingMetrics;
 use crate::executor::{Barrier, Mutation};
 use crate::task::{await_tree_key, ActorId, PartialGraphId};
@@ -194,43 +194,47 @@ impl Display for &'_ PartialGraphManagedBarrierState {
 enum InflightActorStatus {
     /// The actor is just spawned and not issued any barrier yet
     NotStarted,
-    /// The actor has been issued some barriers, and not issued any stop barrier yet
+    /// The actor has been issued some barriers, but has not collected the first barrier
+    IssuedFirst(Vec<Barrier>),
+    /// The actor has been issued some barriers, and has collected the first barrier
     Running(u64),
-    /// The actor has been issued a stop barrier
-    Stopping(u64),
 }
 
 impl InflightActorStatus {
-    pub(super) fn is_stopping(&self) -> bool {
-        matches!(self, InflightActorStatus::Stopping(_))
-    }
-
     fn max_issued_epoch(&self) -> Option<u64> {
         match self {
             InflightActorStatus::NotStarted => None,
-            InflightActorStatus::Running(epoch) | InflightActorStatus::Stopping(epoch) => {
-                Some(*epoch)
+            InflightActorStatus::Running(epoch) => Some(*epoch),
+            InflightActorStatus::IssuedFirst(issued_barriers) => {
+                Some(issued_barriers.last().expect("non-empty").epoch.prev)
             }
         }
     }
 }
 
 pub(crate) struct InflightActorState {
+    actor_id: ActorId,
     pending_subscribers: BTreeMap<u64, Vec<mpsc::UnboundedSender<SubscribeMutationItem>>>,
+    barrier_senders: Vec<mpsc::UnboundedSender<Barrier>>,
     /// `prev_epoch` -> partial graph id
     pub(super) inflight_barriers: BTreeMap<u64, PartialGraphId>,
     /// `prev_epoch` -> (`mutation`, `curr_epoch`)
     barrier_mutations: BTreeMap<u64, (Option<Arc<Mutation>>, u64)>,
     status: InflightActorStatus,
+    /// Whether the actor has been issued a stop barrier
+    is_stopping: bool,
 }
 
 impl InflightActorState {
-    pub(super) fn not_started() -> Self {
+    pub(super) fn not_started(actor_id: ActorId) -> Self {
         Self {
+            actor_id,
            pending_subscribers: Default::default(),
+            barrier_senders: vec![],
             inflight_barriers: BTreeMap::default(),
             barrier_mutations: Default::default(),
             status: InflightActorStatus::NotStarted,
+            is_stopping: false,
         }
     }
@@ -258,7 +262,7 @@ impl InflightActorState {
         partial_graph_id: PartialGraphId,
         barrier: &Barrier,
         is_stop: bool,
-    ) {
+    ) -> StreamResult<()> {
         if let Some(max_issued_epoch) = self.status.max_issued_epoch() {
             assert!(barrier.epoch.prev > max_issued_epoch);
         }
@@ -285,6 +289,16 @@ impl InflightActorState {
             }
         }
 
+        for barrier_sender in &self.barrier_senders {
+            barrier_sender.send(barrier.clone()).map_err(|_| {
+                StreamError::barrier_send(
+                    barrier.clone(),
+                    self.actor_id,
+                    "failed to send to registered sender",
+                )
+            })?;
+        }
+
         assert!(self
             .inflight_barriers
             .insert(barrier.epoch.prev, partial_graph_id)
@@ -297,16 +311,24 @@ impl InflightActorState {
             assert_eq!(curr_epoch, barrier.epoch.curr);
         }
 
+        match &mut self.status {
+            InflightActorStatus::NotStarted => {
+                self.status = InflightActorStatus::IssuedFirst(vec![barrier.clone()]);
+            }
+            InflightActorStatus::IssuedFirst(pending_barriers) => {
+                pending_barriers.push(barrier.clone());
+            }
+            InflightActorStatus::Running(prev_epoch) => {
+                *prev_epoch = barrier.epoch.prev;
+            }
+        };
+
         if is_stop {
             assert!(self.pending_subscribers.is_empty());
-            assert!(
-                !self.status.is_stopping(),
-                "stopped actor should not issue barrier"
-            );
-            self.status = InflightActorStatus::Stopping(barrier.epoch.prev);
-        } else {
-            self.status = InflightActorStatus::Running(barrier.epoch.prev);
+            assert!(!self.is_stopping, "stopped actor should not issue barrier");
+            self.is_stopping = true;
         }
+        Ok(())
     }
 
     pub(super) fn collect(&mut self, epoch: EpochPair) -> (PartialGraphId, bool) {
@@ -315,9 +337,24 @@ impl InflightActorState {
         assert_eq!(prev_epoch, epoch.prev);
         let (min_mutation_epoch, _) = self.barrier_mutations.pop_first().expect("should exist");
         assert_eq!(min_mutation_epoch, epoch.prev);
+        match &self.status {
+            InflightActorStatus::NotStarted => {
+                unreachable!("should have issued a barrier when collect")
+            }
+            InflightActorStatus::IssuedFirst(pending_barriers) => {
+                assert_eq!(
+                    prev_epoch,
+                    pending_barriers.first().expect("non-empty").epoch.prev
+                );
+                self.status = InflightActorStatus::Running(
+                    pending_barriers.last().expect("non-empty").epoch.prev,
+                );
+            }
+            InflightActorStatus::Running(_) => {}
+        }
         (
             prev_partial_graph_id,
-            self.inflight_barriers.is_empty() && self.status.is_stopping(),
+            self.inflight_barriers.is_empty() && self.is_stopping,
         )
     }
 
@@ -446,7 +483,7 @@ impl InflightActorState {
                     break;
                 }
             }
-            if !self.status.is_stopping() {
+            if !self.is_stopping {
                 // Only add the subscribers when the actor is not stopped yet.
                 self.pending_subscribers
                     .entry(prev_epoch)
@@ -463,7 +500,7 @@ impl InflightActorState {
                 start_prev_epoch
             );
         } else {
-            assert!(!self.status.is_stopping(), "actor has been stopped and has not inflight barrier. unlikely to get further barrier");
+            assert!(!self.is_stopping, "actor has been stopped and has not inflight barrier. unlikely to get further barrier");
         }
         self.pending_subscribers
             .entry(start_prev_epoch)
@@ -471,6 +508,25 @@ impl InflightActorState {
             .or_default()
             .push(tx);
         }
     }
+
+    pub(super) fn register_barrier_sender(&mut self, tx: mpsc::UnboundedSender<Barrier>) {
+        match &self.status {
+            InflightActorStatus::NotStarted => {
+                self.barrier_senders.push(tx);
+            }
+            InflightActorStatus::IssuedFirst(pending_barriers) => {
+                for barrier in pending_barriers {
+                    // ignore the send err and register the tx anyway.
+                    // failed tx will trigger failure when handling the `InjectBarrier`.
+                    let _ = tx.send(barrier.clone());
+                }
+                self.barrier_senders.push(tx);
+            }
+            InflightActorStatus::Running(_) => {
+                unreachable!("should not register barrier sender when entering Running status")
+            }
+        }
+    }
 }
 
 impl ManagedBarrierState {
@@ -482,10 +538,21 @@ impl ManagedBarrierState {
     ) {
         self.actor_states
             .entry(actor_id)
-            .or_insert_with(InflightActorState::not_started)
+            .or_insert_with(|| InflightActorState::not_started(actor_id))
             .subscribe_actor_mutation(start_prev_epoch, tx);
     }
 
+    pub(super) fn register_barrier_sender(
+        &mut self,
+        actor_id: ActorId,
+        tx: mpsc::UnboundedSender<Barrier>,
+    ) {
+        self.actor_states
+            .entry(actor_id)
+            .or_insert_with(|| InflightActorState::not_started(actor_id))
+            .register_barrier_sender(tx);
+    }
+
     pub(super) fn transform_to_issued(
         &mut self,
         barrier: &Barrier,
@@ -493,7 +560,7 @@ impl ManagedBarrierState {
         table_ids: HashSet<TableId>,
         partial_graph_id: PartialGraphId,
         actor_ids_to_pre_sync_barrier: HashSet<ActorId>,
-    ) {
+    ) -> StreamResult<()> {
         let actor_to_stop = barrier.all_stop_actors();
         let graph_state = self
             .graph_states
@@ -514,26 +581,27 @@ impl ManagedBarrierState {
         for actor_id in actor_ids_to_collect {
             self.actor_states
                 .entry(actor_id)
-                .or_insert_with(InflightActorState::not_started)
+                .or_insert_with(|| InflightActorState::not_started(actor_id))
                 .issue_barrier(
                     partial_graph_id,
                     barrier,
                     actor_to_stop
                         .map(|actors| actors.contains(&actor_id))
                         .unwrap_or(false),
-                );
+                )?;
         }
 
         if partial_graph_id.is_global_graph() {
             for actor_id in actor_ids_to_pre_sync_barrier {
                 self.actor_states
                     .entry(actor_id)
-                    .or_insert_with(InflightActorState::not_started)
+                    .or_insert_with(|| InflightActorState::not_started(actor_id))
                     .sync_barrier(barrier);
             }
         } else {
             assert!(actor_ids_to_pre_sync_barrier.is_empty());
         }
+        Ok(())
     }
 
     pub(super) fn next_completed_epoch(
diff --git a/src/stream/src/task/barrier_manager/tests.rs b/src/stream/src/task/barrier_manager/tests.rs
index 0d1bb159ea5f..6d5a5cc0e666 100644
--- a/src/stream/src/task/barrier_manager/tests.rs
+++ b/src/stream/src/task/barrier_manager/tests.rs
@@ -31,32 +31,29 @@ async fn test_managed_barrier_collection() -> StreamResult<()> {
     let manager = &test_env.shared_context.local_barrier_manager;
 
     let register_sender = |actor_id: u32| {
-        let actor_op_tx = &test_env.actor_op_tx;
-        async move {
-            let (barrier_tx, barrier_rx) = unbounded_channel();
-            actor_op_tx
-                .send_and_await(move |result_sender| LocalActorOperation::RegisterSenders {
-                    result_sender,
-                    actor_id,
-                    senders: vec![barrier_tx],
-                })
-                .await
-                .unwrap();
-            (actor_id, barrier_rx)
-        }
+        let (barrier_tx, barrier_rx) = unbounded_channel();
+        test_env
+            .shared_context
+            .local_barrier_manager
+            .register_sender(actor_id, barrier_tx);
+        (actor_id, barrier_rx)
     };
 
     // Register actors
     let actor_ids = vec![233, 234, 235];
     let count = actor_ids.len();
-    let mut rxs = join_all(actor_ids.clone().into_iter().map(register_sender)).await;
+    let mut rxs = actor_ids
+        .clone()
+        .into_iter()
+        .map(register_sender)
+        .collect_vec();
 
     // Send a barrier to all actors
     let curr_epoch = test_epoch(2);
     let barrier = Barrier::new_test_barrier(curr_epoch);
     let epoch = barrier.epoch.prev;
-    test_env.inject_barrier(&barrier, actor_ids.clone(), actor_ids);
+    test_env.inject_barrier(&barrier, actor_ids);
 
     // Collect barriers from actors
     let collected_barriers = join_all(rxs.iter_mut().map(|(actor_id, rx)| async move {
@@ -94,19 +91,12 @@ async fn test_managed_barrier_collection_separately() -> StreamResult<()> {
     let manager = &test_env.shared_context.local_barrier_manager;
 
     let register_sender = |actor_id: u32| {
-        let actor_op_tx = &test_env.actor_op_tx;
-        async move {
-            let (barrier_tx, barrier_rx) = unbounded_channel();
-            actor_op_tx
-                .send_and_await(move |result_sender| LocalActorOperation::RegisterSenders {
-                    result_sender,
-                    actor_id,
-                    senders: vec![barrier_tx],
-                })
-                .await
-                .unwrap();
-            (actor_id, barrier_rx)
-        }
+        let (barrier_tx, barrier_rx) = unbounded_channel();
+        test_env
+            .shared_context
+            .local_barrier_manager
+            .register_sender(actor_id, barrier_tx);
+        (actor_id, barrier_rx)
     };
 
     let actor_ids_to_send = vec![233, 234, 235];
@@ -119,7 +109,11 @@ async fn test_managed_barrier_collection_separately() -> StreamResult<()> {
 
     // Register actors
     let count = actor_ids_to_send.len();
-    let mut rxs = join_all(actor_ids_to_send.clone().into_iter().map(register_sender)).await;
+    let mut rxs = actor_ids_to_send
+        .clone()
+        .into_iter()
+        .map(register_sender)
+        .collect_vec();
 
     // Prepare the barrier
     let curr_epoch = test_epoch(2);
@@ -132,7 +126,7 @@ async fn test_managed_barrier_collection_separately() -> StreamResult<()> {
     let mut mutation_reader = pin!(mutation_subscriber.recv());
     assert!(poll_fn(|cx| Poll::Ready(mutation_reader.as_mut().poll(cx).is_pending())).await);
 
-    test_env.inject_barrier(&barrier, actor_ids_to_send, actor_ids_to_collect);
+    test_env.inject_barrier(&barrier, actor_ids_to_collect);
 
     let (epoch, mutation) = mutation_reader.await.unwrap();
     assert_eq!((epoch, &mutation), (barrier.epoch.prev, &barrier.mutation));
diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs
index 5a5b2d48d57c..c08eb931ae4e 100644
--- a/src/stream/src/task/stream_manager.rs
+++ b/src/stream/src/task/stream_manager.rs
@@ -628,10 +628,7 @@ impl StreamActorManager {
             ret.push(actor);
         }
 
-        Ok(CreateActorOutput {
-            actors: ret,
-            senders: create_actor_context.collect_senders(),
-        })
+        Ok(CreateActorOutput { actors: ret })
     }
 }
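--
Usage note (illustrative sketch, not part of the patch itself): with `actor_ids_to_send` gone from `InjectBarrierRequest`, barrier delivery is driven entirely by sender registration on the compute node. An executor builder that needs raw barriers now wires itself up through the shared context, mirroring the `from_proto` changes above; `params` here stands for the usual `ExecutorParams` carrying `shared_context` and `actor_context`:

    // Sketch of the new registration flow introduced by this patch.
    let (sender, barrier_receiver) = tokio::sync::mpsc::unbounded_channel();
    params
        .shared_context
        .local_barrier_manager
        .register_sender(params.actor_context.id, sender);
    // `barrier_receiver` now yields every barrier issued to this actor.

The registration is forwarded to the barrier worker as a `LocalBarrierEvent::RegisterBarrierSender` and recorded in `InflightActorState`. If the actor is still in the `IssuedFirst` state, barriers issued before the sender was registered are replayed into it; a closed sender is ignored at registration time and surfaces as `StreamError::barrier_send` when the next `InjectBarrier` request is handled, which triggers recovery.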