From 6b85b4f666a76e2285c13d6b2893bc462647c3ec Mon Sep 17 00:00:00 2001 From: William Wen Date: Sun, 18 Feb 2024 17:23:26 +0800 Subject: [PATCH 1/3] refactor(meta): track finished create mv job in tracker --- src/meta/src/barrier/command.rs | 9 ---- src/meta/src/barrier/mod.rs | 80 ++++++++++++-------------------- src/meta/src/barrier/progress.rs | 35 ++++---------- 3 files changed, 38 insertions(+), 86 deletions(-) diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 6b1b73d6ca697..9e22f5debd0ed 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -709,15 +709,6 @@ impl CommandContext { } } - /// For `CancelStreamingJob`, returns the actors of the `StreamScan` nodes. For other commands, - /// returns an empty set. - pub fn actors_to_cancel(&self) -> HashSet { - match &self.command { - Command::CancelStreamingJob(table_fragments) => table_fragments.backfill_actor_ids(), - _ => Default::default(), - } - } - /// For `CancelStreamingJob`, returns the table id of the target table. pub fn table_to_cancel(&self) -> Option { match &self.command { diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 3bad3cbd15a2d..7e24f4564df4d 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -151,7 +151,7 @@ pub struct GlobalBarrierManagerContext { sink_manager: SinkCoordinatorManager, - metrics: Arc, + pub(super) metrics: Arc, stream_rpc_manager: StreamRpcManager, @@ -195,21 +195,19 @@ struct CheckpointControl { /// Save the state and message of barrier in order. command_ctx_queue: VecDeque, - metrics: Arc, - - /// Get notified when we finished Create MV and collect a barrier(checkpoint = true) - finished_jobs: Vec, + context: GlobalBarrierManagerContext, } impl CheckpointControl { - fn new(metrics: Arc) -> Self { + fn new(context: GlobalBarrierManagerContext) -> Self { Self { command_ctx_queue: Default::default(), - metrics, - finished_jobs: Default::default(), + context, } } +} +impl CreateMviewProgressTracker { /// Stash a command to finish later. fn stash_command_to_finish(&mut self, finished_job: TrackingJob) { self.finished_jobs.push(finished_job); @@ -232,39 +230,32 @@ impl CheckpointControl { Ok(!self.finished_jobs.is_empty()) } - fn cancel_command(&mut self, cancelled_job: TrackingJob) { - if let TrackingJob::New(cancelled_command) = cancelled_job { - if let Some(index) = self.command_ctx_queue.iter().position(|x| { - x.command_ctx.prev_epoch.value() == cancelled_command.context.prev_epoch.value() - }) { - self.command_ctx_queue.remove(index); - } - } else { - // Recovered jobs do not need to be cancelled since only `RUNNING` actors will get recovered. - } - } - - fn cancel_stashed_command(&mut self, id: TableId) { + fn cancel_command(&mut self, id: TableId) { + let _ = self.progress_map.remove(&id); self.finished_jobs .retain(|x| x.table_to_create() != Some(id)); + self.actor_map.retain(|_, table_id| *table_id != id); } +} +impl CheckpointControl { /// Update the metrics of barrier nums. fn update_barrier_nums_metrics(&self) { - self.metrics.in_flight_barrier_nums.set( + self.context.metrics.in_flight_barrier_nums.set( self.command_ctx_queue .iter() .filter(|x| matches!(x.state, InFlight)) .count() as i64, ); - self.metrics + self.context + .metrics .all_barrier_nums .set(self.command_ctx_queue.len() as i64); } /// Enqueue a barrier command, and init its state to `InFlight`. fn enqueue_command(&mut self, command_ctx: Arc, notifiers: Vec) { - let timer = self.metrics.barrier_latency.start_timer(); + let timer = self.context.metrics.barrier_latency.start_timer(); self.command_ctx_queue.push_back(EpochNode { timer: Some(timer), @@ -284,7 +275,11 @@ impl CheckpointControl { result: Vec, ) -> Vec { // change state to complete, and wait for nodes with the smaller epoch to commit - let wait_commit_timer = self.metrics.barrier_wait_commit_latency.start_timer(); + let wait_commit_timer = self + .context + .metrics + .barrier_wait_commit_latency + .start_timer(); if let Some(node) = self .command_ctx_queue .iter_mut() @@ -340,11 +335,6 @@ impl CheckpointControl { .iter() .any(|x| x.command_ctx.prev_epoch.value().0 == epoch) } - - /// We need to make sure there are no changes when doing recovery - pub fn clear_changes(&mut self) { - self.finished_jobs.clear(); - } } /// The state and message of this barrier, a node for concurrent checkpoint. @@ -400,7 +390,6 @@ impl GlobalBarrierManager { InflightActorInfo::default(), None, ); - let checkpoint_control = CheckpointControl::new(metrics.clone()); let active_streaming_nodes = ActiveStreamingWorkerNodes::uninitialized(); @@ -419,6 +408,8 @@ impl GlobalBarrierManager { env: env.clone(), }; + let checkpoint_control = CheckpointControl::new(context.clone()); + let rpc_manager = BarrierRpcManager::new(context.clone()); Self { @@ -756,7 +747,6 @@ impl GlobalBarrierManager { err: MetaError, fail_nodes: impl IntoIterator, ) { - self.checkpoint_control.clear_changes(); self.rpc_manager.clear(); for node in fail_nodes { @@ -852,20 +842,13 @@ impl GlobalBarrierManager { notifier.notify_collected(); }); - // Save `cancelled_command` for Create MVs. - let actors_to_cancel = node.command_ctx.actors_to_cancel(); - let cancelled_command = if !actors_to_cancel.is_empty() { - let mut tracker = self.context.tracker.lock().await; - tracker.find_cancelled_command(actors_to_cancel) - } else { - None - }; + // Notify about collected. + let version_stats = self.context.hummock_manager.get_version_stats().await; + let mut tracker = self.context.tracker.lock().await; // Save `finished_commands` for Create MVs. let finished_commands = { let mut commands = vec![]; - let version_stats = self.context.hummock_manager.get_version_stats().await; - let mut tracker = self.context.tracker.lock().await; // Add the command to tracker. if let Some(command) = tracker.add( TrackingCommand { @@ -891,21 +874,16 @@ impl GlobalBarrierManager { }; for command in finished_commands { - self.checkpoint_control.stash_command_to_finish(command); + tracker.stash_command_to_finish(command); } - if let Some(command) = cancelled_command { - self.checkpoint_control.cancel_command(command); - } else if let Some(table_id) = node.command_ctx.table_to_cancel() { + if let Some(table_id) = node.command_ctx.table_to_cancel() { // the cancelled command is possibly stashed in `finished_commands` and waiting // for checkpoint, we should also clear it. - self.checkpoint_control.cancel_stashed_command(table_id); + tracker.cancel_command(table_id); } - let remaining = self - .checkpoint_control - .finish_jobs(kind.is_checkpoint()) - .await?; + let remaining = tracker.finish_jobs(kind.is_checkpoint()).await?; // If there are remaining commands (that requires checkpoint to finish), we force // the next barrier to be a checkpoint. if remaining { diff --git a/src/meta/src/barrier/progress.rs b/src/meta/src/barrier/progress.rs index f22c5a2bbb216..53ba3f511b641 100644 --- a/src/meta/src/barrier/progress.rs +++ b/src/meta/src/barrier/progress.rs @@ -16,7 +16,6 @@ use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; use std::sync::Arc; -use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_common::util::epoch::Epoch; use risingwave_pb::ddl_service::DdlProgress; @@ -44,7 +43,7 @@ enum BackfillState { /// Progress of all actors containing backfill executors while creating mview. #[derive(Debug)] -struct Progress { +pub(super) struct Progress { states: HashMap, done_count: usize, @@ -250,10 +249,13 @@ pub(super) struct TrackingCommand { /// 4. With `actor_map` we can use an actor's `ActorId` to find the ID of the `StreamJob`. pub(super) struct CreateMviewProgressTracker { /// Progress of the create-mview DDL indicated by the TableId. - progress_map: HashMap, + pub(super) progress_map: HashMap, /// Find the epoch of the create-mview DDL by the actor containing the backfill executors. - actor_map: HashMap, + pub(super) actor_map: HashMap, + + /// Get notified when we finished Create MV and collect a barrier(checkpoint = true) + pub(super) finished_jobs: Vec, } impl CreateMviewProgressTracker { @@ -313,6 +315,7 @@ impl CreateMviewProgressTracker { Self { progress_map, actor_map, + finished_jobs: Vec::new(), } } @@ -320,6 +323,7 @@ impl CreateMviewProgressTracker { Self { progress_map: Default::default(), actor_map: Default::default(), + finished_jobs: Vec::new(), } } @@ -338,27 +342,6 @@ impl CreateMviewProgressTracker { .collect() } - /// Try to find the target create-streaming-job command from track. - /// - /// Return the target command as it should be cancelled based on the input actors. - pub fn find_cancelled_command( - &mut self, - actors_to_cancel: HashSet, - ) -> Option { - let epochs = actors_to_cancel - .into_iter() - .map(|actor_id| self.actor_map.get(&actor_id)) - .collect_vec(); - assert!(epochs.iter().all_equal()); - // If the target command found in progress map, return and remove it. Note that the command - // should have finished if not found. - if let Some(Some(epoch)) = epochs.first() { - Some(self.progress_map.remove(epoch).unwrap().1) - } else { - None - } - } - /// Add a new create-mview DDL command to track. /// /// If the actors to track is empty, return the given command as it can be finished immediately. @@ -496,7 +479,7 @@ impl CreateMviewProgressTracker { table_id ); - // Clean-up the mapping from actors to DDL epoch. + // Clean-up the mapping from actors to DDL table_id. for actor in o.get().0.actors() { self.actor_map.remove(&actor); } From 3e885e9db9e79ba0d99bdfb5017c7df46769d3e9 Mon Sep 17 00:00:00 2001 From: William Wen Date: Mon, 19 Feb 2024 14:22:17 +0800 Subject: [PATCH 2/3] move code --- src/meta/src/barrier/mod.rs | 33 +----------------------------- src/meta/src/barrier/progress.rs | 35 +++++++++++++++++++++++++++++--- 2 files changed, 33 insertions(+), 35 deletions(-) diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 101085f40cb67..9b3fa3a4d65b7 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -50,7 +50,7 @@ use self::notifier::Notifier; use self::progress::TrackingCommand; use crate::barrier::info::InflightActorInfo; use crate::barrier::notifier::BarrierInfo; -use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob}; +use crate::barrier::progress::CreateMviewProgressTracker; use crate::barrier::rpc::BarrierRpcManager; use crate::barrier::state::BarrierManagerState; use crate::barrier::BarrierEpochState::{Completed, InFlight}; @@ -208,37 +208,6 @@ impl CheckpointControl { } } -impl CreateMviewProgressTracker { - /// Stash a command to finish later. - fn stash_command_to_finish(&mut self, finished_job: TrackingJob) { - self.finished_jobs.push(finished_job); - } - - /// Finish stashed jobs. - /// If checkpoint, means all jobs can be finished. - /// If not checkpoint, jobs which do not require checkpoint can be finished. - /// - /// Returns whether there are still remaining stashed jobs to finish. - async fn finish_jobs(&mut self, checkpoint: bool) -> MetaResult { - for job in self - .finished_jobs - .extract_if(|job| checkpoint || !job.is_checkpoint_required()) - { - // The command is ready to finish. We can now call `pre_finish`. - job.pre_finish().await?; - job.notify_finished(); - } - Ok(!self.finished_jobs.is_empty()) - } - - fn cancel_command(&mut self, id: TableId) { - let _ = self.progress_map.remove(&id); - self.finished_jobs - .retain(|x| x.table_to_create() != Some(id)); - self.actor_map.retain(|_, table_id| *table_id != id); - } -} - impl CheckpointControl { /// Update the metrics of barrier nums. fn update_barrier_nums_metrics(&self) { diff --git a/src/meta/src/barrier/progress.rs b/src/meta/src/barrier/progress.rs index 53ba3f511b641..5c1e701e6fc81 100644 --- a/src/meta/src/barrier/progress.rs +++ b/src/meta/src/barrier/progress.rs @@ -249,13 +249,13 @@ pub(super) struct TrackingCommand { /// 4. With `actor_map` we can use an actor's `ActorId` to find the ID of the `StreamJob`. pub(super) struct CreateMviewProgressTracker { /// Progress of the create-mview DDL indicated by the TableId. - pub(super) progress_map: HashMap, + progress_map: HashMap, /// Find the epoch of the create-mview DDL by the actor containing the backfill executors. - pub(super) actor_map: HashMap, + actor_map: HashMap, /// Get notified when we finished Create MV and collect a barrier(checkpoint = true) - pub(super) finished_jobs: Vec, + finished_jobs: Vec, } impl CreateMviewProgressTracker { @@ -342,6 +342,35 @@ impl CreateMviewProgressTracker { .collect() } + /// Stash a command to finish later. + pub(super) fn stash_command_to_finish(&mut self, finished_job: TrackingJob) { + self.finished_jobs.push(finished_job); + } + + /// Finish stashed jobs. + /// If checkpoint, means all jobs can be finished. + /// If not checkpoint, jobs which do not require checkpoint can be finished. + /// + /// Returns whether there are still remaining stashed jobs to finish. + pub(super) async fn finish_jobs(&mut self, checkpoint: bool) -> MetaResult { + for job in self + .finished_jobs + .extract_if(|job| checkpoint || !job.is_checkpoint_required()) + { + // The command is ready to finish. We can now call `pre_finish`. + job.pre_finish().await?; + job.notify_finished(); + } + Ok(!self.finished_jobs.is_empty()) + } + + pub(super) fn cancel_command(&mut self, id: TableId) { + let _ = self.progress_map.remove(&id); + self.finished_jobs + .retain(|x| x.table_to_create() != Some(id)); + self.actor_map.retain(|_, table_id| *table_id != id); + } + /// Add a new create-mview DDL command to track. /// /// If the actors to track is empty, return the given command as it can be finished immediately. From b02a7b447fa137a1383c2da278bb54f7b6eadd01 Mon Sep 17 00:00:00 2001 From: William Wen Date: Mon, 19 Feb 2024 14:24:08 +0800 Subject: [PATCH 3/3] minor --- src/meta/src/barrier/mod.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 9b3fa3a4d65b7..7e0726fa99fc5 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -206,9 +206,7 @@ impl CheckpointControl { context, } } -} -impl CheckpointControl { /// Update the metrics of barrier nums. fn update_barrier_nums_metrics(&self) { self.context.metrics.in_flight_barrier_nums.set(