From a053b523437f4991732633b1562f8050e51e3c66 Mon Sep 17 00:00:00 2001 From: August Date: Tue, 16 Jan 2024 20:13:14 +0800 Subject: [PATCH] refactor(meta): maintain snapshot of running actors instead of resolving it every time for barrier (#14517) --- src/meta/src/barrier/command.rs | 679 +++++++++--------- src/meta/src/barrier/info.rs | 134 +++- src/meta/src/barrier/mod.rs | 241 +------ src/meta/src/barrier/recovery.rs | 86 +-- src/meta/src/barrier/schedule.rs | 27 +- .../{model/barrier.rs => barrier/state.rs} | 30 +- src/meta/src/controller/fragment.rs | 25 +- src/meta/src/manager/catalog/fragment.rs | 22 +- src/meta/src/model/mod.rs | 2 - src/meta/src/model/stream.rs | 21 +- src/meta/src/stream/scale.rs | 26 +- src/meta/src/stream/stream_manager.rs | 20 +- 12 files changed, 636 insertions(+), 677 deletions(-) rename src/meta/src/{model/barrier.rs => barrier/state.rs} (64%) diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 0072f81ef1a8e..ae75453d8015a 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -12,10 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, HashMap, HashSet}; +use std::collections::{HashMap, HashSet}; +use std::default::Default; use std::sync::Arc; use futures::future::try_join_all; +use itertools::Itertools; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::TableId; use risingwave_common::hash::ActorMapping; @@ -35,9 +37,9 @@ use risingwave_pb::stream_plan::{ use risingwave_pb::stream_service::{DropActorsRequest, WaitEpochCommitRequest}; use uuid::Uuid; -use super::info::BarrierActorInfo; +use super::info::{ActorDesc, CommandActorChanges, InflightActorInfo}; use super::trace::TracedEpoch; -use crate::barrier::{CommandChanges, GlobalBarrierManagerContext}; +use crate::barrier::GlobalBarrierManagerContext; use crate::manager::{DdlType, MetadataManager, WorkerId}; use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments, TableParallelism}; use crate::stream::{build_actor_connector_splits, SplitAssignment, ThrottleConfig}; @@ -48,7 +50,8 @@ use crate::MetaResult; #[derive(Debug, Clone)] pub struct Reschedule { /// Added actors in this fragment. - pub added_actors: Vec, + pub added_actors: HashMap>, + /// Removed actors in this fragment. pub removed_actors: Vec, @@ -68,6 +71,10 @@ pub struct Reschedule { /// Reassigned splits for source actors pub actor_splits: HashMap>, + + /// Whether this fragment is injectable. The injectable means whether the fragment contains + /// any executors that are able to receive barrier. + pub injectable: bool, } #[derive(Debug, Clone)] @@ -79,6 +86,31 @@ pub struct ReplaceTablePlan { pub init_split_assignment: SplitAssignment, } +impl ReplaceTablePlan { + fn actor_changes(&self) -> CommandActorChanges { + let worker_actors = self.new_table_fragments.worker_actor_ids(); + let barrier_inject_actors: &HashSet<_> = &self + .new_table_fragments + .barrier_inject_actor_ids() + .into_iter() + .collect(); + let to_add = worker_actors + .into_iter() + .flat_map(|(node_id, actors)| { + actors.into_iter().map(move |actor_id| ActorDesc { + id: actor_id, + node_id, + is_injectable: barrier_inject_actors.contains(&actor_id), + }) + }) + .collect_vec(); + CommandActorChanges { + to_add, + to_remove: self.old_table_fragments.actor_ids().into_iter().collect(), + } + } +} + /// [`Command`] is the input of [`crate::barrier::GlobalBarrierManager`]. 
For different commands, /// it will build different barriers to send, and may do different stuffs after the barrier is /// collected. @@ -99,24 +131,14 @@ pub enum Command { /// will be generated. Resume(PausedReason), - /// `DropStreamingJobs` command generates a `Stop` barrier by the given - /// [`HashSet`]. The catalog has ensured that these streaming jobs are safe to be + /// `DropStreamingJobs` command generates a `Stop` barrier to stop the given + /// [`HashMap>`]. The catalog has ensured that these streaming jobs are safe to be /// dropped by reference counts before. /// /// Barriers from the actors to be dropped will STILL be collected. /// After the barrier is collected, it notifies the local stream manager of compute nodes to /// drop actors, and then delete the table fragments info from meta store. - /// The TableIds here are the ids for the stream job. - /// It does not include internal table ids. - DropStreamingJobs(HashSet), - - /// `DropStreamingJobsV2` command generates a `Stop` barrier by the given actor info. - /// This is used by new SQL metastore and catalog has already been dropped. - /// - /// Barriers from the actors to be dropped will STILL be collected. - /// After the barrier is collected, it notifies the local stream manager of compute nodes to - /// drop actors. - DropStreamingJobsV2(HashMap>>), + DropStreamingJobs(HashMap>), /// `CreateStreamingJob` command generates a `Add` barrier by given info. /// @@ -182,63 +204,75 @@ impl Command { Self::Resume(reason) } - /// Changes to the actors to be sent or collected after this command is committed. - pub fn changes(&self) -> CommandChanges { + pub fn actor_changes(&self) -> Option { match self { - Command::Plain(_) => CommandChanges::None, - Command::Pause(_) => CommandChanges::None, - Command::Resume(_) => CommandChanges::None, + Command::Plain(_) => None, + Command::Pause(_) => None, + Command::Resume(_) => None, + Command::DropStreamingJobs(node_actors) => Some(CommandActorChanges { + to_add: Default::default(), + to_remove: node_actors.values().flatten().cloned().collect(), + }), Command::CreateStreamingJob { table_fragments, - replace_table: - Some(ReplaceTablePlan { - old_table_fragments, - new_table_fragments, - .. - }), + replace_table, .. } => { - let to_add = new_table_fragments.actor_ids().into_iter().collect(); - let to_remove = old_table_fragments.actor_ids().into_iter().collect(); - - CommandChanges::CreateSinkIntoTable { - sink_id: table_fragments.table_id(), - to_add, - to_remove, + let worker_actors = table_fragments.worker_actor_ids(); + let barrier_inject_actors: &HashSet<_> = &table_fragments + .barrier_inject_actor_ids() + .into_iter() + .collect(); + let mut to_add = worker_actors + .into_iter() + .flat_map(|(node_id, actors)| { + actors.into_iter().map(move |actor_id| ActorDesc { + id: actor_id, + node_id, + is_injectable: barrier_inject_actors.contains(&actor_id), + }) + }) + .collect_vec(); + + if let Some(plan) = replace_table { + let CommandActorChanges { + to_add: to_add_plan, + to_remove, + } = plan.actor_changes(); + to_add.extend(to_add_plan); + Some(CommandActorChanges { to_add, to_remove }) + } else { + Some(CommandActorChanges { + to_add, + to_remove: Default::default(), + }) } } - Command::CreateStreamingJob { - table_fragments, .. 
- } => CommandChanges::CreateTable(table_fragments.table_id()), - Command::DropStreamingJobs(table_ids) => CommandChanges::DropTables(table_ids.clone()), - Command::DropStreamingJobsV2(job_info) => { - CommandChanges::DropTables(job_info.keys().cloned().collect()) - } - Command::CancelStreamingJob(table_fragments) => { - CommandChanges::DropTables(std::iter::once(table_fragments.table_id()).collect()) - } + Command::CancelStreamingJob(table_fragments) => Some(CommandActorChanges { + to_add: Default::default(), + to_remove: table_fragments.actor_ids().into_iter().collect(), + }), Command::RescheduleFragment { reschedules, .. } => { - let to_add = reschedules - .values() - .flat_map(|r| r.added_actors.iter().copied()) - .collect(); - let to_remove = reschedules - .values() - .flat_map(|r| r.removed_actors.iter().copied()) - .collect(); - CommandChanges::Actor { to_add, to_remove } - } - Command::ReplaceTable(ReplaceTablePlan { - old_table_fragments, - new_table_fragments, - .. - }) => { - let to_add = new_table_fragments.actor_ids().into_iter().collect(); - let to_remove = old_table_fragments.actor_ids().into_iter().collect(); - CommandChanges::Actor { to_add, to_remove } + let mut to_add = vec![]; + let mut to_remove = HashSet::new(); + for reschedule in reschedules.values() { + for (node_id, added_actors) in &reschedule.added_actors { + for actor_id in added_actors { + to_add.push(ActorDesc { + id: *actor_id, + node_id: *node_id, + is_injectable: reschedule.injectable, + }); + } + } + to_remove.extend(reschedule.removed_actors.iter().copied()); + } + + Some(CommandActorChanges { to_add, to_remove }) } - Command::SourceSplitAssignment(_) => CommandChanges::None, - Command::Throttle(_) => CommandChanges::None, + Command::ReplaceTable(plan) => Some(plan.actor_changes()), + Command::SourceSplitAssignment(_) => None, + Command::Throttle(_) => None, } } @@ -262,8 +296,7 @@ impl Command { /// [`Command`]. pub struct CommandContext { /// Resolved info in this barrier loop. - // TODO: this could be stale when we are calling `post_collect`, check if it matters - pub info: Arc, + pub info: Arc, pub prev_epoch: TracedEpoch, pub curr_epoch: TracedEpoch, @@ -287,7 +320,7 @@ pub struct CommandContext { impl CommandContext { #[allow(clippy::too_many_arguments)] pub(super) fn new( - info: BarrierActorInfo, + info: InflightActorInfo, prev_epoch: TracedEpoch, curr_epoch: TracedEpoch, current_paused_reason: Option, @@ -316,284 +349,278 @@ impl CommandContext { impl CommandContext { /// Generate a mutation for the given command. pub async fn to_mutation(&self) -> MetaResult> { - let mutation = match &self.command { - Command::Plain(mutation) => mutation.clone(), - - Command::Pause(_) => { - // Only pause when the cluster is not already paused. - if self.current_paused_reason.is_none() { - Some(Mutation::Pause(PauseMutation {})) - } else { - None + let mutation = + match &self.command { + Command::Plain(mutation) => mutation.clone(), + + Command::Pause(_) => { + // Only pause when the cluster is not already paused. + if self.current_paused_reason.is_none() { + Some(Mutation::Pause(PauseMutation {})) + } else { + None + } } - } - Command::Resume(reason) => { - // Only resume when the cluster is paused with the same reason. - if self.current_paused_reason == Some(*reason) { - Some(Mutation::Resume(ResumeMutation {})) - } else { - None + Command::Resume(reason) => { + // Only resume when the cluster is paused with the same reason. 
+ if self.current_paused_reason == Some(*reason) { + Some(Mutation::Resume(ResumeMutation {})) + } else { + None + } } - } - Command::SourceSplitAssignment(change) => { - let mut diff = HashMap::new(); - - for actor_splits in change.values() { - diff.extend(actor_splits.clone()); - } + Command::SourceSplitAssignment(change) => { + let mut diff = HashMap::new(); - Some(Mutation::Splits(SourceChangeSplitMutation { - actor_splits: build_actor_connector_splits(&diff), - })) - } + for actor_splits in change.values() { + diff.extend(actor_splits.clone()); + } - Command::Throttle(config) => { - let mut actor_to_apply = HashMap::new(); - for per_fragment in config.values() { - actor_to_apply.extend( - per_fragment - .iter() - .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit })), - ); + Some(Mutation::Splits(SourceChangeSplitMutation { + actor_splits: build_actor_connector_splits(&diff), + })) } - Some(Mutation::Throttle(ThrottleMutation { - actor_throttle: actor_to_apply, - })) - } - - Command::DropStreamingJobs(table_ids) => { - let MetadataManager::V1(mgr) = &self.barrier_manager_context.metadata_manager - else { - unreachable!("only available in v1"); - }; + Command::Throttle(config) => { + let mut actor_to_apply = HashMap::new(); + for per_fragment in config.values() { + actor_to_apply.extend(per_fragment.iter().map(|(actor_id, limit)| { + (*actor_id, RateLimit { rate_limit: *limit }) + })); + } - let actors = mgr.fragment_manager.get_table_actor_ids(table_ids).await?; - Some(Mutation::Stop(StopMutation { actors })) - } + Some(Mutation::Throttle(ThrottleMutation { + actor_throttle: actor_to_apply, + })) + } - Command::DropStreamingJobsV2(job_info) => { - let actors: Vec = job_info - .values() - .flat_map(|v| v.values()) - .flatten() - .copied() - .collect(); - Some(Mutation::Stop(StopMutation { actors })) - } + Command::DropStreamingJobs(node_actors) => { + let actors = node_actors.values().flatten().copied().collect(); + Some(Mutation::Stop(StopMutation { actors })) + } - Command::CreateStreamingJob { - table_fragments, - dispatchers, - init_split_assignment: split_assignment, - replace_table, - .. - } => { - let actor_dispatchers = dispatchers - .iter() - .map(|(&actor_id, dispatchers)| { - ( - actor_id, - Dispatchers { - dispatchers: dispatchers.clone(), - }, - ) - }) - .collect(); - let added_actors = table_fragments.actor_ids(); - let actor_splits = split_assignment - .values() - .flat_map(build_actor_connector_splits) - .collect(); - let add = Some(Mutation::Add(AddMutation { - actor_dispatchers, - added_actors, - actor_splits, - // If the cluster is already paused, the new actors should be paused too. - pause: self.current_paused_reason.is_some(), - })); - - if let Some(ReplaceTablePlan { - old_table_fragments, - new_table_fragments: _, - merge_updates, + Command::CreateStreamingJob { + table_fragments, dispatchers, - init_split_assignment, - }) = replace_table - { - // TODO: support in v2. - let update = Self::generate_update_mutation_for_replace_table( + init_split_assignment: split_assignment, + replace_table, + .. 
+ } => { + let actor_dispatchers = dispatchers + .iter() + .map(|(&actor_id, dispatchers)| { + ( + actor_id, + Dispatchers { + dispatchers: dispatchers.clone(), + }, + ) + }) + .collect(); + let added_actors = table_fragments.actor_ids(); + let actor_splits = split_assignment + .values() + .flat_map(build_actor_connector_splits) + .collect(); + let add = Some(Mutation::Add(AddMutation { + actor_dispatchers, + added_actors, + actor_splits, + // If the cluster is already paused, the new actors should be paused too. + pause: self.current_paused_reason.is_some(), + })); + + if let Some(ReplaceTablePlan { old_table_fragments, + new_table_fragments: _, merge_updates, dispatchers, init_split_assignment, - ); + }) = replace_table + { + // TODO: support in v2. + let update = Self::generate_update_mutation_for_replace_table( + old_table_fragments, + merge_updates, + dispatchers, + init_split_assignment, + ); - Some(Mutation::Combined(CombinedMutation { - mutations: vec![ - BarrierMutation { mutation: add }, - BarrierMutation { mutation: update }, - ], - })) - } else { - add + Some(Mutation::Combined(CombinedMutation { + mutations: vec![ + BarrierMutation { mutation: add }, + BarrierMutation { mutation: update }, + ], + })) + } else { + add + } } - } - Command::CancelStreamingJob(table_fragments) => { - let actors = table_fragments.actor_ids(); - Some(Mutation::Stop(StopMutation { actors })) - } - - Command::ReplaceTable(ReplaceTablePlan { - old_table_fragments, - merge_updates, - dispatchers, - init_split_assignment, - .. - }) => Self::generate_update_mutation_for_replace_table( - old_table_fragments, - merge_updates, - dispatchers, - init_split_assignment, - ), + Command::CancelStreamingJob(table_fragments) => { + let actors = table_fragments.actor_ids(); + Some(Mutation::Stop(StopMutation { actors })) + } - Command::RescheduleFragment { reschedules, .. } => { - let MetadataManager::V1(mgr) = &self.barrier_manager_context.metadata_manager - else { - unimplemented!("implement scale functions in v2"); - }; - let mut dispatcher_update = HashMap::new(); - for reschedule in reschedules.values() { - for &(upstream_fragment_id, dispatcher_id) in - &reschedule.upstream_fragment_dispatcher_ids - { - // Find the actors of the upstream fragment. - let upstream_actor_ids = mgr - .fragment_manager - .get_running_actors_of_fragment(upstream_fragment_id) - .await?; + Command::ReplaceTable(ReplaceTablePlan { + old_table_fragments, + merge_updates, + dispatchers, + init_split_assignment, + .. + }) => Self::generate_update_mutation_for_replace_table( + old_table_fragments, + merge_updates, + dispatchers, + init_split_assignment, + ), + + Command::RescheduleFragment { reschedules, .. } => { + let MetadataManager::V1(mgr) = &self.barrier_manager_context.metadata_manager + else { + unimplemented!("implement scale functions in v2"); + }; + let mut dispatcher_update = HashMap::new(); + for reschedule in reschedules.values() { + for &(upstream_fragment_id, dispatcher_id) in + &reschedule.upstream_fragment_dispatcher_ids + { + // Find the actors of the upstream fragment. + let upstream_actor_ids = mgr + .fragment_manager + .get_running_actors_of_fragment(upstream_fragment_id) + .await?; - // Record updates for all actors. - for actor_id in upstream_actor_ids { - // Index with the dispatcher id to check duplicates. 
- dispatcher_update - .try_insert( - (actor_id, dispatcher_id), - DispatcherUpdate { - actor_id, - dispatcher_id, - hash_mapping: reschedule - .upstream_dispatcher_mapping - .as_ref() - .map(|m| m.to_protobuf()), - added_downstream_actor_id: reschedule.added_actors.clone(), - removed_downstream_actor_id: reschedule - .removed_actors - .clone(), - }, - ) - .unwrap(); + // Record updates for all actors. + for actor_id in upstream_actor_ids { + // Index with the dispatcher id to check duplicates. + dispatcher_update + .try_insert( + (actor_id, dispatcher_id), + DispatcherUpdate { + actor_id, + dispatcher_id, + hash_mapping: reschedule + .upstream_dispatcher_mapping + .as_ref() + .map(|m| m.to_protobuf()), + added_downstream_actor_id: reschedule + .added_actors + .values() + .flatten() + .cloned() + .collect(), + removed_downstream_actor_id: reschedule + .removed_actors + .clone(), + }, + ) + .unwrap(); + } } } - } - let dispatcher_update = dispatcher_update.into_values().collect(); - - let mut merge_update = HashMap::new(); - for (&fragment_id, reschedule) in reschedules { - for &downstream_fragment_id in &reschedule.downstream_fragment_ids { - // Find the actors of the downstream fragment. - let downstream_actor_ids = mgr - .fragment_manager - .get_running_actors_of_fragment(downstream_fragment_id) - .await?; + let dispatcher_update = dispatcher_update.into_values().collect(); + + let mut merge_update = HashMap::new(); + for (&fragment_id, reschedule) in reschedules { + for &downstream_fragment_id in &reschedule.downstream_fragment_ids { + // Find the actors of the downstream fragment. + let downstream_actor_ids = mgr + .fragment_manager + .get_running_actors_of_fragment(downstream_fragment_id) + .await?; - // Downstream removed actors should be skipped - // Newly created actors of the current fragment will not dispatch Update - // barriers to them - let downstream_removed_actors: HashSet<_> = reschedules - .get(&downstream_fragment_id) - .map(|downstream_reschedule| { - downstream_reschedule - .removed_actors - .iter() - .copied() - .collect() - }) - .unwrap_or_default(); - - // Record updates for all actors. - for actor_id in downstream_actor_ids { - if downstream_removed_actors.contains(&actor_id) { - continue; + // Downstream removed actors should be skipped + // Newly created actors of the current fragment will not dispatch Update + // barriers to them + let downstream_removed_actors: HashSet<_> = reschedules + .get(&downstream_fragment_id) + .map(|downstream_reschedule| { + downstream_reschedule + .removed_actors + .iter() + .copied() + .collect() + }) + .unwrap_or_default(); + + // Record updates for all actors. + for actor_id in downstream_actor_ids { + if downstream_removed_actors.contains(&actor_id) { + continue; + } + + // Index with the fragment id to check duplicates. + merge_update + .try_insert( + (actor_id, fragment_id), + MergeUpdate { + actor_id, + upstream_fragment_id: fragment_id, + new_upstream_fragment_id: None, + added_upstream_actor_id: reschedule + .added_actors + .values() + .flatten() + .cloned() + .collect(), + removed_upstream_actor_id: reschedule + .removed_actors + .clone(), + }, + ) + .unwrap(); } - - // Index with the fragment id to check duplicates. 
- merge_update - .try_insert( - (actor_id, fragment_id), - MergeUpdate { - actor_id, - upstream_fragment_id: fragment_id, - new_upstream_fragment_id: None, - added_upstream_actor_id: reschedule.added_actors.clone(), - removed_upstream_actor_id: reschedule - .removed_actors - .clone(), - }, - ) + } + } + let merge_update = merge_update.into_values().collect(); + + let mut actor_vnode_bitmap_update = HashMap::new(); + for reschedule in reschedules.values() { + // Record updates for all actors in this fragment. + for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates { + let bitmap = bitmap.to_protobuf(); + actor_vnode_bitmap_update + .try_insert(actor_id, bitmap) .unwrap(); } } - } - let merge_update = merge_update.into_values().collect(); - let mut actor_vnode_bitmap_update = HashMap::new(); - for reschedule in reschedules.values() { - // Record updates for all actors in this fragment. - for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates { - let bitmap = bitmap.to_protobuf(); - actor_vnode_bitmap_update - .try_insert(actor_id, bitmap) - .unwrap(); + let dropped_actors = reschedules + .values() + .flat_map(|r| r.removed_actors.iter().copied()) + .collect(); + + let mut actor_splits = HashMap::new(); + + for reschedule in reschedules.values() { + for (actor_id, splits) in &reschedule.actor_splits { + actor_splits.insert( + *actor_id as ActorId, + ConnectorSplits { + splits: splits.iter().map(ConnectorSplit::from).collect(), + }, + ); + } } - } - - let dropped_actors = reschedules - .values() - .flat_map(|r| r.removed_actors.iter().copied()) - .collect(); - - let mut actor_splits = HashMap::new(); - for reschedule in reschedules.values() { - for (actor_id, splits) in &reschedule.actor_splits { - actor_splits.insert( - *actor_id as ActorId, - ConnectorSplits { - splits: splits.iter().map(ConnectorSplit::from).collect(), - }, - ); - } + // we don't create dispatchers in reschedule scenario + let actor_new_dispatchers = HashMap::new(); + + let mutation = Mutation::Update(UpdateMutation { + dispatcher_update, + merge_update, + actor_vnode_bitmap_update, + dropped_actors, + actor_splits, + actor_new_dispatchers, + }); + tracing::debug!("update mutation: {mutation:#?}"); + Some(mutation) } - - // we don't create dispatchers in reschedule scenario - let actor_new_dispatchers = HashMap::new(); - - let mutation = Mutation::Update(UpdateMutation { - dispatcher_update, - merge_update, - actor_vnode_bitmap_update, - dropped_actors, - actor_splits, - actor_new_dispatchers, - }); - tracing::debug!("update mutation: {mutation:#?}"); - Some(mutation) - } - }; + }; Ok(mutation) } @@ -787,29 +814,9 @@ impl CommandContext { .await; } - Command::DropStreamingJobs(table_ids) => { - let MetadataManager::V1(mgr) = &self.barrier_manager_context.metadata_manager - else { - unreachable!("only available in v1"); - }; + Command::DropStreamingJobs(node_actors) => { // Tell compute nodes to drop actors. - let node_actors = mgr.fragment_manager.table_node_actors(table_ids).await?; - self.clean_up(node_actors).await?; - // Drop fragment info in meta store. - mgr.fragment_manager - .drop_table_fragments_vec(table_ids) - .await?; - } - - Command::DropStreamingJobsV2(job_info) => { - let mut node_actors: BTreeMap> = BTreeMap::new(); - for worker_actor_ids in job_info.values() { - for (worker_id, actor_ids) in worker_actor_ids { - node_actors.entry(*worker_id).or_default().extend(actor_ids); - } - } - // Tell compute nodes to drop actors. 
- self.clean_up(node_actors).await?; + self.clean_up(node_actors.clone()).await?; } Command::CancelStreamingJob(table_fragments) => { diff --git a/src/meta/src/barrier/info.rs b/src/meta/src/barrier/info.rs index cc7e9de698a41..b52ab90955287 100644 --- a/src/meta/src/barrier/info.rs +++ b/src/meta/src/barrier/info.rs @@ -12,31 +12,47 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; -use risingwave_pb::common::WorkerNode; +use risingwave_pb::common::PbWorkerNode; use crate::manager::{ActorInfos, WorkerId}; use crate::model::ActorId; -/// [`BarrierActorInfo`] resolves the actor info read from meta store for +#[derive(Debug, Clone)] +pub struct ActorDesc { + pub id: ActorId, + pub node_id: WorkerId, + pub is_injectable: bool, +} + +#[derive(Debug, Clone)] +pub struct CommandActorChanges { + pub(crate) to_add: Vec, + pub(crate) to_remove: HashSet, +} + +/// [`InflightActorInfo`] resolves the actor info read from meta store for /// [`crate::barrier::GlobalBarrierManager`]. -pub struct BarrierActorInfo { +#[derive(Default, Clone)] +pub struct InflightActorInfo { /// node_id => node - pub node_map: HashMap, + pub node_map: HashMap, /// node_id => actors - pub actor_map: HashMap>, + pub actor_map: HashMap>, /// node_id => barrier inject actors - pub actor_map_to_send: HashMap>, + pub actor_map_to_send: HashMap>, + + /// actor_id => WorkerId + pub actor_location_map: HashMap, } -impl BarrierActorInfo { - // TODO: we may resolve this info as graph updating, instead of doing it every time we want to - // send a barrier +impl InflightActorInfo { + /// Resolve inflight actor info from given nodes and actors that are loaded from meta store. It will be used during recovery to rebuild all streaming actors. pub fn resolve( - all_nodes: impl IntoIterator, + all_nodes: impl IntoIterator, actor_infos: ActorInfos, ) -> Self { let node_map = all_nodes @@ -44,25 +60,107 @@ impl BarrierActorInfo { .map(|node| (node.id, node)) .collect::>(); + let actor_map = actor_infos + .actor_maps + .into_iter() + .map(|(node_id, actor_ids)| (node_id, actor_ids.into_iter().collect::>())) + .collect::>(); + + let actor_map_to_send = actor_infos + .barrier_inject_actor_maps + .into_iter() + .map(|(node_id, actor_ids)| (node_id, actor_ids.into_iter().collect::>())) + .collect::>(); + + let actor_location_map = actor_map + .iter() + .flat_map(|(node_id, actor_ids)| actor_ids.iter().map(|actor_id| (*actor_id, *node_id))) + .collect::>(); + Self { node_map, - actor_map: actor_infos.actor_maps, - actor_map_to_send: actor_infos.barrier_inject_actor_maps, + actor_map, + actor_map_to_send, + actor_location_map, } } - // TODO: should only collect from reachable actors, for mv on mv + /// Update worker nodes snapshot. We need to support incremental updates for it in the future. + pub fn resolve_worker_nodes(&mut self, all_nodes: impl IntoIterator) { + self.node_map = all_nodes + .into_iter() + .map(|node| (node.id, node)) + .collect::>(); + } + + /// Apply some actor changes before issuing a barrier command, if the command contains any new added actors, we should update + /// the info correspondingly. + pub fn pre_apply(&mut self, changes: Option) { + if let Some(CommandActorChanges { to_add, .. 
}) = changes { + for actor_desc in to_add { + assert!(self.node_map.contains_key(&actor_desc.node_id)); + assert!( + self.actor_map + .entry(actor_desc.node_id) + .or_default() + .insert(actor_desc.id), + "duplicate actor in command changes" + ); + if actor_desc.is_injectable { + assert!( + self.actor_map_to_send + .entry(actor_desc.node_id) + .or_default() + .insert(actor_desc.id), + "duplicate actor in command changes" + ); + } + assert!( + self.actor_location_map + .insert(actor_desc.id, actor_desc.node_id) + .is_none(), + "duplicate actor in command changes" + ); + } + }; + } + + /// Apply some actor changes after the barrier command is collected, if the command contains any actors that are dropped, we should + /// remove that from the snapshot correspondingly. + pub fn post_apply(&mut self, changes: Option) { + if let Some(CommandActorChanges { to_remove, .. }) = changes { + for actor_id in to_remove { + let node_id = self + .actor_location_map + .remove(&actor_id) + .expect("actor not found"); + let actor_ids = self.actor_map.get_mut(&node_id).expect("node not found"); + assert!(actor_ids.remove(&actor_id), "actor not found"); + self.actor_map_to_send + .get_mut(&node_id) + .map(|actor_ids| actor_ids.remove(&actor_id)); + } + self.actor_map.retain(|_, actor_ids| !actor_ids.is_empty()); + self.actor_map_to_send + .retain(|_, actor_ids| !actor_ids.is_empty()); + } + } + + /// Returns actor list to collect in the target worker node. pub fn actor_ids_to_collect(&self, node_id: &WorkerId) -> impl Iterator { self.actor_map .get(node_id) - .map(|actor_ids| actor_ids.clone().into_iter()) - .unwrap_or_else(|| vec![].into_iter()) + .cloned() + .unwrap_or_default() + .into_iter() } + /// Returns actor list to send in the target worker node. pub fn actor_ids_to_send(&self, node_id: &WorkerId) -> impl Iterator { self.actor_map_to_send .get(node_id) - .map(|actor_ids| actor_ids.clone().into_iter()) - .unwrap_or_else(|| vec![].into_iter()) + .cloned() + .unwrap_or_default() + .into_iter() } } diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index fc84edd77417f..30f1962ba3061 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -30,11 +30,9 @@ use risingwave_hummock_sdk::table_watermark::{ merge_multiple_new_table_watermarks, TableWatermarks, }; use risingwave_hummock_sdk::{ExtendedSstableInfo, HummockSstableObjectId}; -use risingwave_meta_model_v2::ObjectId; use risingwave_pb::catalog::table::TableType; use risingwave_pb::ddl_service::DdlProgress; use risingwave_pb::meta::subscribe_response::{Info, Operation}; -use risingwave_pb::meta::table_fragments::actor_status::ActorState; use risingwave_pb::meta::PausedReason; use risingwave_pb::stream_plan::barrier::BarrierKind; use risingwave_pb::stream_service::BarrierCompleteResponse; @@ -44,17 +42,18 @@ use tokio::task::JoinHandle; use tracing::Instrument; use self::command::CommandContext; -use self::info::BarrierActorInfo; use self::notifier::Notifier; use self::progress::TrackingCommand; +use crate::barrier::info::InflightActorInfo; use crate::barrier::notifier::BarrierInfo; use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob}; use crate::barrier::rpc::BarrierRpcManager; +use crate::barrier::state::BarrierManagerState; use crate::barrier::BarrierEpochState::{Completed, InFlight}; use crate::hummock::{CommitEpochInfo, HummockManagerRef}; use crate::manager::sink_coordination::SinkCoordinatorManager; use crate::manager::{LocalNotification, MetaSrvEnv, MetadataManager, WorkerId}; -use 
crate::model::{ActorId, BarrierManagerState, TableFragments}; +use crate::model::{ActorId, TableFragments}; use crate::rpc::metrics::MetaMetrics; use crate::stream::{ScaleController, ScaleControllerRef, SourceManagerRef}; use crate::{MetaError, MetaResult}; @@ -66,6 +65,7 @@ mod progress; mod recovery; mod rpc; mod schedule; +mod state; mod trace; pub use self::command::{Command, ReplaceTablePlan, Reschedule}; @@ -129,35 +129,6 @@ struct Scheduled { checkpoint: bool, } -/// Changes to the actors to be sent or collected after this command is committed. -/// -/// Since the checkpoints might be concurrent, the meta store of table fragments is only updated -/// after the command is committed. When resolving the actor info for those commands after this -/// command, this command might be in-flight and the changes are not yet committed, so we need to -/// record these uncommitted changes and assume they will be eventually successful. -/// -/// See also [`CheckpointControl::can_actor_send_or_collect`]. -#[derive(Debug, Clone)] -pub enum CommandChanges { - /// These tables will be dropped. - DropTables(HashSet), - /// This table will be created. - CreateTable(TableId), - /// Some actors will be added or removed. - Actor { - to_add: HashSet, - to_remove: HashSet, - }, - /// This is used for sinking into the table, featuring both `CreateTable` and `Actor` changes. - CreateSinkIntoTable { - sink_id: TableId, - to_add: HashSet, - to_remove: HashSet, - }, - /// No changes. - None, -} - #[derive(Clone)] pub struct GlobalBarrierManagerContext { status: Arc>, @@ -214,19 +185,6 @@ struct CheckpointControl { /// Save the state and message of barrier in order. command_ctx_queue: VecDeque, - // Below for uncommitted changes for the inflight barriers. - /// In addition to the actors with status `Running`. The barrier needs to send or collect the - /// actors of these tables. - creating_tables: HashSet, - /// The barrier does not send or collect the actors of these tables, even if they are - /// `Running`. - dropping_tables: HashSet, - /// In addition to the actors with status `Running`. The barrier needs to send or collect these - /// actors. - adding_actors: HashSet, - /// The barrier does not send or collect these actors, even if they are `Running`. - removing_actors: HashSet, - metrics: Arc, /// Get notified when we finished Create MV and collect a barrier(checkpoint = true) @@ -237,10 +195,6 @@ impl CheckpointControl { fn new(metrics: Arc) -> Self { Self { command_ctx_queue: Default::default(), - creating_tables: Default::default(), - dropping_tables: Default::default(), - adding_actors: Default::default(), - removing_actors: Default::default(), metrics, finished_jobs: Default::default(), } @@ -274,7 +228,6 @@ impl CheckpointControl { x.command_ctx.prev_epoch.value() == cancelled_command.context.prev_epoch.value() }) { self.command_ctx_queue.remove(index); - self.remove_changes(cancelled_command.context.command.changes()); } } else { // Recovered jobs do not need to be cancelled since only `RUNNING` actors will get recovered. @@ -286,98 +239,6 @@ impl CheckpointControl { .retain(|x| x.table_to_create() != Some(id)); } - /// Before resolving the actors to be sent or collected, we should first record the newly - /// created table and added actors into checkpoint control, so that `can_actor_send_or_collect` - /// will return `true`. 
- fn pre_resolve(&mut self, command: &Command) { - self.pre_resolve_helper(command.changes()); - } - - fn pre_resolve_helper(&mut self, changes: CommandChanges) { - match changes { - CommandChanges::CreateTable(table) => { - assert!( - !self.dropping_tables.contains(&table), - "conflict table in concurrent checkpoint" - ); - assert!( - self.creating_tables.insert(table), - "duplicated table in concurrent checkpoint" - ); - } - - CommandChanges::Actor { to_add, .. } => { - assert!( - self.adding_actors.is_disjoint(&to_add), - "duplicated actor in concurrent checkpoint" - ); - self.adding_actors.extend(to_add); - } - - CommandChanges::CreateSinkIntoTable { - sink_id, - to_add, - to_remove, - } => { - self.pre_resolve_helper(CommandChanges::CreateTable(sink_id)); - self.pre_resolve_helper(CommandChanges::Actor { to_add, to_remove }); - } - - _ => {} - } - } - - /// After resolving the actors to be sent or collected, we should remove the dropped table and - /// removed actors from checkpoint control, so that `can_actor_send_or_collect` will return - /// `false`. - fn post_resolve(&mut self, command: &Command) { - self.post_resolve_helper(command.changes()); - } - - fn post_resolve_helper(&mut self, change: CommandChanges) { - match change { - CommandChanges::DropTables(tables) => { - assert!( - self.dropping_tables.is_disjoint(&tables), - "duplicated table in concurrent checkpoint" - ); - self.dropping_tables.extend(tables); - } - - CommandChanges::Actor { to_remove, .. } - | CommandChanges::CreateSinkIntoTable { to_remove, .. } => { - assert!( - self.removing_actors.is_disjoint(&to_remove), - "duplicated actor in concurrent checkpoint" - ); - self.removing_actors.extend(to_remove); - } - _ => {} - } - } - - /// Barrier can be sent to and collected from an actor if: - /// 1. The actor is Running and not being dropped or removed in rescheduling. - /// 2. The actor is Inactive and belongs to a creating MV or adding in rescheduling and not - /// belongs to a canceling command. - fn can_actor_send_or_collect( - &self, - s: ActorState, - table_id: TableId, - actor_id: ActorId, - ) -> bool { - let removing = - self.dropping_tables.contains(&table_id) || self.removing_actors.contains(&actor_id); - let adding = - self.creating_tables.contains(&table_id) || self.adding_actors.contains(&actor_id); - - match s { - ActorState::Inactive => adding && !removing, - ActorState::Running => !removing, - ActorState::Unspecified => unreachable!(), - } - } - /// Update the metrics of barrier nums. fn update_barrier_nums_metrics(&self) { self.metrics.in_flight_barrier_nums.set( @@ -431,18 +292,11 @@ impl CheckpointControl { .unwrap_or(self.command_ctx_queue.len()); let complete_nodes = self.command_ctx_queue.drain(..index).collect_vec(); complete_nodes - .iter() - .for_each(|node| self.remove_changes(node.command_ctx.command.changes())); - complete_nodes } /// Remove all nodes from queue and return them. fn barrier_failed(&mut self) -> Vec { - let complete_nodes = self.command_ctx_queue.drain(..).collect_vec(); - complete_nodes - .iter() - .for_each(|node| self.remove_changes(node.command_ctx.command.changes())); - complete_nodes + self.command_ctx_queue.drain(..).collect_vec() } /// Pause inject barrier until True. @@ -477,54 +331,8 @@ impl CheckpointControl { .any(|x| x.command_ctx.prev_epoch.value().0 == epoch) } - /// After some command is committed, the changes will be applied to the meta store so we can - /// remove the changes from checkpoint control. 
- pub fn remove_changes(&mut self, changes: CommandChanges) { - match changes { - CommandChanges::CreateTable(table_id) => { - assert!(self.creating_tables.remove(&table_id)); - } - CommandChanges::DropTables(table_ids) => { - assert!(self.dropping_tables.is_superset(&table_ids)); - self.dropping_tables.retain(|a| !table_ids.contains(a)); - } - CommandChanges::Actor { to_add, to_remove } => { - assert!(self.adding_actors.is_superset(&to_add)); - assert!(self.removing_actors.is_superset(&to_remove)); - - self.adding_actors.retain(|a| !to_add.contains(a)); - self.removing_actors.retain(|a| !to_remove.contains(a)); - } - CommandChanges::None => {} - CommandChanges::CreateSinkIntoTable { - sink_id, - to_add, - to_remove, - } => { - self.remove_changes(CommandChanges::CreateTable(sink_id)); - self.remove_changes(CommandChanges::Actor { to_add, to_remove }); - } - } - } - /// We need to make sure there are no changes when doing recovery pub fn clear_changes(&mut self) { - if !self.creating_tables.is_empty() { - tracing::warn!("there are some changes in creating_tables"); - self.creating_tables.clear(); - } - if !self.removing_actors.is_empty() { - tracing::warn!("there are some changes in removing_actors"); - self.removing_actors.clear(); - } - if !self.adding_actors.is_empty() { - tracing::warn!("there are some changes in adding_actors"); - self.adding_actors.clear(); - } - if !self.dropping_tables.is_empty() { - tracing::warn!("there are some changes in dropping_tables"); - self.dropping_tables.clear(); - } self.finished_jobs.clear(); } } @@ -575,8 +383,11 @@ impl GlobalBarrierManager { let enable_recovery = env.opts.enable_recovery; let in_flight_barrier_nums = env.opts.in_flight_barrier_nums; - let initial_invalid_state = - BarrierManagerState::new(TracedEpoch::new(Epoch(INVALID_EPOCH)), None); + let initial_invalid_state = BarrierManagerState::new( + TracedEpoch::new(Epoch(INVALID_EPOCH)), + InflightActorInfo::default(), + None, + ); let checkpoint_control = CheckpointControl::new(metrics.clone()); let tracker = CreateMviewProgressTracker::new(); @@ -782,15 +593,15 @@ impl GlobalBarrierManager { checkpoint, span, } = self.scheduled_barriers.pop_or_default().await; - self.checkpoint_control.pre_resolve(&command); - let info = self + + let all_nodes = self .context - .resolve_actor_info(|s: ActorState, table_id: TableId, actor_id: ActorId| { - self.checkpoint_control - .can_actor_send_or_collect(s, table_id, actor_id) - }) - .await; - self.checkpoint_control.post_resolve(&command); + .metadata_manager + .list_active_streaming_compute_nodes() + .await + .unwrap(); + self.state.resolve_worker_nodes(all_nodes); + let info = self.state.apply_command(&command); let (prev_epoch, curr_epoch) = self.state.next_epoch_pair(); let kind = if checkpoint { @@ -1102,24 +913,18 @@ impl GlobalBarrierManagerContext { /// Resolve actor information from cluster, fragment manager and `ChangedTableId`. /// We use `changed_table_id` to modify the actors to be sent or collected. Because these actor /// will create or drop before this barrier flow through them. 
- async fn resolve_actor_info( - &self, - check_state: impl Fn(ActorState, TableId, ActorId) -> bool, - ) -> BarrierActorInfo { + async fn resolve_actor_info(&self) -> InflightActorInfo { let info = match &self.metadata_manager { MetadataManager::V1(mgr) => { let all_nodes = mgr .cluster_manager .list_active_streaming_compute_nodes() .await; - let all_actor_infos = mgr.fragment_manager.load_all_actors(&check_state).await; + let all_actor_infos = mgr.fragment_manager.load_all_actors().await; - BarrierActorInfo::resolve(all_nodes, all_actor_infos) + InflightActorInfo::resolve(all_nodes, all_actor_infos) } MetadataManager::V2(mgr) => { - let check_state = |s: ActorState, table_id: ObjectId, actor_id: i32| { - check_state(s, TableId::new(table_id as _), actor_id as _) - }; let all_nodes = mgr .cluster_controller .list_active_streaming_workers() @@ -1131,11 +936,11 @@ impl GlobalBarrierManagerContext { .collect(); let all_actor_infos = mgr .catalog_controller - .load_all_actors(&pu_mappings, check_state) + .load_all_actors(&pu_mappings) .await .unwrap(); - BarrierActorInfo::resolve(all_nodes, all_actor_infos) + InflightActorInfo::resolve(all_nodes, all_actor_infos) } }; diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index afaf1667b2d59..d9a8b58226456 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -38,14 +38,15 @@ use uuid::Uuid; use super::TracedEpoch; use crate::barrier::command::CommandContext; -use crate::barrier::info::BarrierActorInfo; +use crate::barrier::info::InflightActorInfo; use crate::barrier::notifier::Notifier; use crate::barrier::progress::CreateMviewProgressTracker; use crate::barrier::schedule::ScheduledBarriers; -use crate::barrier::{CheckpointControl, Command, GlobalBarrierManagerContext}; +use crate::barrier::state::BarrierManagerState; +use crate::barrier::{Command, GlobalBarrierManagerContext}; use crate::controller::catalog::ReleaseContext; use crate::manager::{MetadataManager, WorkerId}; -use crate::model::{BarrierManagerState, MetadataModel, MigrationPlan, TableFragments}; +use crate::model::{MetadataModel, MigrationPlan, TableFragments}; use crate::stream::{build_actor_connector_splits, RescheduleOptions, TableResizePolicy}; use crate::MetaResult; @@ -63,14 +64,6 @@ impl GlobalBarrierManagerContext { .map(jitter) } - async fn resolve_actor_info_for_recovery(&self) -> BarrierActorInfo { - let default_checkpoint_control = CheckpointControl::new(self.metrics.clone()); - self.resolve_actor_info(|s, table_id, actor_id| { - default_checkpoint_control.can_actor_send_or_collect(s, table_id, actor_id) - }) - .await - } - /// Clean catalogs for creating streaming jobs that are in foreground mode or table fragments not persisted. async fn clean_dirty_streaming_jobs(&self) -> MetaResult<()> { match &self.metadata_manager { @@ -353,25 +346,18 @@ impl GlobalBarrierManagerContext { let state = tokio_retry::Retry::spawn(retry_strategy, || { async { let recovery_result: MetaResult<_> = try { - // This is a quick path to accelerate the process of dropping streaming jobs. Not that - // some table fragments might have been cleaned as dirty, but it's fine since the drop - // interface is idempotent. - if let MetadataManager::V1(mgr) = &self.metadata_manager { - let to_drop_tables = scheduled_barriers.pre_apply_drop_scheduled().await; - mgr.fragment_manager - .drop_table_fragments_vec(&to_drop_tables) - .await?; - } + // This is a quick path to accelerate the process of dropping and canceling streaming jobs. 
+ let _ = scheduled_barriers.pre_apply_drop_scheduled().await; // Resolve actor info for recovery. If there's no actor to recover, most of the // following steps will be no-op, while the compute nodes will still be reset. - let info = if self.env.opts.enable_scale_in_when_recovery { - let info = self.resolve_actor_info_for_recovery().await; + let mut info = if self.env.opts.enable_scale_in_when_recovery { + let info = self.resolve_actor_info().await; let scaled = self.scale_actors(&info).await.inspect_err(|err| { warn!(err = ?err, "scale actors failed"); })?; if scaled { - self.resolve_actor_info_for_recovery().await + self.resolve_actor_info().await } else { info } @@ -387,6 +373,10 @@ impl GlobalBarrierManagerContext { warn!(err = ?err, "reset compute nodes failed"); })?; + if scheduled_barriers.pre_apply_drop_scheduled().await { + info = self.resolve_actor_info().await; + } + // update and build all actors. self.update_actors(&info).await.inspect_err(|err| { warn!(err = ?err, "update actors failed"); @@ -410,7 +400,7 @@ impl GlobalBarrierManagerContext { // Inject the `Initial` barrier to initialize all executors. let command_ctx = Arc::new(CommandContext::new( - info, + info.clone(), prev_epoch.clone(), new_epoch.clone(), paused_reason, @@ -447,7 +437,7 @@ impl GlobalBarrierManagerContext { }; let (new_epoch, _) = res?; - BarrierManagerState::new(new_epoch, command_ctx.next_paused_reason()) + BarrierManagerState::new(new_epoch, info, command_ctx.next_paused_reason()) }; if recovery_result.is_err() { self.metrics.recovery_failure_cnt.inc(); @@ -472,14 +462,14 @@ impl GlobalBarrierManagerContext { } /// Migrate actors in expired CNs to newly joined ones, return true if any actor is migrated. - async fn migrate_actors(&self) -> MetaResult { + async fn migrate_actors(&self) -> MetaResult { match &self.metadata_manager { MetadataManager::V1(_) => self.migrate_actors_v1().await, MetadataManager::V2(_) => self.migrate_actors_v2().await, } } - async fn migrate_actors_v2(&self) -> MetaResult { + async fn migrate_actors_v2(&self) -> MetaResult { let MetadataManager::V2(mgr) = &self.metadata_manager else { unreachable!() }; @@ -504,7 +494,7 @@ impl GlobalBarrierManagerContext { .collect(); if expired_parallel_units.is_empty() { debug!("no expired parallel units, skipping."); - let info = self.resolve_actor_info_for_recovery().await; + let info = self.resolve_actor_info().await; return Ok(info); } @@ -556,17 +546,17 @@ impl GlobalBarrierManagerContext { mgr.catalog_controller.migrate_actors(plan).await?; debug!("migrate actors succeed."); - let info = self.resolve_actor_info_for_recovery().await; + let info = self.resolve_actor_info().await; Ok(info) } /// Migrate actors in expired CNs to newly joined ones, return true if any actor is migrated. - async fn migrate_actors_v1(&self) -> MetaResult { + async fn migrate_actors_v1(&self) -> MetaResult { let MetadataManager::V1(mgr) = &self.metadata_manager else { unreachable!() }; - let info = self.resolve_actor_info_for_recovery().await; + let info = self.resolve_actor_info().await; // 1. get expired workers. 
let expired_workers: HashSet = info @@ -590,25 +580,25 @@ impl GlobalBarrierManagerContext { migration_plan.delete(self.env.meta_store()).await?; debug!("migrate actors succeed."); - let info = self.resolve_actor_info_for_recovery().await; + let info = self.resolve_actor_info().await; Ok(info) } - async fn scale_actors(&self, info: &BarrierActorInfo) -> MetaResult { + async fn scale_actors(&self, info: &InflightActorInfo) -> MetaResult { match &self.metadata_manager { MetadataManager::V1(_) => self.scale_actors_v1(info).await, MetadataManager::V2(_) => self.scale_actors_v2(info), } } - fn scale_actors_v2(&self, _info: &BarrierActorInfo) -> MetaResult { + fn scale_actors_v2(&self, _info: &InflightActorInfo) -> MetaResult { let MetadataManager::V2(_mgr) = &self.metadata_manager else { unreachable!() }; unimplemented!("implement auto-scale funcs in sql backend") } - async fn scale_actors_v1(&self, info: &BarrierActorInfo) -> MetaResult { + async fn scale_actors_v1(&self, info: &InflightActorInfo) -> MetaResult { let MetadataManager::V1(mgr) = &self.metadata_manager else { unreachable!() }; @@ -811,7 +801,7 @@ impl GlobalBarrierManagerContext { } /// Update all actors in compute nodes. - async fn update_actors(&self, info: &BarrierActorInfo) -> MetaResult<()> { + async fn update_actors(&self, info: &InflightActorInfo) -> MetaResult<()> { if info.actor_map.is_empty() { tracing::debug!("no actor to update, skipping."); return Ok(()); @@ -835,10 +825,22 @@ impl GlobalBarrierManagerContext { .flatten_ok() .try_collect()?; - let mut node_actors = self.metadata_manager.all_node_actors(false).await?; + let mut all_node_actors = self.metadata_manager.all_node_actors(false).await?; + + // Check if any actors were dropped after info resolved. + if all_node_actors.iter().any(|(node_id, node_actors)| { + !info.node_map.contains_key(node_id) + || info + .actor_map + .get(node_id) + .map(|actors| actors.len() != node_actors.len()) + .unwrap_or(true) + }) { + return Err(anyhow!("actors dropped during update").into()); + } info.actor_map.iter().map(|(node_id, actors)| { - let new_actors = node_actors.remove(node_id).unwrap_or_default(); + let node_actors = all_node_actors.remove(node_id).unwrap_or_default(); let node = info.node_map.get(node_id).unwrap(); let actor_infos = actor_infos.clone(); @@ -855,7 +857,7 @@ impl GlobalBarrierManagerContext { client .update_actors(UpdateActorsRequest { request_id, - actors: new_actors, + actors: node_actors, }) .await?; @@ -867,7 +869,7 @@ impl GlobalBarrierManagerContext { } /// Build all actors in compute nodes. - async fn build_actors(&self, info: &BarrierActorInfo) -> MetaResult<()> { + async fn build_actors(&self, info: &InflightActorInfo) -> MetaResult<()> { if info.actor_map.is_empty() { tracing::debug!("no actor to build, skipping."); return Ok(()); @@ -876,7 +878,7 @@ impl GlobalBarrierManagerContext { info.actor_map .iter() .map(|(node_id, actors)| async move { - let actors = actors.clone(); + let actors = actors.iter().cloned().collect(); let node = info.node_map.get(node_id).unwrap(); let client = self.env.stream_client_pool().get(node).await?; @@ -899,7 +901,7 @@ impl GlobalBarrierManagerContext { } /// Reset all compute nodes by calling `force_stop_actors`. 
- async fn reset_compute_nodes(&self, info: &BarrierActorInfo) -> MetaResult<()> { + async fn reset_compute_nodes(&self, info: &InflightActorInfo) -> MetaResult<()> { let futures = info.node_map.values().map(|worker_node| async move { let client = self.env.stream_client_pool().get(worker_node).await?; debug!(worker = ?worker_node.id, "force stop actors"); diff --git a/src/meta/src/barrier/schedule.rs b/src/meta/src/barrier/schedule.rs index ccfd2ea03d08e..aab3234d620cb 100644 --- a/src/meta/src/barrier/schedule.rs +++ b/src/meta/src/barrier/schedule.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{HashSet, VecDeque}; +use std::collections::VecDeque; use std::iter::once; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; @@ -395,35 +395,28 @@ impl ScheduledBarriers { queue.mark_ready(); } - /// Try to pre apply drop scheduled command and return the table ids of dropped streaming jobs. + /// Try to pre apply drop scheduled command and return true if any. /// It should only be called in recovery. - pub(super) async fn pre_apply_drop_scheduled(&self) -> HashSet { - let mut to_drop_tables = HashSet::new(); + pub(super) async fn pre_apply_drop_scheduled(&self) -> bool { let mut queue = self.inner.queue.write().await; assert_matches!(queue.status, QueueStatus::Blocked(_)); + let mut found = false; while let Some(Scheduled { notifiers, command, .. }) = queue.queue.pop_front() { - match command { - Command::DropStreamingJobs(table_ids) => { - to_drop_tables.extend(table_ids); - } - Command::CancelStreamingJob(table_fragments) => { - let table_id = table_fragments.table_id(); - to_drop_tables.insert(table_id); - } - _ => { - unreachable!("only drop streaming jobs should be buffered"); - } - } + assert_matches!( + command, + Command::DropStreamingJobs(_) | Command::CancelStreamingJob(_) + ); notifiers.into_iter().for_each(|mut notify| { notify.notify_collected(); notify.notify_finished(); }); + found = true; } - to_drop_tables + found } /// Whether the barrier(checkpoint = true) should be injected. diff --git a/src/meta/src/model/barrier.rs b/src/meta/src/barrier/state.rs similarity index 64% rename from src/meta/src/model/barrier.rs rename to src/meta/src/barrier/state.rs index fb0c66b588730..560f17118c58f 100644 --- a/src/meta/src/model/barrier.rs +++ b/src/meta/src/barrier/state.rs @@ -12,9 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +use risingwave_pb::common::PbWorkerNode; use risingwave_pb::meta::PausedReason; -use crate::barrier::TracedEpoch; +use crate::barrier::info::InflightActorInfo; +use crate::barrier::{Command, TracedEpoch}; /// `BarrierManagerState` defines the necessary state of `GlobalBarrierManager`. pub struct BarrierManagerState { @@ -24,14 +26,22 @@ pub struct BarrierManagerState { /// committed snapshot in `HummockManager`. in_flight_prev_epoch: TracedEpoch, + /// Inflight running actors info. + pub(crate) inflight_actor_infos: InflightActorInfo, + /// Whether the cluster is paused and the reason. 
paused_reason: Option, } impl BarrierManagerState { - pub fn new(in_flight_prev_epoch: TracedEpoch, paused_reason: Option) -> Self { + pub fn new( + in_flight_prev_epoch: TracedEpoch, + inflight_actor_infos: InflightActorInfo, + paused_reason: Option, + ) -> Self { Self { in_flight_prev_epoch, + inflight_actor_infos, paused_reason, } } @@ -58,4 +68,20 @@ impl BarrierManagerState { self.in_flight_prev_epoch = next_epoch.clone(); (prev_epoch, next_epoch) } + + // TODO: optimize it as incremental updates. + pub fn resolve_worker_nodes(&mut self, nodes: Vec) { + self.inflight_actor_infos.resolve_worker_nodes(nodes); + } + + /// Returns the inflight actor infos that have included the newly added actors in the given command. The dropped actors + /// will be removed from the state after the info get resolved. + pub fn apply_command(&mut self, command: &Command) -> InflightActorInfo { + let changes = command.actor_changes(); + self.inflight_actor_infos.pre_apply(changes.clone()); + let info = self.inflight_actor_infos.clone(); + self.inflight_actor_infos.post_apply(changes); + + info + } } diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index 67a81f194b10a..0de826fef9f86 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -814,22 +814,20 @@ impl CatalogController { Ok(fragment_state_tables) } - /// Used in [`crate::barrier::GlobalBarrierManager`], load all actor that need to be sent or + /// Used in [`crate::barrier::GlobalBarrierManager`], load all running actor that need to be sent or /// collected pub async fn load_all_actors( &self, parallel_units_map: &HashMap, - check_state: impl Fn(PbActorState, ObjectId, ActorId) -> bool, ) -> MetaResult { let inner = self.inner.read().await; - let actor_info: Vec<(ActorId, ActorStatus, i32, ObjectId, i32)> = Actor::find() + let actor_info: Vec<(ActorId, i32, i32)> = Actor::find() .select_only() .column(actor::Column::ActorId) - .column(actor::Column::Status) .column(actor::Column::ParallelUnitId) - .column(fragment::Column::JobId) .column(fragment::Column::FragmentTypeMask) .join(JoinType::InnerJoin, actor::Relation::Fragment.def()) + .filter(actor::Column::Status.eq(ActorStatus::Running)) .into_tuple() .all(&inner.db) .await?; @@ -837,24 +835,21 @@ impl CatalogController { let mut actor_maps = HashMap::new(); let mut barrier_inject_actor_maps = HashMap::new(); - for (actor_id, status, parallel_unit_id, job_id, type_mask) in actor_info { - let status = PbActorState::from(status); + for (actor_id, parallel_unit_id, type_mask) in actor_info { // FIXME: since worker might have gone, it's not safe to unwrap here. 
let worker_id = parallel_units_map .get(&(parallel_unit_id as _)) .unwrap() .worker_node_id; - if check_state(status, job_id, actor_id) { - actor_maps + actor_maps + .entry(worker_id) + .or_insert_with(Vec::new) + .push(actor_id as _); + if Self::is_injectable(type_mask as _) { + barrier_inject_actor_maps .entry(worker_id) .or_insert_with(Vec::new) .push(actor_id as _); - if Self::is_injectable(type_mask as _) { - barrier_inject_actor_maps - .entry(worker_id) - .or_insert_with(Vec::new) - .push(actor_id as _); - } } } diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index 89ea2de7148a7..5f778c5900039 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -722,12 +722,9 @@ impl FragmentManager { Ok(()) } - /// Used in [`crate::barrier::GlobalBarrierManager`], load all actor that need to be sent or + /// Used in [`crate::barrier::GlobalBarrierManager`], load all running actor that need to be sent or /// collected - pub async fn load_all_actors( - &self, - check_state: impl Fn(ActorState, TableId, ActorId) -> bool, - ) -> ActorInfos { + pub async fn load_all_actors(&self) -> ActorInfos { let mut actor_maps = HashMap::new(); let mut barrier_inject_actor_maps = HashMap::new(); @@ -735,7 +732,7 @@ impl FragmentManager { for fragments in map.values() { for (worker_id, actor_states) in fragments.worker_actor_states() { for (actor_id, actor_state) in actor_states { - if check_state(actor_state, fragments.table_id(), actor_id) { + if actor_state == ActorState::Running { actor_maps .entry(worker_id) .or_insert_with(Vec::new) @@ -747,7 +744,7 @@ impl FragmentManager { let barrier_inject_actors = fragments.worker_barrier_inject_actor_states(); for (worker_id, actor_states) in barrier_inject_actors { for (actor_id, actor_state) in actor_states { - if check_state(actor_state, fragments.table_id(), actor_id) { + if actor_state == ActorState::Running { barrier_inject_actor_maps .entry(worker_id) .or_insert_with(Vec::new) @@ -1111,7 +1108,7 @@ impl FragmentManager { let new_created_actors: HashSet<_> = reschedules .values() - .flat_map(|reschedule| reschedule.added_actors.clone()) + .flat_map(|reschedule| reschedule.added_actors.values().flatten().cloned()) .collect(); let to_update_table_fragments = map @@ -1157,7 +1154,7 @@ impl FragmentManager { } = reschedule; // Add actors to this fragment: set the state to `Running`. - for actor_id in added_actors { + for actor_id in added_actors.values().flatten() { table_fragment .actor_status .get_mut(actor_id) @@ -1240,6 +1237,7 @@ impl FragmentManager { } = reschedule; let removed_actor_ids: HashSet<_> = removed_actors.iter().cloned().collect(); + let added_actor_ids = added_actors.values().flatten().cloned().collect_vec(); // Update the dispatcher of the upstream fragments. 
for (upstream_fragment_id, dispatcher_id) in upstream_fragment_dispatcher_ids { @@ -1272,7 +1270,7 @@ impl FragmentManager { update_actors( dispatcher.downstream_actor_id.as_mut(), &removed_actor_ids, - added_actors, + &added_actor_ids, ); } } @@ -1301,7 +1299,7 @@ impl FragmentManager { update_actors( downstream_actor.upstream_actor_id.as_mut(), &removed_actor_ids, - added_actors, + &added_actor_ids, ); if let Some(node) = downstream_actor.nodes.as_mut() { @@ -1309,7 +1307,7 @@ impl FragmentManager { node, fragment_id, &removed_actor_ids, - added_actors, + &added_actor_ids, ); } } diff --git a/src/meta/src/model/mod.rs b/src/meta/src/model/mod.rs index 53b78c671c660..66dff64dc186f 100644 --- a/src/meta/src/model/mod.rs +++ b/src/meta/src/model/mod.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod barrier; mod catalog; mod cluster; mod error; @@ -28,7 +27,6 @@ use std::marker::PhantomData; use std::ops::{Deref, DerefMut}; use async_trait::async_trait; -pub use barrier::*; pub use cluster::*; pub use error::*; pub use migration_plan::*; diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs index 83c93eb807036..54993d8fee805 100644 --- a/src/meta/src/model/stream.rs +++ b/src/meta/src/model/stream.rs @@ -30,7 +30,7 @@ use risingwave_pb::meta::{PbTableFragments, PbTableParallelism}; use risingwave_pb::plan_common::PbExprContext; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{ - FragmentTypeFlag, PbStreamContext, StreamActor, StreamNode, StreamSource, + FragmentTypeFlag, PbFragmentTypeFlag, PbStreamContext, StreamActor, StreamNode, StreamSource, }; use super::{ActorId, FragmentId}; @@ -347,14 +347,17 @@ impl TableFragments { /// Returns barrier inject actor ids. pub fn barrier_inject_actor_ids(&self) -> Vec { - Self::filter_actor_ids(self, |fragment_type_mask| { - (fragment_type_mask - & (FragmentTypeFlag::Source as u32 - | FragmentTypeFlag::Now as u32 - | FragmentTypeFlag::Values as u32 - | FragmentTypeFlag::BarrierRecv as u32)) - != 0 - }) + Self::filter_actor_ids(self, Self::is_injectable) + } + + /// Check if the fragment type mask is injectable. + pub fn is_injectable(fragment_type_mask: u32) -> bool { + (fragment_type_mask + & (PbFragmentTypeFlag::Source as u32 + | PbFragmentTypeFlag::Now as u32 + | PbFragmentTypeFlag::Values as u32 + | PbFragmentTypeFlag::BarrierRecv as u32)) + != 0 } /// Returns mview actor ids. 
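[editor note] The new `TableFragments::is_injectable` helper above boils down to a single bitmask intersection test over the fragment type flags. A minimal standalone sketch of that check follows; the flag constants are illustrative placeholders chosen for the example, not the generated `PbFragmentTypeFlag` values from risingwave_pb.

// Illustrative placeholder flag values; the real ones are generated from the
// `PbFragmentTypeFlag` protobuf enum, not defined here.
const SOURCE: u32 = 1 << 0;
const NOW: u32 = 1 << 1;
const VALUES: u32 = 1 << 2;
const BARRIER_RECV: u32 = 1 << 3;
const MVIEW: u32 = 1 << 4;

/// A fragment is "injectable" if its type mask intersects any of the
/// barrier-source executor kinds (Source, Now, Values, BarrierRecv).
fn is_injectable(fragment_type_mask: u32) -> bool {
    fragment_type_mask & (SOURCE | NOW | VALUES | BARRIER_RECV) != 0
}

fn main() {
    assert!(is_injectable(SOURCE | MVIEW)); // a source fragment can receive injected barriers
    assert!(!is_injectable(MVIEW)); // a plain mview fragment cannot
}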
diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index fc0a7ef55b8f1..00a8c18885dcc 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -1129,12 +1129,27 @@ impl ScaleController { HashMap::with_capacity(reschedules.len()); for (fragment_id, _) in reschedules { - let actors_to_create = fragment_actors_to_create + let mut actors_to_create: HashMap<_, Vec<_>> = HashMap::new(); + let fragment_type_mask = ctx + .fragment_map .get(&fragment_id) - .cloned() - .unwrap_or_default() - .into_keys() - .collect(); + .unwrap() + .fragment_type_mask; + let injectable = TableFragments::is_injectable(fragment_type_mask); + + if let Some(actor_pu_maps) = fragment_actors_to_create.get(&fragment_id).cloned() { + for (actor_id, parallel_unit_id) in actor_pu_maps { + let worker_id = ctx + .parallel_unit_id_to_worker_id + .get(¶llel_unit_id) + .with_context(|| format!("parallel unit {} not found", parallel_unit_id))?; + actors_to_create + .entry(*worker_id) + .or_default() + .push(actor_id); + } + } + let actors_to_remove = fragment_actors_to_remove .get(&fragment_id) .cloned() @@ -1274,6 +1289,7 @@ impl ScaleController { upstream_dispatcher_mapping, downstream_fragment_ids, actor_splits, + injectable, }, ); } diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index f10738d3f5cab..19daad681506e 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -619,8 +619,26 @@ impl GlobalStreamManager { .drop_source_fragments(&table_fragments_vec) .await; + // Drop table fragments directly. + mgr.fragment_manager + .drop_table_fragments_vec(&table_ids.into_iter().collect()) + .await?; + + // Issues a drop barrier command. + let mut worker_actors = HashMap::new(); + for table_fragments in &table_fragments_vec { + table_fragments + .worker_actor_ids() + .into_iter() + .for_each(|(worker_id, actor_ids)| { + worker_actors + .entry(worker_id) + .or_insert_with(Vec::new) + .extend(actor_ids); + }); + } self.barrier_scheduler - .run_command(Command::DropStreamingJobs(table_ids.into_iter().collect())) + .run_command(Command::DropStreamingJobs(worker_actors)) .await?; // Unregister from compaction group afterwards.
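[editor note] Taken together, the patch replaces per-barrier actor resolution from the meta store with an in-memory snapshot that is mutated around each command: `pre_apply` adds the command's new actors before the barrier is injected, the snapshot handed to the command is a clone taken at that point, and `post_apply` removes the dropped actors from the retained state. The sketch below is a simplified, self-contained model of that lifecycle under those assumptions; the types are trimmed stand-ins for `InflightActorInfo`, `CommandActorChanges`, and `BarrierManagerState::apply_command`, keeping only the worker-to-actor map.

use std::collections::{HashMap, HashSet};

type ActorId = u32;
type WorkerId = u32;

#[derive(Default, Clone)]
struct InflightSnapshot {
    /// worker id => running actors on that worker
    actor_map: HashMap<WorkerId, HashSet<ActorId>>,
}

/// Stand-in for `CommandActorChanges`.
struct Changes {
    to_add: Vec<(WorkerId, ActorId)>,
    to_remove: HashSet<ActorId>,
}

impl InflightSnapshot {
    /// Newly created actors must be covered by the barrier that creates them,
    /// so they are inserted before the command is injected.
    fn pre_apply(&mut self, changes: &Changes) {
        for &(worker, actor) in &changes.to_add {
            assert!(
                self.actor_map.entry(worker).or_default().insert(actor),
                "duplicate actor in command changes"
            );
        }
    }

    /// Dropped actors are removed right after the snapshot is taken: the barrier
    /// carrying the drop still collects from them, later barriers do not.
    fn post_apply(&mut self, changes: &Changes) {
        for actors in self.actor_map.values_mut() {
            actors.retain(|a| !changes.to_remove.contains(a));
        }
        self.actor_map.retain(|_, actors| !actors.is_empty());
    }

    /// Mirrors the shape of `BarrierManagerState::apply_command`:
    /// pre-apply, clone the snapshot, post-apply.
    fn apply_command(&mut self, changes: &Changes) -> InflightSnapshot {
        self.pre_apply(changes);
        let snapshot = self.clone();
        self.post_apply(changes);
        snapshot
    }
}

fn main() {
    let mut state = InflightSnapshot::default();

    // A "create" command adds actors 1 and 2 on worker 10.
    let create = Changes {
        to_add: vec![(10, 1), (10, 2)],
        to_remove: HashSet::new(),
    };
    let info = state.apply_command(&create);
    assert_eq!(info.actor_map[&10].len(), 2);

    // A "drop" command removes actor 1; its own barrier still sees it,
    // but the retained state no longer does.
    let drop_cmd = Changes {
        to_add: vec![],
        to_remove: [1].into_iter().collect(),
    };
    let info = state.apply_command(&drop_cmd);
    assert!(info.actor_map[&10].contains(&1));
    assert!(!state.actor_map[&10].contains(&1));
}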