diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 07765fe840c38..71fc9b98b355b 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -709,15 +709,6 @@ impl CommandContext { } } - /// For `CancelStreamingJob`, returns the actors of the `StreamScan` nodes. For other commands, - /// returns an empty set. - pub fn actors_to_cancel(&self) -> HashSet { - match &self.command { - Command::CancelStreamingJob(table_fragments) => table_fragments.backfill_actor_ids(), - _ => Default::default(), - } - } - /// For `CancelStreamingJob`, returns the table id of the target table. pub fn table_to_cancel(&self) -> Option { match &self.command { diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index b393db2d7dd41..eb905c5ad1e8f 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -54,7 +54,7 @@ use self::notifier::Notifier; use self::progress::TrackingCommand; use crate::barrier::info::InflightActorInfo; use crate::barrier::notifier::BarrierInfo; -use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob}; +use crate::barrier::progress::CreateMviewProgressTracker; use crate::barrier::rpc::BarrierCollectFuture; use crate::barrier::state::BarrierManagerState; use crate::hummock::{CommitEpochInfo, HummockManagerRef}; @@ -214,55 +214,7 @@ impl CheckpointControl { context, } } -} - -impl CreateMviewProgressTracker { - /// Stash a command to finish later. - fn stash_command_to_finish(&mut self, finished_job: TrackingJob) { - self.finished_jobs.push(finished_job); - } - - /// Finish stashed jobs. - /// If checkpoint, means all jobs can be finished. - /// If not checkpoint, jobs which do not require checkpoint can be finished. - /// - /// Returns whether there are still remaining stashed jobs to finish. - async fn finish_jobs(&mut self, checkpoint: bool) -> MetaResult { - for job in self - .finished_jobs - .extract_if(|job| checkpoint || !job.is_checkpoint_required()) - { - // The command is ready to finish. We can now call `pre_finish`. - job.pre_finish().await?; - job.notify_finished(); - } - Ok(!self.finished_jobs.is_empty()) - } -} - -impl CheckpointControl { - fn cancel_command(&mut self, cancelled_job: TrackingJob) { - if let TrackingJob::New(cancelled_command) = cancelled_job { - if let Some(index) = self.inflight_command_ctx_queue.iter().position(|command| { - command.command_ctx.prev_epoch.value() - == cancelled_command.context.prev_epoch.value() - }) { - self.inflight_command_ctx_queue.remove(index); - } - } else { - // Recovered jobs do not need to be cancelled since only `RUNNING` actors will get recovered. - } - } -} -impl CreateMviewProgressTracker { - fn cancel_stashed_command(&mut self, id: TableId) { - self.finished_jobs - .retain(|x| x.table_to_create() != Some(id)); - } -} - -impl CheckpointControl { /// Update the metrics of barrier nums. fn update_barrier_nums_metrics(&self) { self.context @@ -357,15 +309,10 @@ struct InflightCommand { notifiers: Vec, } -struct CompleteBarrierOutput { - cancelled_job: Option, - has_remaining_job: bool, -} - struct CompletingCommand { command_ctx: Arc, - join_handle: JoinHandle>, + join_handle: JoinHandle>, } /// The result of barrier collect. @@ -735,7 +682,7 @@ impl GlobalBarrierManagerContext { resps: Vec, mut notifiers: Vec, enqueue_time: HistogramTimer, - ) -> MetaResult { + ) -> MetaResult { let wait_commit_timer = self.metrics.barrier_wait_commit_latency.start_timer(); let (commit_info, create_mview_progress) = collect_commit_epoch_info(resps); if let Err(e) = self.update_snapshot(&command_ctx, commit_info).await { @@ -814,22 +761,13 @@ impl GlobalBarrierManagerContext { notifiers: Vec, command_ctx: Arc, create_mview_progress: Vec, - ) -> MetaResult { + ) -> MetaResult { { { // Notify about collected. let version_stats = self.hummock_manager.get_version_stats().await; - let mut tracker = self.tracker.lock().await; - // Save `cancelled_command` for Create MVs. - let actors_to_cancel = command_ctx.actors_to_cancel(); - let cancelled_job = if !actors_to_cancel.is_empty() { - tracker.find_cancelled_command(actors_to_cancel) - } else { - None - }; - // Save `finished_commands` for Create MVs. let finished_commands = { let mut commands = vec![]; @@ -864,17 +802,14 @@ impl GlobalBarrierManagerContext { if let Some(table_id) = command_ctx.table_to_cancel() { // the cancelled command is possibly stashed in `finished_commands` and waiting // for checkpoint, we should also clear it. - tracker.cancel_stashed_command(table_id); + tracker.cancel_command(table_id); } let has_remaining_job = tracker .finish_jobs(command_ctx.kind.is_checkpoint()) .await?; - Ok(CompleteBarrierOutput { - cancelled_job, - has_remaining_job, - }) + Ok(has_remaining_job) } } } @@ -952,12 +887,9 @@ impl CheckpointControl { }) .and_then(|result| result); let completed_command = self.completing_command.take().expect("non-empty"); - let result = join_result.map(|output| { + let result = join_result.map(|has_remaining| { let command_ctx = completed_command.command_ctx.clone(); - if let Some(job) = output.cancelled_job { - self.cancel_command(job); - } - (command_ctx, output.has_remaining_job) + (command_ctx, has_remaining) }); Poll::Ready(result) diff --git a/src/meta/src/barrier/progress.rs b/src/meta/src/barrier/progress.rs index e3694283fdb0d..5c1e701e6fc81 100644 --- a/src/meta/src/barrier/progress.rs +++ b/src/meta/src/barrier/progress.rs @@ -16,7 +16,6 @@ use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; use std::sync::Arc; -use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_common::util::epoch::Epoch; use risingwave_pb::ddl_service::DdlProgress; @@ -44,7 +43,7 @@ enum BackfillState { /// Progress of all actors containing backfill executors while creating mview. #[derive(Debug)] -struct Progress { +pub(super) struct Progress { states: HashMap, done_count: usize, @@ -256,7 +255,7 @@ pub(super) struct CreateMviewProgressTracker { actor_map: HashMap, /// Get notified when we finished Create MV and collect a barrier(checkpoint = true) - pub(super) finished_jobs: Vec, + finished_jobs: Vec, } impl CreateMviewProgressTracker { @@ -343,25 +342,33 @@ impl CreateMviewProgressTracker { .collect() } - /// Try to find the target create-streaming-job command from track. + /// Stash a command to finish later. + pub(super) fn stash_command_to_finish(&mut self, finished_job: TrackingJob) { + self.finished_jobs.push(finished_job); + } + + /// Finish stashed jobs. + /// If checkpoint, means all jobs can be finished. + /// If not checkpoint, jobs which do not require checkpoint can be finished. /// - /// Return the target command as it should be cancelled based on the input actors. - pub fn find_cancelled_command( - &mut self, - actors_to_cancel: HashSet, - ) -> Option { - let epochs = actors_to_cancel - .into_iter() - .map(|actor_id| self.actor_map.get(&actor_id)) - .collect_vec(); - assert!(epochs.iter().all_equal()); - // If the target command found in progress map, return and remove it. Note that the command - // should have finished if not found. - if let Some(Some(epoch)) = epochs.first() { - Some(self.progress_map.remove(epoch).unwrap().1) - } else { - None + /// Returns whether there are still remaining stashed jobs to finish. + pub(super) async fn finish_jobs(&mut self, checkpoint: bool) -> MetaResult { + for job in self + .finished_jobs + .extract_if(|job| checkpoint || !job.is_checkpoint_required()) + { + // The command is ready to finish. We can now call `pre_finish`. + job.pre_finish().await?; + job.notify_finished(); } + Ok(!self.finished_jobs.is_empty()) + } + + pub(super) fn cancel_command(&mut self, id: TableId) { + let _ = self.progress_map.remove(&id); + self.finished_jobs + .retain(|x| x.table_to_create() != Some(id)); + self.actor_map.retain(|_, table_id| *table_id != id); } /// Add a new create-mview DDL command to track. @@ -501,7 +508,7 @@ impl CreateMviewProgressTracker { table_id ); - // Clean-up the mapping from actors to DDL epoch. + // Clean-up the mapping from actors to DDL table_id. for actor in o.get().0.actors() { self.actor_map.remove(&actor); }