From 9aded7178aefb5436009b4cebd5e8f46b59429b2 Mon Sep 17 00:00:00 2001
From: William Wen <44139337+wenym1@users.noreply.github.com>
Date: Thu, 14 Nov 2024 17:00:59 +0800
Subject: [PATCH] refactor(meta): merge drop and cancel streaming job command
 (#19369)

---
 src/meta/src/barrier/checkpoint/control.rs   | 28 ++++++++------
 src/meta/src/barrier/command.rs              | 39 ++++++++++----------
 src/meta/src/barrier/context/context_impl.rs | 19 ----------
 src/meta/src/barrier/progress.rs             |  2 +-
 src/meta/src/barrier/schedule.rs             | 25 ++++---------
 src/meta/src/stream/stream_manager.rs        | 11 +++---
 6 files changed, 52 insertions(+), 72 deletions(-)

diff --git a/src/meta/src/barrier/checkpoint/control.rs b/src/meta/src/barrier/checkpoint/control.rs
index beb77b3217ad..31150554cc68 100644
--- a/src/meta/src/barrier/checkpoint/control.rs
+++ b/src/meta/src/barrier/checkpoint/control.rs
@@ -758,20 +758,26 @@ impl DatabaseCheckpointControl {
             (None, vec![])
         };
 
-        if let Some(table_to_cancel) = command.as_ref().and_then(Command::table_to_cancel)
-            && self
+        for table_to_cancel in command
+            .as_ref()
+            .map(Command::tables_to_drop)
+            .into_iter()
+            .flatten()
+        {
+            if self
                 .creating_streaming_job_controls
                 .contains_key(&table_to_cancel)
-        {
-            warn!(
-                table_id = table_to_cancel.table_id,
-                "ignore cancel command on creating streaming job"
-            );
-            for notifier in notifiers {
-                notifier
-                    .notify_start_failed(anyhow!("cannot cancel creating streaming job, the job will continue creating until created or recovery").into());
+            {
+                warn!(
+                    table_id = table_to_cancel.table_id,
+                    "ignore cancel command on creating streaming job"
+                );
+                for notifier in notifiers {
+                    notifier
+                        .notify_start_failed(anyhow!("cannot cancel creating streaming job, the job will continue creating until created or recovery").into());
+                }
+                return Ok(());
             }
-            return Ok(());
         }
 
         if let Some(Command::RescheduleFragment { .. }) = &command {
diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs
index 73ebc8d44629..d2dd3058544c 100644
--- a/src/meta/src/barrier/command.rs
+++ b/src/meta/src/barrier/command.rs
@@ -232,6 +232,7 @@ pub enum Command {
     /// After the barrier is collected, it notifies the local stream manager of compute nodes to
     /// drop actors, and then delete the table fragments info from meta store.
     DropStreamingJobs {
+        table_fragments_ids: HashSet<TableId>,
         actors: Vec<ActorId>,
         unregistered_state_table_ids: HashSet<TableId>,
         unregistered_fragment_ids: HashSet<FragmentId>,
@@ -253,11 +254,6 @@ pub enum Command {
     MergeSnapshotBackfillStreamingJobs(
         HashMap,
     ),
-    /// `CancelStreamingJob` command generates a `Stop` barrier including the actors of the given
-    /// table fragment.
-    ///
-    /// The collecting and cleaning part works exactly the same as `DropStreamingJobs` command.
-    CancelStreamingJob(TableFragments),
 
     /// `Reschedule` command generates a `Update` barrier by the [`Reschedule`] of each fragment.
     /// Mainly used for scaling and migration.
@@ -313,6 +309,18 @@ impl Command {
         Self::Resume(reason)
     }
 
+    pub fn cancel(table_fragments: &TableFragments) -> Self {
+        Self::DropStreamingJobs {
+            table_fragments_ids: HashSet::from_iter([table_fragments.table_id()]),
+            actors: table_fragments.actor_ids(),
+            unregistered_state_table_ids: table_fragments
+                .all_table_ids()
+                .map(TableId::new)
+                .collect(),
+            unregistered_fragment_ids: table_fragments.fragment_ids().collect(),
+        }
+    }
+
     pub(crate) fn fragment_changes(&self) -> Option<HashMap<FragmentId, CommandFragmentChanges>> {
         match self {
             Command::Flush => None,
@@ -352,13 +360,6 @@ impl Command {
 
                 Some(changes)
             }
-            Command::CancelStreamingJob(table_fragments) => Some(
-                table_fragments
-                    .fragments
-                    .values()
-                    .map(|fragment| (fragment.fragment_id, CommandFragmentChanges::RemoveFragment))
-                    .collect(),
-            ),
             Command::RescheduleFragment { reschedules, .. } => Some(
                 reschedules
                     .iter()
@@ -726,11 +727,6 @@ impl Command {
                 }))
             }
 
-            Command::CancelStreamingJob(table_fragments) => {
-                let actors = table_fragments.actor_ids();
-                Some(Mutation::Stop(StopMutation { actors }))
-            }
-
             Command::ReplaceTable(ReplaceTablePlan {
                 old_table_fragments,
                 merge_updates,
@@ -1013,10 +1009,15 @@ impl Command {
     }
 
     /// For `CancelStreamingJob`, returns the table id of the target table.
-    pub fn table_to_cancel(&self) -> Option<TableId> {
+    pub fn tables_to_drop(&self) -> impl Iterator<Item = TableId> + '_ {
         match self {
-            Command::CancelStreamingJob(table_fragments) => Some(table_fragments.table_id()),
+            Command::DropStreamingJobs {
+                table_fragments_ids,
+                ..
+            } => Some(table_fragments_ids.iter().cloned()),
             _ => None,
         }
+        .into_iter()
+        .flatten()
     }
 }
diff --git a/src/meta/src/barrier/context/context_impl.rs b/src/meta/src/barrier/context/context_impl.rs
index 947c8a08ad3f..fee2a31550cc 100644
--- a/src/meta/src/barrier/context/context_impl.rs
+++ b/src/meta/src/barrier/context/context_impl.rs
@@ -15,7 +15,6 @@
 use std::sync::Arc;
 
 use futures::future::try_join_all;
-use risingwave_common::catalog::TableId;
 use risingwave_pb::common::WorkerNode;
 use risingwave_pb::hummock::HummockVersionStats;
 use risingwave_pb::meta::PausedReason;
@@ -163,24 +162,6 @@ impl CommandContext {
                     .await?;
             }
 
-            Command::CancelStreamingJob(table_fragments) => {
-                tracing::debug!(id = ?table_fragments.table_id(), "cancelling stream job");
-
-                // NOTE(kwannoel): At this point, meta has already registered the table ids.
-                // We should unregister them.
-                // This is required for background ddl, for foreground ddl this is a no-op.
-                // Foreground ddl is handled entirely by stream manager, so it will unregister
-                // the table ids on failure.
-                // On the other hand background ddl could be handled by barrier manager.
-                // It won't clean the tables on failure,
-                // since the failure could be recoverable.
-                // As such it needs to be handled here.
-                barrier_manager_context
-                    .hummock_manager
-                    .unregister_table_ids(table_fragments.all_table_ids().map(TableId::new))
-                    .await?;
-            }
-
             Command::CreateStreamingJob { info, job_type } => {
                 let CreateStreamingJobCommandInfo {
                     table_fragments,
diff --git a/src/meta/src/barrier/progress.rs b/src/meta/src/barrier/progress.rs
index 36d1a9a0b242..a40d526bc9ee 100644
--- a/src/meta/src/barrier/progress.rs
+++ b/src/meta/src/barrier/progress.rs
@@ -447,7 +447,7 @@ impl CreateMviewProgressTracker {
                 .flat_map(|resp| resp.create_mview_progress.iter()),
             version_stats,
         );
-        if let Some(table_id) = command.and_then(Command::table_to_cancel) {
+        for table_id in command.map(Command::tables_to_drop).into_iter().flatten() {
             // the cancelled command is possibly stashed in `finished_commands` and waiting
             // for checkpoint, we should also clear it.
             self.cancel_command(table_id);
diff --git a/src/meta/src/barrier/schedule.rs b/src/meta/src/barrier/schedule.rs
index ebffb56efe5a..2b3b78ede2f7 100644
--- a/src/meta/src/barrier/schedule.rs
+++ b/src/meta/src/barrier/schedule.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::{HashSet, VecDeque};
+use std::collections::VecDeque;
 use std::iter::once;
 use std::sync::Arc;
 use std::time::{Duration, Instant};
@@ -32,7 +32,6 @@ use super::notifier::Notifier;
 use super::{Command, Scheduled};
 use crate::barrier::context::GlobalBarrierWorkerContext;
 use crate::hummock::HummockManagerRef;
-use crate::model::ActorId;
 use crate::rpc::metrics::MetaMetrics;
 use crate::{MetaError, MetaResult};
 
@@ -106,9 +105,7 @@ impl ScheduledQueue {
         if let QueueStatus::Blocked(reason) = &self.status
             && !matches!(
                 scheduled.command,
-                Command::DropStreamingJobs { .. }
-                    | Command::CancelStreamingJob(_)
-                    | Command::DropSubscription { .. }
+                Command::DropStreamingJobs { .. } | Command::DropSubscription { .. }
             )
         {
             return Err(MetaError::unavailable(reason));
         }
@@ -400,9 +397,7 @@ impl ScheduledBarriers {
 impl ScheduledBarriers {
     /// Pre buffered drop and cancel command, return true if any.
     pub(super) fn pre_apply_drop_cancel(&self) -> bool {
-        let (dropped_actors, cancelled) = self.pre_apply_drop_cancel_scheduled();
-
-        !dropped_actors.is_empty() || !cancelled.is_empty()
+        self.pre_apply_drop_cancel_scheduled()
     }
 
     /// Mark command scheduler as blocked and abort all queued scheduled command and notify with
@@ -425,22 +420,18 @@ impl ScheduledBarriers {
 
     /// Try to pre apply drop and cancel scheduled command and return them if any.
     /// It should only be called in recovery.
-    pub(super) fn pre_apply_drop_cancel_scheduled(&self) -> (Vec<ActorId>, HashSet<TableId>) {
+    pub(super) fn pre_apply_drop_cancel_scheduled(&self) -> bool {
         let mut queue = self.inner.queue.lock();
         assert_matches!(queue.status, QueueStatus::Blocked(_));
-        let (mut dropped_actors, mut cancel_table_ids) = (vec![], HashSet::new());
+        let mut applied = false;
 
         while let Some(ScheduledQueueItem {
             notifiers, command, ..
         }) = queue.queue.pop_front()
        {
             match command {
-                Command::DropStreamingJobs { actors, .. } => {
-                    dropped_actors.extend(actors);
-                }
-                Command::CancelStreamingJob(table_fragments) => {
-                    let table_id = table_fragments.table_id();
-                    cancel_table_ids.insert(table_id);
+                Command::DropStreamingJobs { .. } => {
+                    applied = true;
                 }
                 Command::DropSubscription { .. } => {}
                 _ => {
@@ -451,7 +442,7 @@ impl ScheduledBarriers {
             notifiers.into_iter().for_each(|notify| {
                 notify.notify_collected();
             });
         }
-        (dropped_actors, cancel_table_ids)
+        applied
     }
 }
diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs
index 509fffefd99b..d15a73ecfa9c 100644
--- a/src/meta/src/stream/stream_manager.rs
+++ b/src/meta/src/stream/stream_manager.rs
@@ -299,10 +299,7 @@ impl GlobalStreamManager {
                 .await?;
 
             self.barrier_scheduler
-                .run_command(
-                    database_id,
-                    Command::CancelStreamingJob(table_fragments),
-                )
+                .run_command(database_id, Command::cancel(&table_fragments))
                 .await?;
         } else {
             // streaming job is already completed.
@@ -514,6 +511,10 @@ impl GlobalStreamManager {
                 .run_command(
                     database_id,
                     Command::DropStreamingJobs {
+                        table_fragments_ids: streaming_job_ids
+                            .iter()
+                            .map(|job_id| TableId::new(*job_id as _))
+                            .collect(),
                         actors: removed_actors,
                         unregistered_state_table_ids: state_table_ids
                             .into_iter()
@@ -576,7 +577,7 @@ impl GlobalStreamManager {
 
             if let Some(database_id) = database_id {
                 self.barrier_scheduler
-                    .run_command(DatabaseId::new(database_id as _), Command::CancelStreamingJob(fragment))
+                    .run_command(DatabaseId::new(database_id as _), Command::cancel(&fragment))
                     .await?;
             }
         };
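
Illustrative sketch (not part of the patch): the refactor folds `CancelStreamingJob` into `DropStreamingJobs`, so cancellation is built through a `Command::cancel` constructor and call sites iterate `tables_to_drop()` instead of matching a dedicated variant. The self-contained example below mirrors that pattern with simplified stand-in types (`TableId` as `u32`, a minimal `TableFragments`); it is not the actual meta-service code:

// Sketch of the merged-command pattern; TableId/TableFragments here are
// simplified stand-ins, not the real risingwave_meta types.
use std::collections::HashSet;

type TableId = u32;

struct TableFragments {
    table_id: TableId,
    actor_ids: Vec<u32>,
}

enum Command {
    DropStreamingJobs {
        table_fragments_ids: HashSet<TableId>,
        actors: Vec<u32>,
    },
    Flush,
}

impl Command {
    // Cancelling a job is now just a drop command built from its fragments,
    // mirroring `Command::cancel` in the patch.
    fn cancel(table_fragments: &TableFragments) -> Self {
        Self::DropStreamingJobs {
            table_fragments_ids: HashSet::from_iter([table_fragments.table_id]),
            actors: table_fragments.actor_ids.clone(),
        }
    }

    // Replaces the old `table_to_cancel() -> Option<TableId>`: callers loop
    // over zero or more dropped tables instead of matching a single variant.
    fn tables_to_drop(&self) -> impl Iterator<Item = TableId> + '_ {
        match self {
            Command::DropStreamingJobs {
                table_fragments_ids,
                ..
            } => Some(table_fragments_ids.iter().cloned()),
            _ => None,
        }
        .into_iter()
        .flatten()
    }
}

fn main() {
    let fragments = TableFragments {
        table_id: 42,
        actor_ids: vec![1, 2, 3],
    };
    let cancel = Command::cancel(&fragments);
    if let Command::DropStreamingJobs { actors, .. } = &cancel {
        println!("stopping actors {actors:?}");
    }
    // `if let Some(table_id) = command.table_to_cancel()` becomes a plain
    // `for` loop, as in progress.rs and checkpoint/control.rs above.
    for table_id in cancel.tables_to_drop() {
        println!("cancelled table {table_id}");
    }
    // Commands that drop nothing simply yield an empty iterator.
    assert_eq!(Command::Flush.tables_to_drop().count(), 0);
}

Returning `impl Iterator` rather than `Option<TableId>` lets a single command carry any number of dropped jobs, which is what allows the cancel path and the drop path to share one variant.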