diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index 29093dc607406..d1468325169b1 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -785,7 +785,8 @@ impl LocalBarrierWorker { let root_err = self.try_find_root_failure().await.unwrap(); // always `Some` because we just added one if let Some(actor_state) = self.state.actor_states.get(&actor_id) - && !actor_state.inflight_barriers.is_empty() + && let Some(inflight_barriers) = actor_state.inflight_barriers() + && !inflight_barriers.is_empty() { self.control_stream_handle.reset_stream_with_err( anyhow!(root_err) diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs index 8e9734ccbf4ae..a04cf3378366f 100644 --- a/src/stream/src/task/barrier_manager/managed_state.rs +++ b/src/stream/src/task/barrier_manager/managed_state.rs @@ -42,6 +42,7 @@ use super::{BarrierCompleteResult, SubscribeMutationItem}; use crate::error::StreamResult; use crate::executor::monitor::StreamingMetrics; use crate::executor::{Barrier, Mutation}; +use crate::task::barrier_manager::managed_state::actor_status::InflightActorState; use crate::task::{await_tree_key, ActorId, PartialGraphId}; struct IssuedState { @@ -192,13 +193,239 @@ impl Display for &'_ PartialGraphManagedBarrierState { } } -#[derive(Default)] -pub(super) struct InflightActorState { - pending_subscribers: BTreeMap>>, - started_subscribers: Vec>, - // epoch -> partial graph id - pub(super) inflight_barriers: BTreeMap, - is_stopped: bool, +mod actor_status { + use std::collections::BTreeMap; + use std::sync::Arc; + + use risingwave_common::must_match; + use risingwave_common::util::epoch::EpochPair; + use tokio::sync::mpsc; + + use crate::executor::{Barrier, Mutation}; + use crate::task::{PartialGraphId, SubscribeMutationItem}; + + enum InflightActorStatus { + /// The actor has not been issued any barrier yet + NotStarted, + /// The actor has been issued some barriers, but has collected all the barrier. + /// Waiting for new barrier to issue. + Pending { + /// The latest `partial_graph_id` before entering `Pending` status. + /// The actor should be in the `inflight_actors` of the graph. + prev_partial_graph_id: PartialGraphId, + /// The `prev_epoch` of the previous barrier + prev_epoch: u64, + }, + /// The actor has been issued with some barriers, and waiting for collecting some barriers. + Running { + /// `prev_epoch` -> partial graph id + /// Store the barriers that has been issued but not collected. + /// Must be non-empty when in this variant, or transit to `Pending`, or the states gets removed when stopped. + /// + /// The actor should be in the `inflight_actors` of graph whose `partial_graph_id` of the first graph id. + inflight_barriers: BTreeMap>)>, + /// Whether the actor has been issued a stop barrier + is_stopping: bool, + }, + } + + pub(crate) struct InflightActorState { + pending_subscribers: BTreeMap>>, + started_subscribers: Vec>, + status: InflightActorStatus, + } + + impl InflightActorState { + pub(super) fn not_started() -> Self { + Self { + pending_subscribers: Default::default(), + started_subscribers: vec![], + status: InflightActorStatus::NotStarted, + } + } + + #[expect(clippy::type_complexity)] + pub(crate) fn inflight_barriers( + &self, + ) -> Option<&BTreeMap>)>> { + if let InflightActorStatus::Running { + inflight_barriers, .. + } = &self.status + { + Some(inflight_barriers) + } else { + None + } + } + + pub(super) fn subscribe_actor_mutation( + &mut self, + start_prev_epoch: u64, + tx: mpsc::UnboundedSender, + ) { + match &self.status { + InflightActorStatus::NotStarted => { + self.pending_subscribers + .entry(start_prev_epoch) + .or_default() + .push(tx); + } + InflightActorStatus::Pending { prev_epoch, .. } => { + assert!(*prev_epoch < start_prev_epoch); + self.pending_subscribers + .entry(start_prev_epoch) + .or_default() + .push(tx); + } + InflightActorStatus::Running { + inflight_barriers, + is_stopping, + .. + } => { + if inflight_barriers.contains_key(&start_prev_epoch) { + for (prev_epoch, (_, mutation)) in + inflight_barriers.range(start_prev_epoch..) + { + if tx.send((*prev_epoch, mutation.clone())).is_err() { + // No more subscribe on the mutation. Simply return. + return; + } + } + if !*is_stopping { + self.started_subscribers.push(tx); + } + } else { + // Barrier has not issued yet. Store the pending tx + if let Some((last_epoch, _)) = inflight_barriers.last_key_value() { + assert!( + *last_epoch < start_prev_epoch, + "later barrier {} has been issued, but skip the start epoch {:?}", + last_epoch, + start_prev_epoch + ); + } + self.pending_subscribers + .entry(start_prev_epoch) + .or_default() + .push(tx); + } + } + } + } + + #[must_use] + pub(super) fn issue_barrier( + &mut self, + partial_graph_id: PartialGraphId, + barrier: &Barrier, + is_stop: bool, + ) -> ( + // Some(prev_partial_graph_id) when the actor was in status + // InflightActorStatus::Pending { .. } with a different graph id + Option, + // whether the actor is new to the `partial_graph_id` + bool, + ) { + let (prev_partial_graph_id, is_new_in_graph) = match &self.status { + InflightActorStatus::NotStarted => { + self.status = InflightActorStatus::Running { + inflight_barriers: Default::default(), + is_stopping: false, + }; + (None, true) + } + InflightActorStatus::Pending { + prev_partial_graph_id, + prev_epoch, + } => { + assert!(*prev_epoch < barrier.epoch.prev); + let prev_partial_graph_id = *prev_partial_graph_id; + self.status = InflightActorStatus::Running { + inflight_barriers: Default::default(), + is_stopping: false, + }; + if prev_partial_graph_id != partial_graph_id { + (Some(prev_partial_graph_id), true) + } else { + (None, false) + } + } + InflightActorStatus::Running { + inflight_barriers, .. + } => { + let (prev_epoch, (prev_partial_graph_id, _)) = + inflight_barriers.last_key_value().expect("non-empty"); + assert!(*prev_epoch < barrier.epoch.prev); + (None, *prev_partial_graph_id != partial_graph_id) + } + }; + + if let Some((first_epoch, _)) = self.pending_subscribers.first_key_value() { + assert!( + *first_epoch >= barrier.epoch.prev, + "barrier epoch {:?} skip subscribed epoch {}", + barrier.epoch, + first_epoch + ); + if *first_epoch == barrier.epoch.prev { + self.started_subscribers.extend( + self.pending_subscribers + .pop_first() + .expect("should exist") + .1, + ); + } + } + self.started_subscribers.retain(|tx| { + tx.send((barrier.epoch.prev, barrier.mutation.clone())) + .is_ok() + }); + + must_match!(&mut self.status, InflightActorStatus::Running { + inflight_barriers, is_stopping, + } => { + inflight_barriers.insert(barrier.epoch.prev, (partial_graph_id, barrier.mutation.clone())); + *is_stopping = is_stop; + }); + + (prev_partial_graph_id, is_new_in_graph) + } + + #[must_use] + pub(super) fn collect( + &mut self, + epoch: EpochPair, + ) -> ( + // The `partial_graph_id` of actor on the collected epoch + PartialGraphId, + // None => the partial graph id of this actor is not changed + // Some(None) => the actor has stopped, and should be removed from the return `partial_graph_id` + // Some(Some(new_partial_graph_id)) => the actor will move to the `new_partial_graph_id` + Option>, + ) { + must_match!(&mut self.status, InflightActorStatus::Running { + inflight_barriers, is_stopping + } => { + let (prev_epoch, (prev_partial_graph_id, _)) = inflight_barriers.pop_first().expect("should exist"); + assert_eq!(prev_epoch, epoch.prev); + let move_to_graph_id = if let Some((epoch, (graph_id, _))) = inflight_barriers.first_key_value() { + if *graph_id != prev_partial_graph_id { + Some(Some((prev_partial_graph_id, *epoch))) + } else { + None + } + } else if *is_stopping { + Some(None) + } else { + self.status = InflightActorStatus::Pending {prev_epoch, prev_partial_graph_id}; + // No need to move to any partial graph when transit to `Pending`. When issuing the next barrier and + // the next graph id gets different, the actor will then move to the next graph id + None + }; + (prev_partial_graph_id, move_to_graph_id) + }) + } + } } pub(super) struct PartialGraphManagedBarrierState { @@ -210,6 +437,8 @@ pub(super) struct PartialGraphManagedBarrierState { /// The key is `prev_epoch`, and the first value is `curr_epoch` epoch_barrier_state_map: BTreeMap, + inflight_actors: HashSet, + prev_barrier_table_ids: Option<(EpochPair, HashSet)>, /// Record the progress updates of creating mviews for each epoch of concurrent checkpoints. @@ -236,6 +465,7 @@ impl PartialGraphManagedBarrierState { Self { need_seal_epoch, epoch_barrier_state_map: Default::default(), + inflight_actors: Default::default(), prev_barrier_table_ids: None, create_mview_progress: Default::default(), await_epoch_completed_futures: Default::default(), @@ -297,78 +527,10 @@ impl ManagedBarrierState { start_prev_epoch: u64, tx: mpsc::UnboundedSender, ) { - let actor_state = self.actor_states.entry(actor_id).or_default(); - if let Some(start_partial_graph_id) = actor_state.inflight_barriers.get(&start_prev_epoch) { - let start_graph_state = self - .graph_states - .get(start_partial_graph_id) - .expect("should exist") - .epoch_barrier_state_map - .get(&start_prev_epoch) - .expect("should exist"); - match &start_graph_state.inner { - ManagedBarrierStateInner::Issued(issued_state) => { - assert!(issued_state.remaining_actors.contains(&actor_id)); - for (prev_epoch, partial_graph_id) in - actor_state.inflight_barriers.range(start_prev_epoch..) - { - let graph_state = self - .graph_states - .get(partial_graph_id) - .expect("should exist") - .epoch_barrier_state_map - .get(prev_epoch) - .expect("should exist"); - match &graph_state.inner { - ManagedBarrierStateInner::Issued(issued_state) => { - if issued_state.remaining_actors.contains(&actor_id) { - if tx - .send((*prev_epoch, issued_state.mutation.clone())) - .is_err() - { - // No more subscribe on the mutation. Simply return. - return; - } - } else { - // The barrier no more collect from such actor. End subscribe on mutation. - return; - } - } - state @ ManagedBarrierStateInner::AllCollected - | state @ ManagedBarrierStateInner::Completed(_) => { - unreachable!( - "should be Issued when having new subscriber, but current state: {:?}", - state - ) - } - } - } - actor_state.started_subscribers.push(tx); - } - state @ ManagedBarrierStateInner::AllCollected - | state @ ManagedBarrierStateInner::Completed(_) => { - unreachable!( - "should be Issued when having new subscriber, but current state: {:?}", - state - ) - } - } - } else { - // Barrier has not issued yet. Store the pending tx - if let Some((last_epoch, _)) = actor_state.inflight_barriers.last_key_value() { - assert!( - *last_epoch < start_prev_epoch, - "later barrier {} has been issued, but skip the start epoch {:?}", - last_epoch, - start_prev_epoch - ); - } - actor_state - .pending_subscribers - .entry(start_prev_epoch) - .or_default() - .push(tx); - } + self.actor_states + .entry(actor_id) + .or_insert_with(InflightActorState::not_started) + .subscribe_actor_mutation(start_prev_epoch, tx); } pub(super) fn transform_to_issued( @@ -377,43 +539,9 @@ impl ManagedBarrierState { graph_infos: &HashMap, ) { let actor_to_stop = barrier.all_stop_actors(); + let mut graph_actors_to_clean: HashMap> = HashMap::new(); for (partial_graph_id, graph_info) in graph_infos { let partial_graph_id = PartialGraphId::new(*partial_graph_id); - for actor_id in &graph_info.actor_ids_to_collect { - let actor_state = self.actor_states.entry(*actor_id).or_default(); - if let Some((first_epoch, _)) = actor_state.pending_subscribers.first_key_value() { - assert!( - *first_epoch >= barrier.epoch.prev, - "barrier epoch {:?} skip subscribed epoch {}", - barrier.epoch, - first_epoch - ); - if *first_epoch == barrier.epoch.prev { - actor_state.started_subscribers.extend( - actor_state - .pending_subscribers - .pop_first() - .expect("should exist") - .1, - ); - } - } - actor_state.started_subscribers.retain(|tx| { - tx.send((barrier.epoch.prev, barrier.mutation.clone())) - .is_ok() - }); - if let Some((prev_epoch, _)) = actor_state.inflight_barriers.first_key_value() { - assert!(*prev_epoch < barrier.epoch.prev); - } - actor_state - .inflight_barriers - .insert(barrier.epoch.prev, partial_graph_id); - if let Some(actor_to_stop) = actor_to_stop - && actor_to_stop.contains(actor_id) - { - actor_state.is_stopped = true; - } - } let graph_state = self .graph_states @@ -426,6 +554,7 @@ impl ManagedBarrierState { self.barrier_await_tree_reg.clone(), ) }); + graph_state.transform_to_issued( barrier, graph_info.actor_ids_to_collect.iter().cloned().collect(), @@ -435,6 +564,36 @@ impl ManagedBarrierState { .map(|table_id| TableId::new(*table_id)) .collect(), ); + + for actor_id in &graph_info.actor_ids_to_collect { + let (prev_partial_graph_id, is_new_in_graph) = self + .actor_states + .entry(*actor_id) + .or_insert_with(InflightActorState::not_started) + .issue_barrier( + partial_graph_id, + barrier, + actor_to_stop + .map(|actors| actors.contains(actor_id)) + .unwrap_or(false), + ); + if is_new_in_graph { + graph_state.add_inflight_actor(*actor_id, barrier.epoch.prev); + } + if let Some(prev_partial_graph_id) = prev_partial_graph_id { + graph_actors_to_clean + .entry(prev_partial_graph_id) + .or_default() + .push(*actor_id); + } + } + } + + for (graph_id, actors_to_clean) in graph_actors_to_clean { + let graph_state = self.graph_states.get_mut(&graph_id).expect("should exist"); + if graph_state.remove_inflight_actors(actors_to_clean) { + self.graph_states.remove(&graph_id); + } } } @@ -443,9 +602,12 @@ impl ManagedBarrierState { ) -> impl Future + '_ { poll_fn(|cx| { for (partial_graph_id, graph_state) in &mut self.graph_states { - let poll = graph_state.poll_next_completed_epoch(cx); - if poll.is_ready() { - return poll.map(|epoch| (*partial_graph_id, epoch)); + if let Poll::Ready(epoch) = graph_state.poll_next_completed_epoch(cx) { + let partial_graph_id = *partial_graph_id; + if graph_state.is_empty() { + self.graph_states.remove(&partial_graph_id); + } + return Poll::Ready((partial_graph_id, epoch)); } } Poll::Pending @@ -453,23 +615,62 @@ impl ManagedBarrierState { } pub(super) fn collect(&mut self, actor_id: ActorId, epoch: EpochPair) { - let actor_states = self.actor_states.get_mut(&actor_id).expect("should exist"); - let (prev_epoch, partial_graph_id) = actor_states - .inflight_barriers - .pop_first() - .expect("should not be empty"); - assert_eq!(prev_epoch, epoch.prev); - if actor_states.is_stopped && actor_states.inflight_barriers.is_empty() { - self.actor_states.remove(&actor_id); - } - self.graph_states - .get_mut(&partial_graph_id) + let (prev_partial_graph_id, move_to_partial_graph_id) = self + .actor_states + .get_mut(&actor_id) .expect("should exist") - .collect(actor_id, epoch); + .collect(epoch); + let prev_graph_state = self + .graph_states + .get_mut(&prev_partial_graph_id) + .expect("should exist"); + prev_graph_state.collect(actor_id, epoch); + if let Some(move_to_partial_graph_id) = move_to_partial_graph_id { + if prev_graph_state.remove_inflight_actors([actor_id]) { + self.graph_states.remove(&prev_partial_graph_id); + } + if let Some((move_to_partial_graph_id, start_epoch)) = move_to_partial_graph_id { + self.graph_states + .get_mut(&move_to_partial_graph_id) + .expect("should exist") + .add_inflight_actor(actor_id, start_epoch); + } else { + self.actor_states.remove(&actor_id); + } + } } } impl PartialGraphManagedBarrierState { + fn is_empty(&self) -> bool { + self.inflight_actors.is_empty() + && self.epoch_barrier_state_map.is_empty() + && self.await_epoch_completed_futures.is_empty() + } + + fn add_inflight_actor(&mut self, actor_id: ActorId, start_epoch: u64) { + assert!(self.inflight_actors.insert(actor_id)); + must_match!(&self.epoch_barrier_state_map.get(&start_epoch).expect("should exist").inner, ManagedBarrierStateInner::Issued(state) => { + state.remaining_actors.contains(&actor_id); + }); + if cfg!(debug_assertions) { + for (_, state) in self.epoch_barrier_state_map.range(..start_epoch) { + if let ManagedBarrierStateInner::Issued(state) = &state.inner { + // ensure that start_epoch is the first epoch to collect the barrier + assert!(!state.remaining_actors.contains(&actor_id)); + } + } + } + } + + #[must_use] + fn remove_inflight_actors(&mut self, actor_ids: impl IntoIterator) -> bool { + for actor_id in actor_ids { + assert!(self.inflight_actors.remove(&actor_id)); + } + self.is_empty() + } + /// This method is called when barrier state is modified in either `Issued` or `Stashed` /// to transform the state to `AllCollected` and start state store `sync` when the barrier /// has been collected from all actors for an `Issued` barrier. diff --git a/src/stream/src/task/barrier_manager/progress.rs b/src/stream/src/task/barrier_manager/progress.rs index 4fe6e98d4cf91..7a51f1de0d729 100644 --- a/src/stream/src/task/barrier_manager/progress.rs +++ b/src/stream/src/task/barrier_manager/progress.rs @@ -49,7 +49,8 @@ impl LocalBarrierWorker { state: BackfillState, ) { if let Some(actor_state) = self.state.actor_states.get(&actor) - && let Some(partial_graph_id) = actor_state.inflight_barriers.get(&epoch.prev) + && let Some(inflight_barriers) = actor_state.inflight_barriers() + && let Some((partial_graph_id, _)) = inflight_barriers.get(&epoch.prev) && let Some(graph_state) = self.state.graph_states.get_mut(partial_graph_id) { graph_state