diff --git a/proto/stream_service.proto b/proto/stream_service.proto index 100ad78dffbd9..c5a63f97cf1c9 100644 --- a/proto/stream_service.proto +++ b/proto/stream_service.proto @@ -109,9 +109,14 @@ message StreamingControlStreamRequest { uint64 prev_epoch = 2; } + message RemovePartialGraphRequest { + repeated uint32 partial_graph_ids = 1; + } + oneof request { InitRequest init = 1; InjectBarrierRequest inject_barrier = 2; + RemovePartialGraphRequest remove_partial_graph = 3; } } diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index cbf61a6cc7316..afd9fde5582a6 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -964,6 +964,7 @@ mod tests { ), })); } + streaming_control_stream_request::Request::RemovePartialGraph(..) => {} } } }); diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index 1d430ce5968db..86d2effa06e9d 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -495,6 +495,12 @@ impl LocalBarrierWorker { self.send_barrier(barrier, req.graph_info)?; Ok(()) } + Request::RemovePartialGraph(req) => { + self.remove_partial_graphs( + req.partial_graph_ids.into_iter().map(PartialGraphId::new), + ); + Ok(()) + } Request::Init(_) => { unreachable!() } @@ -767,6 +773,23 @@ impl LocalBarrierWorker { Ok(()) } + fn remove_partial_graphs(&mut self, partial_graph_ids: impl Iterator) { + for partial_graph_id in partial_graph_ids { + if let Some(graph) = self.state.graph_states.remove(&partial_graph_id) { + assert!( + graph.is_empty(), + "non empty graph to be removed: {}", + &graph + ); + } else { + warn!( + partial_graph_id = partial_graph_id.0, + "no partial graph to remove" + ); + } + } + } + /// Reset all internal states. pub(super) fn reset_state(&mut self) { *self = Self::new(self.actor_manager.clone()); @@ -785,8 +808,7 @@ impl LocalBarrierWorker { let root_err = self.try_find_root_failure().await.unwrap(); // always `Some` because we just added one if let Some(actor_state) = self.state.actor_states.get(&actor_id) - && let Some(inflight_barriers) = actor_state.inflight_barriers() - && !inflight_barriers.is_empty() + && !actor_state.inflight_barriers.is_empty() { self.control_stream_handle.reset_stream_with_err( anyhow!(root_err) diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs index 91c46d8559a54..88dae36007612 100644 --- a/src/stream/src/task/barrier_manager/managed_state.rs +++ b/src/stream/src/task/barrier_manager/managed_state.rs @@ -42,7 +42,6 @@ use super::{BarrierCompleteResult, SubscribeMutationItem}; use crate::error::StreamResult; use crate::executor::monitor::StreamingMetrics; use crate::executor::{Barrier, Mutation}; -use crate::task::barrier_manager::managed_state::actor_status::InflightActorState; use crate::task::{await_tree_key, ActorId, PartialGraphId}; struct IssuedState { @@ -193,238 +192,73 @@ impl Display for &'_ PartialGraphManagedBarrierState { } } -mod actor_status { - use std::collections::BTreeMap; - use std::sync::Arc; - - use risingwave_common::must_match; - use risingwave_common::util::epoch::EpochPair; - use tokio::sync::mpsc; - - use crate::executor::{Barrier, Mutation}; - use crate::task::{PartialGraphId, SubscribeMutationItem}; - - enum InflightActorStatus { - /// The actor has not been issued any barrier yet - NotStarted, - /// The actor has been issued some barriers, but has collected all the barrier. - /// Waiting for new barrier to issue. - Pending { - /// The latest `partial_graph_id` before entering `Pending` status. - /// The actor should be in the `inflight_actors` of the graph. - prev_partial_graph_id: PartialGraphId, - /// The `prev_epoch` of the previous barrier - prev_epoch: u64, - }, - /// The actor has been issued with some barriers, and waiting for collecting some barriers. - Running { - /// `prev_epoch` -> partial graph id - /// Store the barriers that has been issued but not collected. - /// Must be non-empty when in this variant, or transit to `Pending`, or the states gets removed when stopped. - /// - /// The actor should be in the `inflight_actors` of graph whose `partial_graph_id` of the first graph id. - inflight_barriers: BTreeMap>)>, - /// Whether the actor has been issued a stop barrier - is_stopping: bool, - }, - } - - pub(crate) struct InflightActorState { - pending_subscribers: BTreeMap>>, - started_subscribers: Vec>, - status: InflightActorStatus, - } - - impl InflightActorState { - pub(super) fn not_started() -> Self { - Self { - pending_subscribers: Default::default(), - started_subscribers: vec![], - status: InflightActorStatus::NotStarted, - } - } +pub(crate) struct InflightActorState { + pending_subscribers: BTreeMap>>, + started_subscribers: Vec>, + /// `prev_epoch` -> partial graph id + pub(super) inflight_barriers: BTreeMap>)>, + /// Whether the actor has been issued a stop barrier + is_stopping: bool, +} - #[expect(clippy::type_complexity)] - pub(crate) fn inflight_barriers( - &self, - ) -> Option<&BTreeMap>)>> { - if let InflightActorStatus::Running { - inflight_barriers, .. - } = &self.status - { - Some(inflight_barriers) - } else { - None - } +impl InflightActorState { + pub(super) fn not_started() -> Self { + Self { + pending_subscribers: Default::default(), + started_subscribers: vec![], + inflight_barriers: BTreeMap::default(), + is_stopping: false, } + } - pub(super) fn subscribe_actor_mutation( - &mut self, - start_prev_epoch: u64, - tx: mpsc::UnboundedSender, - ) { - match &self.status { - InflightActorStatus::NotStarted => { - self.pending_subscribers - .entry(start_prev_epoch) - .or_default() - .push(tx); - } - InflightActorStatus::Pending { prev_epoch, .. } => { - assert!(*prev_epoch < start_prev_epoch); + pub(super) fn issue_barrier( + &mut self, + partial_graph_id: PartialGraphId, + barrier: &Barrier, + is_stop: bool, + ) { + assert!(!self.is_stopping, "stopped actor should not issue barrier"); + if let Some((first_epoch, _)) = self.pending_subscribers.first_key_value() { + assert!( + *first_epoch >= barrier.epoch.prev, + "barrier epoch {:?} skip subscribed epoch {}", + barrier.epoch, + first_epoch + ); + if *first_epoch == barrier.epoch.prev { + self.started_subscribers.extend( self.pending_subscribers - .entry(start_prev_epoch) - .or_default() - .push(tx); - } - InflightActorStatus::Running { - inflight_barriers, - is_stopping, - .. - } => { - if inflight_barriers.contains_key(&start_prev_epoch) { - for (prev_epoch, (_, mutation)) in - inflight_barriers.range(start_prev_epoch..) - { - if tx.send((*prev_epoch, mutation.clone())).is_err() { - // No more subscribe on the mutation. Simply return. - return; - } - } - if !*is_stopping { - self.started_subscribers.push(tx); - } - } else { - // Barrier has not issued yet. Store the pending tx - if let Some((last_epoch, _)) = inflight_barriers.last_key_value() { - assert!( - *last_epoch < start_prev_epoch, - "later barrier {} has been issued, but skip the start epoch {:?}", - last_epoch, - start_prev_epoch - ); - } - self.pending_subscribers - .entry(start_prev_epoch) - .or_default() - .push(tx); - } - } - } - } - - #[must_use] - pub(super) fn issue_barrier( - &mut self, - partial_graph_id: PartialGraphId, - barrier: &Barrier, - is_stop: bool, - ) -> ( - // Some(prev_partial_graph_id) when the actor was in status - // InflightActorStatus::Pending { .. } with a different graph id - Option, - // whether the actor is new to the `partial_graph_id` - bool, - ) { - let (prev_partial_graph_id, is_new_in_graph) = match &self.status { - InflightActorStatus::NotStarted => { - self.status = InflightActorStatus::Running { - inflight_barriers: Default::default(), - is_stopping: false, - }; - (None, true) - } - InflightActorStatus::Pending { - prev_partial_graph_id, - prev_epoch, - } => { - assert!(*prev_epoch < barrier.epoch.prev); - let prev_partial_graph_id = *prev_partial_graph_id; - self.status = InflightActorStatus::Running { - inflight_barriers: Default::default(), - is_stopping: false, - }; - if prev_partial_graph_id != partial_graph_id { - (Some(prev_partial_graph_id), true) - } else { - (None, false) - } - } - InflightActorStatus::Running { - inflight_barriers, .. - } => { - let (prev_epoch, _) = inflight_barriers.last_key_value().expect("non-empty"); - assert!(*prev_epoch < barrier.epoch.prev); - (None, false) - } - }; - - if let Some((first_epoch, _)) = self.pending_subscribers.first_key_value() { - assert!( - *first_epoch >= barrier.epoch.prev, - "barrier epoch {:?} skip subscribed epoch {}", - barrier.epoch, - first_epoch + .pop_first() + .expect("should exist") + .1, ); - if *first_epoch == barrier.epoch.prev { - self.started_subscribers.extend( - self.pending_subscribers - .pop_first() - .expect("should exist") - .1, - ); - } } - self.started_subscribers.retain(|tx| { - tx.send((barrier.epoch.prev, barrier.mutation.clone())) - .is_ok() - }); - - must_match!(&mut self.status, InflightActorStatus::Running { - inflight_barriers, is_stopping, - } => { - inflight_barriers.insert(barrier.epoch.prev, (partial_graph_id, barrier.mutation.clone())); - *is_stopping = is_stop; - }); - - (prev_partial_graph_id, is_new_in_graph) } + self.started_subscribers.retain(|tx| { + tx.send((barrier.epoch.prev, barrier.mutation.clone())) + .is_ok() + }); - #[must_use] - pub(super) fn collect( - &mut self, - epoch: EpochPair, - ) -> ( - // The `partial_graph_id` of actor on the collected epoch - PartialGraphId, - // None => the partial graph id of this actor is not changed - // Some(None) => the actor has stopped, and should be removed from the return `partial_graph_id` - // Some(Some(new_partial_graph_id)) => the actor will move to the `new_partial_graph_id` - Option>, - ) { - must_match!(&mut self.status, InflightActorStatus::Running { - inflight_barriers, is_stopping - } => { - let (prev_epoch, (prev_partial_graph_id, _)) = inflight_barriers.pop_first().expect("should exist"); - assert_eq!(prev_epoch, epoch.prev); - let move_to_graph_id = if let Some((epoch, (graph_id, _))) = inflight_barriers.first_key_value() { - if *graph_id != prev_partial_graph_id { - Some(Some((*graph_id, *epoch))) - } else { - None - } - } else if *is_stopping { - Some(None) - } else { - self.status = InflightActorStatus::Pending {prev_epoch, prev_partial_graph_id}; - // No need to move to any partial graph when transit to `Pending`. When issuing the next barrier and - // the next graph id gets different, the actor will then move to the next graph id - None - }; - (prev_partial_graph_id, move_to_graph_id) - }) + self.inflight_barriers.insert( + barrier.epoch.prev, + (partial_graph_id, barrier.mutation.clone()), + ); + self.is_stopping = is_stop; + if is_stop { + assert!(self.pending_subscribers.is_empty()); + self.started_subscribers.clear(); } } + + pub(super) fn collect(&mut self, epoch: EpochPair) -> (PartialGraphId, bool) { + let (prev_epoch, (prev_partial_graph_id, _)) = + self.inflight_barriers.pop_first().expect("should exist"); + assert_eq!(prev_epoch, epoch.prev); + ( + prev_partial_graph_id, + self.inflight_barriers.is_empty() && self.is_stopping, + ) + } } pub(super) struct PartialGraphManagedBarrierState { @@ -436,20 +270,18 @@ pub(super) struct PartialGraphManagedBarrierState { /// The key is `prev_epoch`, and the first value is `curr_epoch` epoch_barrier_state_map: BTreeMap, - inflight_actors: HashSet, - prev_barrier_table_ids: Option<(EpochPair, HashSet)>, /// Record the progress updates of creating mviews for each epoch of concurrent checkpoints. pub(super) create_mview_progress: HashMap>, - /// Futures will be finished in the order of epoch in ascending order. - await_epoch_completed_futures: FuturesOrdered, - pub(super) state_store: StateStoreImpl, pub(super) streaming_metrics: Arc, + /// Futures will be finished in the order of epoch in ascending order. + await_epoch_completed_futures: FuturesOrdered, + /// Manages the await-trees of all barriers. barrier_await_tree_reg: Option, } @@ -464,7 +296,6 @@ impl PartialGraphManagedBarrierState { Self { need_seal_epoch, epoch_barrier_state_map: Default::default(), - inflight_actors: Default::default(), prev_barrier_table_ids: None, create_mview_progress: Default::default(), await_epoch_completed_futures: Default::default(), @@ -483,6 +314,10 @@ impl PartialGraphManagedBarrierState { None, ) } + + pub(super) fn is_empty(&self) -> bool { + self.epoch_barrier_state_map.is_empty() + } } pub(super) struct ManagedBarrierState { @@ -519,7 +354,43 @@ impl ManagedBarrierState { graph_states: &self.graph_states, } } +} +impl InflightActorState { + pub(super) fn subscribe_actor_mutation( + &mut self, + start_prev_epoch: u64, + tx: mpsc::UnboundedSender, + ) { + if self.inflight_barriers.contains_key(&start_prev_epoch) { + for (prev_epoch, (_, mutation)) in self.inflight_barriers.range(start_prev_epoch..) { + if tx.send((*prev_epoch, mutation.clone())).is_err() { + // No more subscribe on the mutation. Simply return. + return; + } + } + if !self.is_stopping { + self.started_subscribers.push(tx); + } + } else { + // Barrier has not issued yet. Store the pending tx + if let Some((last_epoch, _)) = self.inflight_barriers.last_key_value() { + assert!( + *last_epoch < start_prev_epoch, + "later barrier {} has been issued, but skip the start epoch {:?}", + last_epoch, + start_prev_epoch + ); + } + self.pending_subscribers + .entry(start_prev_epoch) + .or_default() + .push(tx); + } + } +} + +impl ManagedBarrierState { pub(super) fn subscribe_actor_mutation( &mut self, actor_id: ActorId, @@ -538,7 +409,6 @@ impl ManagedBarrierState { graph_infos: &HashMap, ) { let actor_to_stop = barrier.all_stop_actors(); - let mut graph_actors_to_clean: HashMap> = HashMap::new(); for (partial_graph_id, graph_info) in graph_infos { let partial_graph_id = PartialGraphId::new(*partial_graph_id); @@ -565,8 +435,7 @@ impl ManagedBarrierState { ); for actor_id in &graph_info.actor_ids_to_collect { - let (prev_partial_graph_id, is_new_in_graph) = self - .actor_states + self.actor_states .entry(*actor_id) .or_insert_with(InflightActorState::not_started) .issue_barrier( @@ -576,22 +445,6 @@ impl ManagedBarrierState { .map(|actors| actors.contains(actor_id)) .unwrap_or(false), ); - if is_new_in_graph { - graph_state.add_inflight_actor(*actor_id, barrier.epoch.prev); - } - if let Some(prev_partial_graph_id) = prev_partial_graph_id { - graph_actors_to_clean - .entry(prev_partial_graph_id) - .or_default() - .push(*actor_id); - } - } - } - - for (graph_id, actors_to_clean) in graph_actors_to_clean { - let graph_state = self.graph_states.get_mut(&graph_id).expect("should exist"); - if graph_state.remove_inflight_actors(actors_to_clean) { - self.graph_states.remove(&graph_id); } } } @@ -603,9 +456,6 @@ impl ManagedBarrierState { for (partial_graph_id, graph_state) in &mut self.graph_states { if let Poll::Ready(epoch) = graph_state.poll_next_completed_epoch(cx) { let partial_graph_id = *partial_graph_id; - if graph_state.is_empty() { - self.graph_states.remove(&partial_graph_id); - } return Poll::Ready((partial_graph_id, epoch)); } } @@ -614,62 +464,23 @@ impl ManagedBarrierState { } pub(super) fn collect(&mut self, actor_id: ActorId, epoch: EpochPair) { - let (prev_partial_graph_id, move_to_partial_graph_id) = self + let (prev_partial_graph_id, is_finished) = self .actor_states .get_mut(&actor_id) .expect("should exist") .collect(epoch); + if is_finished { + self.actor_states.remove(&actor_id); + } let prev_graph_state = self .graph_states .get_mut(&prev_partial_graph_id) .expect("should exist"); prev_graph_state.collect(actor_id, epoch); - if let Some(move_to_partial_graph_id) = move_to_partial_graph_id { - if prev_graph_state.remove_inflight_actors([actor_id]) { - self.graph_states.remove(&prev_partial_graph_id); - } - if let Some((move_to_partial_graph_id, start_epoch)) = move_to_partial_graph_id { - self.graph_states - .get_mut(&move_to_partial_graph_id) - .expect("should exist") - .add_inflight_actor(actor_id, start_epoch); - } else { - self.actor_states.remove(&actor_id); - } - } } } impl PartialGraphManagedBarrierState { - fn is_empty(&self) -> bool { - self.inflight_actors.is_empty() - && self.epoch_barrier_state_map.is_empty() - && self.await_epoch_completed_futures.is_empty() - } - - fn add_inflight_actor(&mut self, actor_id: ActorId, start_epoch: u64) { - assert!(self.inflight_actors.insert(actor_id)); - must_match!(&self.epoch_barrier_state_map.get(&start_epoch).expect("should exist").inner, ManagedBarrierStateInner::Issued(state) => { - state.remaining_actors.contains(&actor_id); - }); - if cfg!(debug_assertions) { - for (_, state) in self.epoch_barrier_state_map.range(..start_epoch) { - if let ManagedBarrierStateInner::Issued(state) = &state.inner { - // ensure that start_epoch is the first epoch to collect the barrier - assert!(!state.remaining_actors.contains(&actor_id)); - } - } - } - } - - #[must_use] - fn remove_inflight_actors(&mut self, actor_ids: impl IntoIterator) -> bool { - for actor_id in actor_ids { - assert!(self.inflight_actors.remove(&actor_id)); - } - self.is_empty() - } - /// This method is called when barrier state is modified in either `Issued` or `Stashed` /// to transform the state to `AllCollected` and start state store `sync` when the barrier /// has been collected from all actors for an `Issued` barrier. diff --git a/src/stream/src/task/barrier_manager/progress.rs b/src/stream/src/task/barrier_manager/progress.rs index 7a51f1de0d729..bf082fd8121f2 100644 --- a/src/stream/src/task/barrier_manager/progress.rs +++ b/src/stream/src/task/barrier_manager/progress.rs @@ -49,8 +49,7 @@ impl LocalBarrierWorker { state: BackfillState, ) { if let Some(actor_state) = self.state.actor_states.get(&actor) - && let Some(inflight_barriers) = actor_state.inflight_barriers() - && let Some((partial_graph_id, _)) = inflight_barriers.get(&epoch.prev) + && let Some((partial_graph_id, _)) = actor_state.inflight_barriers.get(&epoch.prev) && let Some(graph_state) = self.state.graph_states.get_mut(partial_graph_id) { graph_state