diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs
index 486e314ae0467..4cfc21fd8f1c4 100644
--- a/src/meta/src/barrier/command.rs
+++ b/src/meta/src/barrier/command.rs
@@ -41,7 +41,9 @@ use risingwave_pb::stream_service::WaitEpochCommitRequest;
 use thiserror_ext::AsReport;
 use tracing::warn;
 
-use super::info::{CommandActorChanges, CommandFragmentChanges, InflightActorInfo};
+use super::info::{
+    CommandActorChanges, CommandFragmentChanges, CommandNewFragmentInfo, InflightActorInfo,
+};
 use super::trace::TracedEpoch;
 use crate::barrier::GlobalBarrierManagerContext;
 use crate::manager::{DdlType, MetadataManager, StreamingJob, WorkerId};
@@ -107,7 +109,7 @@ impl ReplaceTablePlan {
     fn actor_changes(&self) -> CommandActorChanges {
         let mut fragment_changes = HashMap::new();
         for fragment in self.new_table_fragments.fragments.values() {
-            let fragment_change = CommandFragmentChanges::NewFragment {
+            let fragment_change = CommandFragmentChanges::NewFragment(CommandNewFragmentInfo {
                 new_actors: fragment
                     .actors
                     .iter()
@@ -130,7 +132,7 @@ impl ReplaceTablePlan {
                     .map(|table_id| TableId::new(*table_id))
                     .collect(),
                 is_injectable: TableFragments::is_injectable(fragment.fragment_type_mask),
-            };
+            });
             assert!(fragment_changes
                 .insert(fragment.fragment_id, fragment_change)
                 .is_none());
@@ -144,6 +146,54 @@ impl ReplaceTablePlan {
     }
 }
 
+#[derive(Debug, Clone)]
+pub struct CreateStreamingJobCommandInfo {
+    pub table_fragments: TableFragments,
+    /// Refer to the doc on [`MetadataManager::get_upstream_root_fragments`] for the meaning of "root".
+    pub upstream_root_actors: HashMap<TableId, Vec<ActorId>>,
+    pub dispatchers: HashMap<ActorId, Vec<Dispatcher>>,
+    pub init_split_assignment: SplitAssignment,
+    pub definition: String,
+    pub ddl_type: DdlType,
+    pub create_type: CreateType,
+    pub streaming_job: StreamingJob,
+    pub internal_tables: Vec<Table>,
+}
+
+impl CreateStreamingJobCommandInfo {
+    fn new_fragment_info(&self) -> impl Iterator<Item = (FragmentId, CommandNewFragmentInfo)> + '_ {
+        self.table_fragments.fragments.values().map(|fragment| {
+            (
+                fragment.fragment_id,
+                CommandNewFragmentInfo {
+                    new_actors: fragment
+                        .actors
+                        .iter()
+                        .map(|actor| {
+                            (
+                                actor.actor_id,
+                                self.table_fragments
+                                    .actor_status
+                                    .get(&actor.actor_id)
+                                    .expect("should exist")
+                                    .get_parallel_unit()
+                                    .expect("should set")
+                                    .worker_node_id,
+                            )
+                        })
+                        .collect(),
+                    table_ids: fragment
+                        .state_table_ids
+                        .iter()
+                        .map(|table_id| TableId::new(*table_id))
+                        .collect(),
+                    is_injectable: TableFragments::is_injectable(fragment.fragment_type_mask),
+                },
+            )
+        })
+    }
+}
+
 /// [`Command`] is the input of [`crate::barrier::GlobalBarrierManager`]. For different commands,
 /// it will build different barriers to send, and may do different stuffs after the barrier is
 /// collected.
@@ -187,16 +237,7 @@ pub enum Command {
     /// for a while** until the `finish` channel is signaled, then the state of `TableFragments`
     /// will be set to `Created`.
     CreateStreamingJob {
-        streaming_job: StreamingJob,
-        internal_tables: Vec<Table>,
-        table_fragments: TableFragments,
-        /// Refer to the doc on [`MetadataManager::get_upstream_root_fragments`] for the meaning of "root".
-        upstream_root_actors: HashMap<TableId, Vec<ActorId>>,
-        dispatchers: HashMap<ActorId, Vec<Dispatcher>>,
-        init_split_assignment: SplitAssignment,
-        definition: String,
-        ddl_type: DdlType,
-        create_type: CreateType,
+        info: CreateStreamingJobCommandInfo,
         /// This is for create SINK into table.
         replace_table: Option<ReplaceTablePlan>,
     },
@@ -279,43 +320,13 @@ impl Command {
                 .collect(),
             }),
 
             Command::CreateStreamingJob {
-                table_fragments,
+                info,
                 replace_table,
-                ..
             } => {
-                let fragment_changes = table_fragments
-                    .fragments
-                    .values()
-                    .map(|fragment| {
-                        (
-                            fragment.fragment_id,
-                            CommandFragmentChanges::NewFragment {
-                                new_actors: fragment
-                                    .actors
-                                    .iter()
-                                    .map(|actor| {
-                                        (
-                                            actor.actor_id,
-                                            table_fragments
-                                                .actor_status
-                                                .get(&actor.actor_id)
-                                                .expect("should exist")
-                                                .get_parallel_unit()
-                                                .expect("should set")
-                                                .worker_node_id,
-                                        )
-                                    })
-                                    .collect(),
-                                table_ids: fragment
-                                    .state_table_ids
-                                    .iter()
-                                    .map(|table_id| TableId::new(*table_id))
-                                    .collect(),
-                                is_injectable: TableFragments::is_injectable(
-                                    fragment.fragment_type_mask,
-                                ),
-                            },
-                        )
+                let fragment_changes = info
+                    .new_fragment_info()
+                    .map(|(fragment_id, info)| {
+                        (fragment_id, CommandFragmentChanges::NewFragment(info))
                     })
                     .collect();
                 let mut changes = CommandActorChanges { fragment_changes };
@@ -460,10 +471,6 @@ impl CommandContext {
             _span: span,
         }
     }
-
-    pub fn metadata_manager(&self) -> &MetadataManager {
-        &self.barrier_manager_context.metadata_manager
-    }
 }
 
 impl CommandContext {
@@ -521,11 +528,14 @@ impl CommandContext {
                 })),
 
             Command::CreateStreamingJob {
-                table_fragments,
-                dispatchers,
-                init_split_assignment: split_assignment,
+                info:
+                    CreateStreamingJobCommandInfo {
+                        table_fragments,
+                        dispatchers,
+                        init_split_assignment: split_assignment,
+                        ..
+                    },
                 replace_table,
-                ..
             } => {
                 let actor_dispatchers = dispatchers
                     .iter()
@@ -818,20 +828,6 @@ impl CommandContext {
         }
     }
 
-    /// For `CreateStreamingJob`, returns the actors of the `StreamScan`, and `StreamValue` nodes. For other commands,
-    /// returns an empty set.
-    pub fn actors_to_track(&self) -> HashSet<ActorId> {
-        match &self.command {
-            Command::CreateStreamingJob {
-                table_fragments, ..
-            } => table_fragments
-                .tracking_progress_actor_ids()
-                .into_iter()
-                .collect(),
-            _ => Default::default(),
-        }
-    }
-
     /// For `CancelStreamingJob`, returns the table id of the target table.
     pub fn table_to_cancel(&self) -> Option<TableId> {
         match &self.command {
@@ -840,16 +836,6 @@ impl CommandContext {
         }
     }
 
-    /// For `CreateStreamingJob`, returns the table id of the target table.
-    pub fn table_to_create(&self) -> Option<TableId> {
-        match &self.command {
-            Command::CreateStreamingJob {
-                table_fragments, ..
-            } => Some(table_fragments.table_id()),
-            _ => None,
-        }
-    }
-
     /// Clean up actors in CNs if needed, used by drop, cancel and reschedule commands.
     async fn clean_up(&self, actors: Vec<ActorId>) -> MetaResult<()> {
         self.barrier_manager_context
@@ -992,14 +978,16 @@ impl CommandContext {
             }
 
             Command::CreateStreamingJob {
-                table_fragments,
-                dispatchers,
-                upstream_root_actors,
-                init_split_assignment,
-                definition: _,
+                info,
                 replace_table,
-                ..
             } => {
+                let CreateStreamingJobCommandInfo {
+                    table_fragments,
+                    dispatchers,
+                    upstream_root_actors,
+                    init_split_assignment,
+                    ..
+                } = info;
                 match &self.barrier_manager_context.metadata_manager {
                     MetadataManager::V1(mgr) => {
                         let mut dependent_table_actors =
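The hunks above move the job's metadata out of the `CreateStreamingJob` variant into the new `CreateStreamingJobCommandInfo` struct, so call sites bind `info` and destructure only what they need. A minimal sketch of the resulting match shape (illustrative only; `command` stands for any `Command` value, and the bindings are intentionally unused):

```rust
match &command {
    Command::CreateStreamingJob { info, replace_table } => {
        // Pull out the fields this call site cares about; the rest stays behind `..`.
        let CreateStreamingJobCommandInfo {
            table_fragments,
            dispatchers,
            ..
        } = info;
        let _ = (table_fragments, dispatchers, replace_table);
    }
    _ => {}
}
```
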
diff --git a/src/meta/src/barrier/info.rs b/src/meta/src/barrier/info.rs
index f6617b9ceef47..a6bdcdace6e54 100644
--- a/src/meta/src/barrier/info.rs
+++ b/src/meta/src/barrier/info.rs
@@ -22,13 +22,16 @@ use crate::barrier::Command;
 use crate::manager::{ActiveStreamingWorkerNodes, ActorInfos, InflightFragmentInfo, WorkerId};
 use crate::model::{ActorId, FragmentId};
 
+#[derive(Debug, Clone)]
+pub(crate) struct CommandNewFragmentInfo {
+    pub new_actors: HashMap<ActorId, WorkerId>,
+    pub table_ids: HashSet<TableId>,
+    pub is_injectable: bool,
+}
+
 #[derive(Debug, Clone)]
 pub(crate) enum CommandFragmentChanges {
-    NewFragment {
-        new_actors: HashMap<ActorId, WorkerId>,
-        table_ids: HashSet<TableId>,
-        is_injectable: bool,
-    },
+    NewFragment(CommandNewFragmentInfo),
     Reschedule {
         new_actors: HashMap<ActorId, WorkerId>,
         to_remove: HashSet<ActorId>,
@@ -149,11 +152,12 @@ impl InflightActorInfo {
         let mut to_add = HashMap::new();
         for (fragment_id, change) in fragment_changes {
             match change {
-                CommandFragmentChanges::NewFragment {
+                CommandFragmentChanges::NewFragment(CommandNewFragmentInfo {
                     new_actors,
                     table_ids,
                     is_injectable,
-                } => {
+                    ..
+                }) => {
                     for (actor_id, node_id) in &new_actors {
                         assert!(to_add
                             .insert(*actor_id, (*node_id, is_injectable))
@@ -232,7 +236,7 @@ impl InflightActorInfo {
         let mut all_to_remove = HashSet::new();
         for (fragment_id, changes) in fragment_changes.fragment_changes {
             match changes {
-                CommandFragmentChanges::NewFragment { .. } => {}
+                CommandFragmentChanges::NewFragment(_) => {}
                 CommandFragmentChanges::Reschedule { to_remove, .. } => {
                     let info = self
                         .fragment_infos
diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs
index 1c05497665fef..71b7ce60affc7 100644
--- a/src/meta/src/barrier/mod.rs
+++ b/src/meta/src/barrier/mod.rs
@@ -23,7 +23,9 @@ use std::time::Duration;
 use anyhow::Context;
 use arc_swap::ArcSwap;
 use fail::fail_point;
+use futures::future::try_join_all;
 use itertools::Itertools;
+use parking_lot::Mutex;
 use prometheus::HistogramTimer;
 use risingwave_common::bail;
 use risingwave_common::catalog::TableId;
@@ -44,16 +46,14 @@ use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;
 use risingwave_pb::stream_service::BarrierCompleteResponse;
 use thiserror_ext::AsReport;
 use tokio::sync::oneshot::{Receiver, Sender};
-use tokio::sync::Mutex;
 use tokio::task::JoinHandle;
 use tracing::{error, info, warn, Instrument};
 
 use self::command::CommandContext;
 use self::notifier::Notifier;
-use self::progress::TrackingCommand;
 use crate::barrier::info::InflightActorInfo;
 use crate::barrier::notifier::BarrierInfo;
-use crate::barrier::progress::CreateMviewProgressTracker;
+use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob};
 use crate::barrier::rpc::ControlStreamManager;
 use crate::barrier::state::BarrierManagerState;
 use crate::error::MetaErrorInner;
@@ -77,7 +77,9 @@ mod schedule;
 mod state;
 mod trace;
 
-pub use self::command::{BarrierKind, Command, ReplaceTablePlan, Reschedule};
+pub use self::command::{
+    BarrierKind, Command, CreateStreamingJobCommandInfo, ReplaceTablePlan, Reschedule,
+};
 pub use self::rpc::StreamRpcManager;
 pub use self::schedule::BarrierScheduler;
 pub use self::trace::TracedEpoch;
@@ -456,7 +458,7 @@ impl GlobalBarrierManager {
 
         let active_streaming_nodes = ActiveStreamingWorkerNodes::uninitialized();
 
-        let tracker = CreateMviewProgressTracker::new();
+        let tracker = CreateMviewProgressTracker::default();
 
         let context = GlobalBarrierManagerContext {
             status: Arc::new(ArcSwap::new(Arc::new(BarrierManagerStatus::Starting))),
@@ -789,6 +791,7 @@ impl GlobalBarrierManager {
     }
 
     async fn failure_recovery(&mut self, err: MetaError) {
+        self.context.tracker.lock().abort_all();
         self.checkpoint_control.clear_on_err(&err).await;
         self.pending_non_checkpoint_barriers.clear();
 
@@ -816,6 +819,7 @@ impl GlobalBarrierManager {
 
     async fn adhoc_recovery(&mut self) {
         let err = MetaErrorInner::AdhocRecovery.into();
+        self.context.tracker.lock().abort_all();
         self.checkpoint_control.clear_on_err(&err).await;
 
         self.context
@@ -846,13 +850,14 @@ impl GlobalBarrierManagerContext {
             ..
         } = node;
         assert!(state.node_to_collect.is_empty());
-        let resps = state.resps;
         let wait_commit_timer = self.metrics.barrier_wait_commit_latency.start_timer();
-        let create_mview_progress = resps
+        let create_mview_progress = state
+            .resps
             .iter()
             .flat_map(|resp| resp.create_mview_progress.iter().cloned())
             .collect();
-        if let Err(e) = self.update_snapshot(&command_ctx, resps).await {
+
+        if let Err(e) = self.update_snapshot(&command_ctx, state).await {
             for notifier in notifiers {
                 notifier.notify_collection_failed(e.clone());
             }
@@ -861,9 +866,15 @@ impl GlobalBarrierManagerContext {
         notifiers.into_iter().for_each(|notifier| {
             notifier.notify_collected();
         });
-        let has_remaining = self
+
+        let (has_remaining, finished_jobs) = self
             .update_tracking_jobs(command_ctx.clone(), create_mview_progress)
-            .await?;
+            .await;
+        try_join_all(finished_jobs.into_iter().map(|finished_job| {
+            let metadata_manager = &self.metadata_manager;
+            async move { finished_job.pre_finish(metadata_manager).await }
+        }))
+        .await?;
         let duration_sec = enqueue_time.stop_and_record();
         self.report_complete_event(duration_sec, &command_ctx);
         wait_commit_timer.observe_duration();
@@ -879,7 +890,7 @@ impl GlobalBarrierManagerContext {
     async fn update_snapshot(
         &self,
         command_ctx: &CommandContext,
-        resps: Vec<BarrierCompleteResponse>,
+        state: BarrierEpochState,
     ) -> MetaResult<()> {
         {
             {
@@ -894,7 +905,7 @@ impl GlobalBarrierManagerContext {
                 match &command_ctx.kind {
                     BarrierKind::Initial => {}
                     BarrierKind::Checkpoint(epochs) => {
-                        let commit_info = collect_commit_epoch_info(resps, command_ctx, epochs);
+                        let commit_info = collect_commit_epoch_info(state, command_ctx, epochs);
                         new_snapshot = self.hummock_manager.commit_epoch(commit_info).await?;
                     }
                     BarrierKind::Barrier => {
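The tracker mutex switches from `tokio::sync::Mutex` to `parking_lot::Mutex` (see the import hunk above), so `self.tracker.lock()` is synchronous and its guard should be dropped before any `.await`. A self-contained sketch of the difference, assuming only the `parking_lot` crate as a dependency:

```rust
use parking_lot::Mutex;

#[derive(Default)]
struct Tracker {
    finished_jobs: Vec<u32>,
}

fn main() {
    let tracker = Mutex::new(Tracker::default());
    {
        // parking_lot: plain `lock()`, no `.await`, and no async runtime needed.
        let mut guard = tracker.lock();
        guard.finished_jobs.push(1);
    } // guard dropped here, before any asynchronous work would start
    // With the previous `tokio::sync::Mutex`, the same access would have been
    // written as `tracker.lock().await` inside an async function.
    println!("finished: {}", tracker.lock().finished_jobs.len());
}
```
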
@@ -926,23 +937,18 @@ impl GlobalBarrierManagerContext {
         &self,
         command_ctx: Arc<CommandContext>,
         create_mview_progress: Vec<CreateMviewProgress>,
-    ) -> MetaResult<bool> {
+    ) -> (bool, Vec<TrackingJob>) {
         {
             {
                 // Notify about collected.
                 let version_stats = self.hummock_manager.get_version_stats().await;
-                let mut tracker = self.tracker.lock().await;
+                let mut tracker = self.tracker.lock();
                 // Save `finished_commands` for Create MVs.
                 let finished_commands = {
                     let mut commands = vec![];
                     // Add the command to tracker.
-                    if let Some(command) = tracker.add(
-                        TrackingCommand {
-                            context: command_ctx.clone(),
-                        },
-                        &version_stats,
-                    ) {
+                    if let Some(command) = tracker.add(&command_ctx, &version_stats) {
                         // Those with no actors to track can be finished immediately.
                         commands.push(command);
                     }
@@ -969,11 +975,11 @@ impl GlobalBarrierManagerContext {
                     tracker.cancel_command(table_id);
                 }
 
-                let has_remaining_job = tracker
-                    .finish_jobs(command_ctx.kind.is_checkpoint())
-                    .await?;
-
-                Ok(has_remaining_job)
+                if command_ctx.kind.is_checkpoint() {
+                    (false, tracker.take_finished_jobs())
+                } else {
+                    (tracker.has_pending_finished_jobs(), vec![])
+                }
             }
         }
     }
@@ -1099,7 +1105,7 @@ impl GlobalBarrierManagerContext {
     }
 
     pub async fn get_ddl_progress(&self) -> Vec<DdlProgress> {
-        let mut ddl_progress = self.tracker.lock().await.gen_ddl_progress();
+        let mut ddl_progress = self.tracker.lock().gen_ddl_progress();
         // If not in tracker, means the first barrier not collected yet.
         // In that case just return progress 0.
         match &self.metadata_manager {
@@ -1142,10 +1148,11 @@ impl GlobalBarrierManagerContext {
 pub type BarrierManagerRef = GlobalBarrierManagerContext;
 
 fn collect_commit_epoch_info(
-    resps: Vec<BarrierCompleteResponse>,
+    state: BarrierEpochState,
     command_ctx: &CommandContext,
     epochs: &Vec<u64>,
 ) -> CommitEpochInfo {
+    let resps = state.resps;
     let mut sst_to_worker: HashMap<HummockSstableObjectId, WorkerId> = HashMap::new();
     let mut synced_ssts: Vec<ExtendedSstableInfo> = vec![];
     let mut table_watermarks = Vec::with_capacity(resps.len());
@@ -1163,22 +1170,21 @@ fn collect_commit_epoch_info(
         table_watermarks.push(resp.table_watermarks);
         old_value_ssts.extend(resp.old_value_sstables);
     }
-    let new_table_fragment_info = if let Command::CreateStreamingJob {
-        table_fragments, ..
-    } = &command_ctx.command
-    {
-        Some(NewTableFragmentInfo {
-            table_id: table_fragments.table_id(),
-            mv_table_id: table_fragments.mv_table_id().map(TableId::new),
-            internal_table_ids: table_fragments
-                .internal_table_ids()
-                .into_iter()
-                .map(TableId::new)
-                .collect(),
-        })
-    } else {
-        None
-    };
+    let new_table_fragment_info =
+        if let Command::CreateStreamingJob { info, .. } = &command_ctx.command {
+            let table_fragments = &info.table_fragments;
+            Some(NewTableFragmentInfo {
+                table_id: table_fragments.table_id(),
+                mv_table_id: table_fragments.mv_table_id().map(TableId::new),
+                internal_table_ids: table_fragments
+                    .internal_table_ids()
+                    .into_iter()
+                    .map(TableId::new)
+                    .collect(),
+            })
+        } else {
+            None
+        };
 
     let table_new_change_log = build_table_change_log_delta(
         old_value_ssts.into_iter(),
diff --git a/src/meta/src/barrier/progress.rs b/src/meta/src/barrier/progress.rs
index 5fdf875486fd9..fc562c67e71f0 100644
--- a/src/meta/src/barrier/progress.rs
+++ b/src/meta/src/barrier/progress.rs
@@ -14,6 +14,7 @@
 
 use std::collections::hash_map::Entry;
 use std::collections::HashMap;
+use std::mem::take;
 use std::sync::Arc;
 
 use risingwave_common::catalog::TableId;
@@ -25,12 +26,12 @@ use risingwave_pb::hummock::HummockVersionStats;
 use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;
 
 use super::command::CommandContext;
-use crate::barrier::{Command, GlobalBarrierManagerContext};
+use crate::barrier::{Command, CreateStreamingJobCommandInfo, ReplaceTablePlan};
 use crate::manager::{
     DdlType, MetadataManager, MetadataManagerV1, MetadataManagerV2, StreamingJob,
 };
 use crate::model::{ActorId, TableFragments};
-use crate::{MetaError, MetaResult};
+use crate::MetaResult;
 
 type ConsumedRows = u64;
 
@@ -160,28 +161,16 @@ pub enum TrackingJob {
 }
 
 impl TrackingJob {
-    /// Returns whether the `TrackingJob` requires a checkpoint to complete.
-    pub(crate) fn is_checkpoint_required(&self) -> bool {
-        match self {
-            // Recovered tracking job is always a streaming job,
-            // It requires a checkpoint to complete.
-            TrackingJob::RecoveredV1(_) | TrackingJob::RecoveredV2(_) => true,
-            TrackingJob::New(command) => {
-                command.context.kind.is_initial() || command.context.kind.is_checkpoint()
-            }
-        }
-    }
-
-    pub(crate) async fn pre_finish(&self) -> MetaResult<()> {
+    pub(crate) async fn pre_finish(&self, metadata_manager: &MetadataManager) -> MetaResult<()> {
         match &self {
-            TrackingJob::New(command) => match &command.context.command {
-                Command::CreateStreamingJob {
+            TrackingJob::New(command) => {
+                let CreateStreamingJobCommandInfo {
                     table_fragments,
                     streaming_job,
                     internal_tables,
-                    replace_table,
                     ..
-                } => match command.context.metadata_manager() {
+                } = &command.info;
+                match metadata_manager {
                     MetadataManager::V1(mgr) => {
                         mgr.fragment_manager
                             .mark_table_fragments_created(table_fragments.table_id())
@@ -193,13 +182,15 @@ impl TrackingJob {
                     }
                     MetadataManager::V2(mgr) => {
                         mgr.catalog_controller
-                            .finish_streaming_job(streaming_job.id() as i32, replace_table.clone())
+                            .finish_streaming_job(
+                                streaming_job.id() as i32,
+                                command.replace_table_info.clone(),
+                            )
                             .await?;
                         Ok(())
                     }
-                },
-                _ => Ok(()),
-            },
+                }
+            }
             TrackingJob::RecoveredV1(recovered) => {
                 let manager = &recovered.metadata_manager;
                 manager
@@ -226,11 +217,11 @@ impl TrackingJob {
         }
     }
 
-    pub(crate) fn table_to_create(&self) -> Option<TableId> {
+    pub(crate) fn table_to_create(&self) -> TableId {
         match self {
-            TrackingJob::New(command) => command.context.table_to_create(),
-            TrackingJob::RecoveredV1(recovered) => Some(recovered.fragments.table_id()),
-            TrackingJob::RecoveredV2(recovered) => Some((recovered.id as u32).into()),
+            TrackingJob::New(command) => command.info.table_fragments.table_id(),
+            TrackingJob::RecoveredV1(recovered) => recovered.fragments.table_id(),
+            TrackingJob::RecoveredV2(recovered) => (recovered.id as u32).into(),
         }
     }
 }
@@ -241,7 +232,7 @@ impl std::fmt::Debug for TrackingJob {
             TrackingJob::New(command) => write!(
                 f,
                 "TrackingJob::New({:?})",
-                command.context.table_to_create()
+                command.info.table_fragments.table_id()
             ),
             TrackingJob::RecoveredV1(recovered) => {
                 write!(
@@ -271,8 +262,8 @@ pub struct RecoveredTrackingJobV2 {
 
 /// The command tracking by the [`CreateMviewProgressTracker`].
 pub(super) struct TrackingCommand {
-    /// The context of the command.
-    pub context: Arc<CommandContext>,
+    pub info: CreateStreamingJobCommandInfo,
+    pub replace_table_info: Option<ReplaceTablePlan>,
 }
 
 /// Tracking is done as follows:
@@ -280,6 +271,7 @@ pub(super) struct TrackingCommand {
 /// 2. For each stream job, there are several actors which run its tasks.
 /// 3. With `progress_map` we can use the ID of the `StreamJob` to view its progress.
 /// 4. With `actor_map` we can use an actor's `ActorId` to find the ID of the `StreamJob`.
+#[derive(Default)]
 pub(super) struct CreateMviewProgressTracker {
     /// Progress of the create-mview DDL indicated by the `TableId`.
     progress_map: HashMap<TableId, (Progress, TrackingJob)>,
@@ -403,14 +395,6 @@ impl CreateMviewProgressTracker {
         }
     }
 
-    pub fn new() -> Self {
-        Self {
-            progress_map: Default::default(),
-            actor_map: Default::default(),
-            finished_jobs: Vec::new(),
-        }
-    }
-
     pub fn gen_ddl_progress(&self) -> HashMap<u32, DdlProgress> {
         self.progress_map
             .iter()
@@ -436,38 +420,26 @@ impl CreateMviewProgressTracker {
     /// If not checkpoint, jobs which do not require checkpoint can be finished.
     ///
     /// Returns whether there are still remaining stashed jobs to finish.
-    pub(super) async fn finish_jobs(&mut self, checkpoint: bool) -> MetaResult<bool> {
+    pub(super) fn take_finished_jobs(&mut self) -> Vec<TrackingJob> {
         tracing::trace!(finished_jobs=?self.finished_jobs, progress_map=?self.progress_map, "finishing jobs");
-        for job in self
-            .finished_jobs
-            .extract_if(|job| checkpoint || !job.is_checkpoint_required())
-        {
-            // The command is ready to finish. We can now call `pre_finish`.
-            job.pre_finish().await?;
-        }
-        Ok(!self.finished_jobs.is_empty())
+        take(&mut self.finished_jobs)
+    }
+
+    pub(super) fn has_pending_finished_jobs(&self) -> bool {
+        !self.finished_jobs.is_empty()
     }
 
     pub(super) fn cancel_command(&mut self, id: TableId) {
         let _ = self.progress_map.remove(&id);
-        self.finished_jobs
-            .retain(|x| x.table_to_create() != Some(id));
+        self.finished_jobs.retain(|x| x.table_to_create() != id);
         self.actor_map.retain(|_, table_id| *table_id != id);
     }
 
     /// Notify all tracked commands that error encountered and clear them.
-    pub async fn abort_all(&mut self, err: &MetaError, context: &GlobalBarrierManagerContext) {
+    pub fn abort_all(&mut self) {
         self.actor_map.clear();
         self.finished_jobs.clear();
         self.progress_map.clear();
-        match &context.metadata_manager {
-            MetadataManager::V1(mgr) => {
-                mgr.notify_finish_failed(err).await;
-            }
-            MetadataManager::V2(mgr) => {
-                mgr.notify_finish_failed(err).await;
-            }
-        }
     }
 
     /// Add a new create-mview DDL command to track.
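With `finish_jobs` replaced by `take_finished_jobs` / `has_pending_finished_jobs`, the async `pre_finish` calls move out of the tracker lock and up into the caller (see the `complete_barrier` and `update_tracking_jobs` hunks in `mod.rs` above). A condensed sketch of the resulting flow, with error handling and surrounding context elided:

```rust
// Decide what to finish while holding the (synchronous) tracker lock.
let (has_remaining, finished_jobs) = if command_ctx.kind.is_checkpoint() {
    (false, tracker.take_finished_jobs())
} else {
    (tracker.has_pending_finished_jobs(), vec![])
};
drop(tracker); // release the lock before any await point

// Finish the jobs outside the lock; `pre_finish` now takes the
// `MetadataManager` explicitly instead of reaching through a `CommandContext`.
try_join_all(
    finished_jobs
        .into_iter()
        .map(|job| job.pre_finish(&self.metadata_manager)),
)
.await?;
```
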
@@ -475,32 +447,43 @@ impl CreateMviewProgressTracker {
     /// If the actors to track is empty, return the given command as it can be finished immediately.
     pub fn add(
         &mut self,
-        command: TrackingCommand,
+        command_ctx: &Arc<CommandContext>,
         version_stats: &HummockVersionStats,
     ) -> Option<TrackingJob> {
-        let actors = command.context.actors_to_track();
-        if actors.is_empty() {
-            // The command can be finished immediately.
-            return Some(TrackingJob::New(command));
-        }
+        let (info, actors, replace_table_info) = if let Command::CreateStreamingJob {
+            info,
+            replace_table,
+        } = &command_ctx.command
+        {
+            let CreateStreamingJobCommandInfo {
+                table_fragments, ..
+            } = info;
+            let actors = table_fragments.tracking_progress_actor_ids();
+            if actors.is_empty() {
+                // The command can be finished immediately.
+                return Some(TrackingJob::New(TrackingCommand {
+                    info: info.clone(),
+                    replace_table_info: replace_table.clone(),
+                }));
+            }
+            (info.clone(), actors, replace_table.clone())
+        } else {
+            return None;
+        };
 
-        let (
-            creating_mv_id,
-            upstream_mv_count,
-            upstream_total_key_count,
-            definition,
-            ddl_type,
-            create_type,
-        ) = if let Command::CreateStreamingJob {
+        let CreateStreamingJobCommandInfo {
             table_fragments,
-            dispatchers,
             upstream_root_actors,
+            dispatchers,
             definition,
             ddl_type,
             create_type,
             ..
-        } = &command.context.command
-        {
+        } = &info;
+
+        let creating_mv_id = table_fragments.table_id();
+
+        let (upstream_mv_count, upstream_total_key_count, ddl_type, create_type) = {
             // Keep track of how many times each upstream MV appears.
             let mut upstream_mv_count = HashMap::new();
             for (table_id, actors) in upstream_root_actors {
@@ -524,15 +507,11 @@ impl CreateMviewProgressTracker {
                 })
                 .sum();
             (
-                table_fragments.table_id(),
                 upstream_mv_count,
                 upstream_total_key_count,
-                definition.to_string(),
                 ddl_type,
                 create_type,
             )
-        } else {
-            unreachable!("Must be CreateStreamingJob.");
         };
 
         for &actor in &actors {
@@ -543,7 +522,7 @@ impl CreateMviewProgressTracker {
             actors,
             upstream_mv_count,
             upstream_total_key_count,
-            definition,
+            definition.clone(),
         );
         if *ddl_type == DdlType::Sink && *create_type == CreateType::Background {
             // We return the original tracking job immediately.
             // We don't need to wait for sink to finish backfill.
             // This still contains the notifiers, so we can tell listeners
             // that the sink job has been created.
-            Some(TrackingJob::New(command))
+            Some(TrackingJob::New(TrackingCommand {
+                info,
+                replace_table_info,
+            }))
         } else {
-            let old = self
-                .progress_map
-                .insert(creating_mv_id, (progress, TrackingJob::New(command)));
+            let old = self.progress_map.insert(
+                creating_mv_id,
+                (
+                    progress,
+                    TrackingJob::New(TrackingCommand {
+                        info,
+                        replace_table_info,
+                    }),
+                ),
+            );
             assert!(old.is_none());
             None
         }
diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs
index f9bba534e561f..92c13e543a73f 100644
--- a/src/meta/src/barrier/recovery.rs
+++ b/src/meta/src/barrier/recovery.rs
@@ -152,8 +152,7 @@ impl GlobalBarrierManagerContext {
         let version_stats = self.hummock_manager.get_version_stats().await;
         // If failed, enter recovery mode.
         {
-            let mut tracker = self.tracker.lock().await;
-            *tracker =
+            *self.tracker.lock() =
                 CreateMviewProgressTracker::recover_v1(version_stats, table_mview_map, mgr.clone());
         }
         Ok(())
@@ -180,8 +179,7 @@ impl GlobalBarrierManagerContext {
         let version_stats = self.hummock_manager.get_version_stats().await;
         // If failed, enter recovery mode.
         {
-            let mut tracker = self.tracker.lock().await;
-            *tracker =
+            *self.tracker.lock() =
                 CreateMviewProgressTracker::recover_v2(mview_map, version_stats, mgr.clone());
         }
         Ok(())
@@ -248,10 +246,8 @@ impl GlobalBarrierManager {
             let recovery_result: MetaResult<_> = try {
                 if let Some(err) = &err {
                     self.context
-                        .tracker
-                        .lock()
-                        .await
-                        .abort_all(err, &self.context)
+                        .metadata_manager
+                        .notify_finish_failed(err)
                         .await;
                 }
 
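Taken together with the `mod.rs` hunks earlier, the old async `abort_all(err, context)` is split in two: `CreateMviewProgressTracker::abort_all` now only clears local tracking state (called synchronously from `failure_recovery` / `adhoc_recovery`), while notifying waiters of the failure moves to the new `MetadataManager::notify_finish_failed` (added in `metadata.rs` below) and is invoked on the recovery path shown above. Condensed sketch of the two calls, surrounding code elided:

```rust
// Local bookkeeping only: drops actor_map, finished_jobs and progress_map.
self.context.tracker.lock().abort_all();

// Separately, on recovery, tell anyone waiting on an in-flight CREATE that it
// failed; this fans out to the V1 or V2 metadata manager.
self.context
    .metadata_manager
    .notify_finish_failed(err)
    .await;
```
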
diff --git a/src/meta/src/barrier/schedule.rs b/src/meta/src/barrier/schedule.rs
index 7af683c18fc7f..d2a2febbec2fa 100644
--- a/src/meta/src/barrier/schedule.rs
+++ b/src/meta/src/barrier/schedule.rs
@@ -184,10 +184,8 @@ impl BarrierScheduler {
     pub fn try_cancel_scheduled_create(&self, table_id: TableId) -> bool {
         let queue = &mut self.inner.queue.lock();
         if let Some(idx) = queue.queue.iter().position(|scheduled| {
-            if let Command::CreateStreamingJob {
-                table_fragments, ..
-            } = &scheduled.command
-                && table_fragments.table_id() == table_id
+            if let Command::CreateStreamingJob { info, .. } = &scheduled.command
+                && info.table_fragments.table_id() == table_id
             {
                 true
             } else {
diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs
index ecd9d4971d2b8..a2aab4371b177 100644
--- a/src/meta/src/manager/metadata.rs
+++ b/src/meta/src/manager/metadata.rs
@@ -855,6 +855,17 @@ impl MetadataManager {
             MetadataManager::V2(mgr) => mgr.wait_streaming_job_finished(job.id() as _).await,
         }
     }
+
+    pub(crate) async fn notify_finish_failed(&self, err: &MetaError) {
+        match self {
+            MetadataManager::V1(mgr) => {
+                mgr.notify_finish_failed(err).await;
+            }
+            MetadataManager::V2(mgr) => {
+                mgr.notify_finish_failed(err).await;
+            }
+        }
+    }
 }
 
 impl MetadataManagerV2 {
diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs
index 2756d71a8a6cf..260d1ed537ac1 100644
--- a/src/meta/src/stream/stream_manager.rs
+++ b/src/meta/src/stream/stream_manager.rs
@@ -29,7 +29,9 @@ use tokio::sync::{oneshot, Mutex};
 use tracing::Instrument;
 
 use super::{Locations, RescheduleOptions, ScaleControllerRef, TableResizePolicy};
-use crate::barrier::{BarrierScheduler, Command, ReplaceTablePlan, StreamRpcManager};
+use crate::barrier::{
+    BarrierScheduler, Command, CreateStreamingJobCommandInfo, ReplaceTablePlan, StreamRpcManager,
+};
 use crate::manager::{DdlType, MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob};
 use crate::model::{ActorId, FragmentId, MetadataModel, TableFragments, TableParallelism};
 use crate::stream::{to_build_actor_info, SourceManagerRef};
@@ -474,7 +476,7 @@ impl GlobalStreamManager {
                 .await?,
         );
 
-        let command = Command::CreateStreamingJob {
+        let info = CreateStreamingJobCommandInfo {
             table_fragments,
             upstream_root_actors,
             dispatchers,
@@ -483,9 +485,13 @@ impl GlobalStreamManager {
             streaming_job: streaming_job.clone(),
             internal_tables: internal_tables.into_values().collect_vec(),
             ddl_type,
-            replace_table: replace_table_command,
             create_type,
         };
+
+        let command = Command::CreateStreamingJob {
+            info,
+            replace_table: replace_table_command,
+        };
         tracing::debug!("sending Command::CreateStreamingJob");
         let result: MetaResult<NotificationVersion> = try {
             self.barrier_scheduler.run_command(command).await?;