diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 8ffe3b1ac0782..4d62f8da5122d 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -1199,7 +1199,6 @@ impl GlobalBarrierManagerContext { Ok(()) } - /// Try to commit this node. If err, returns async fn complete_barrier( self, node: EpochNode, @@ -1254,7 +1253,7 @@ impl GlobalBarrierManagerContext { }); try_join_all(finished_jobs.into_iter().map(|finished_job| { let metadata_manager = &self.metadata_manager; - async move { finished_job.pre_finish(metadata_manager).await } + async move { finished_job.finish(metadata_manager).await } })) .await?; let duration_sec = enqueue_time.stop_and_record(); diff --git a/src/meta/src/barrier/progress.rs b/src/meta/src/barrier/progress.rs index 9dff098929cae..5754e4c60e364 100644 --- a/src/meta/src/barrier/progress.rs +++ b/src/meta/src/barrier/progress.rs @@ -161,8 +161,9 @@ pub enum TrackingJob { } impl TrackingJob { - pub(crate) async fn pre_finish(&self, metadata_manager: &MetadataManager) -> MetaResult<()> { - match &self { + /// Notify metadata manager that the job is finished. + pub(crate) async fn finish(self, metadata_manager: &MetadataManager) -> MetaResult<()> { + match self { TrackingJob::New(command) => { let CreateStreamingJobCommandInfo { table_fragments, @@ -276,11 +277,11 @@ pub(super) struct CreateMviewProgressTracker { /// Progress of the create-mview DDL indicated by the `TableId`. progress_map: HashMap, - /// Find the epoch of the create-mview DDL by the actor containing the backfill executors. + /// Find the epoch of the create-mview DDL by the actor containing the MV/source backfill executors. actor_map: HashMap, - /// Get notified when we finished Create MV and collect a barrier(checkpoint = true) - finished_jobs: Vec, + /// Stash of finished jobs. They will be finally finished on checkpoint. + pending_finished_jobs: Vec, } impl CreateMviewProgressTracker { @@ -331,7 +332,7 @@ impl CreateMviewProgressTracker { Self { progress_map, actor_map, - finished_jobs: Vec::new(), + pending_finished_jobs: Vec::new(), } } @@ -365,7 +366,7 @@ impl CreateMviewProgressTracker { Self { progress_map, actor_map, - finished_jobs: Vec::new(), + pending_finished_jobs: Vec::new(), } } @@ -457,33 +458,30 @@ impl CreateMviewProgressTracker { /// Stash a command to finish later. pub(super) fn stash_command_to_finish(&mut self, finished_job: TrackingJob) { - self.finished_jobs.push(finished_job); + self.pending_finished_jobs.push(finished_job); } - /// Finish stashed jobs. - /// If checkpoint, means all jobs can be finished. - /// If not checkpoint, jobs which do not require checkpoint can be finished. - /// - /// Returns whether there are still remaining stashed jobs to finish. + /// Finish stashed jobs on checkpoint. pub(super) fn take_finished_jobs(&mut self) -> Vec { - tracing::trace!(finished_jobs=?self.finished_jobs, progress_map=?self.progress_map, "finishing jobs"); - take(&mut self.finished_jobs) + tracing::trace!(finished_jobs=?self.pending_finished_jobs, progress_map=?self.progress_map, "finishing jobs"); + take(&mut self.pending_finished_jobs) } pub(super) fn has_pending_finished_jobs(&self) -> bool { - !self.finished_jobs.is_empty() + !self.pending_finished_jobs.is_empty() } pub(super) fn cancel_command(&mut self, id: TableId) { let _ = self.progress_map.remove(&id); - self.finished_jobs.retain(|x| x.table_to_create() != id); + self.pending_finished_jobs + .retain(|x| x.table_to_create() != id); self.actor_map.retain(|_, table_id| *table_id != id); } /// Notify all tracked commands that error encountered and clear them. pub fn abort_all(&mut self) { self.actor_map.clear(); - self.finished_jobs.clear(); + self.pending_finished_jobs.clear(); self.progress_map.clear(); } diff --git a/src/meta/src/manager/catalog/database.rs b/src/meta/src/manager/catalog/database.rs index d6523060e967d..bc918fb7d55fa 100644 --- a/src/meta/src/manager/catalog/database.rs +++ b/src/meta/src/manager/catalog/database.rs @@ -93,7 +93,7 @@ pub struct DatabaseManager { pub(super) in_progress_creation_tracker: HashSet, // In-progress creating streaming job tracker: this is a temporary workaround to avoid clean up // creating streaming jobs. - pub(super) in_progress_creation_streaming_job: HashMap, + pub(super) in_progress_creating_streaming_job: HashMap, // In-progress creating tables, including internal tables. pub(super) in_progress_creating_tables: HashMap, @@ -193,7 +193,7 @@ impl DatabaseManager { secrets, secret_ref_count, in_progress_creation_tracker: HashSet::default(), - in_progress_creation_streaming_job: HashMap::default(), + in_progress_creating_streaming_job: HashMap::default(), in_progress_creating_tables: HashMap::default(), creating_table_finish_notifier: Default::default(), }) @@ -575,7 +575,7 @@ impl DatabaseManager { /// Only for streaming DDL pub fn mark_creating_streaming_job(&mut self, table_id: TableId, key: RelationKey) { - self.in_progress_creation_streaming_job + self.in_progress_creating_streaming_job .insert(table_id, key); } @@ -584,7 +584,7 @@ impl DatabaseManager { } pub fn unmark_creating_streaming_job(&mut self, table_id: TableId) { - self.in_progress_creation_streaming_job.remove(&table_id); + self.in_progress_creating_streaming_job.remove(&table_id); for tx in self .creating_table_finish_notifier .remove(&table_id) @@ -598,7 +598,7 @@ impl DatabaseManager { } pub fn find_creating_streaming_job_id(&self, key: &RelationKey) -> Option { - self.in_progress_creation_streaming_job + self.in_progress_creating_streaming_job .iter() .find(|(_, v)| *v == key) .map(|(k, _)| *k) @@ -617,7 +617,7 @@ impl DatabaseManager { } pub fn all_creating_streaming_jobs(&self) -> impl Iterator + '_ { - self.in_progress_creation_streaming_job.keys().cloned() + self.in_progress_creating_streaming_job.keys().cloned() } pub fn mark_creating_tables(&mut self, tables: &[Table]) { diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 8939e2f9802d9..81e4f1c4d96c3 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -221,7 +221,7 @@ impl CatalogManagerCore { .or_else(|| { if self .database - .in_progress_creation_streaming_job + .in_progress_creating_streaming_job .contains_key(&job.id()) { Some(false) @@ -253,7 +253,7 @@ impl CatalogManagerCore { } // Clear in progress creation streaming job. Note that background job is not tracked here, so that // it won't affect background jobs. - self.database.in_progress_creation_streaming_job.clear(); + self.database.in_progress_creating_streaming_job.clear(); } } @@ -1435,7 +1435,7 @@ impl CatalogManager { ); database_core.in_progress_creation_tracker.remove(&key); database_core - .in_progress_creation_streaming_job + .in_progress_creating_streaming_job .remove(&table.id); table.stream_job_status = PbStreamJobStatus::Created.into(); @@ -3373,7 +3373,7 @@ impl CatalogManager { .in_progress_creation_tracker .remove(&mview_key); database_core - .in_progress_creation_streaming_job + .in_progress_creating_streaming_job .remove(&mview.id); sources.insert(source.id, source.clone()); @@ -3508,7 +3508,7 @@ impl CatalogManager { database_core.in_progress_creation_tracker.remove(&key); database_core - .in_progress_creation_streaming_job + .in_progress_creating_streaming_job .remove(&table.id); index.stream_job_status = PbStreamJobStatus::Created.into(); @@ -3594,7 +3594,7 @@ impl CatalogManager { database_core.in_progress_creation_tracker.remove(&key); database_core - .in_progress_creation_streaming_job + .in_progress_creating_streaming_job .remove(&sink.id); sink.stream_job_status = PbStreamJobStatus::Created.into(); @@ -3713,7 +3713,7 @@ impl CatalogManager { database_core.in_progress_creation_tracker.remove(&key); database_core - .in_progress_creation_streaming_job + .in_progress_creating_streaming_job .remove(&subscription.id); subscription.subscription_state = PbSubscriptionState::Created.into(); diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index c1b02b0e72444..a639a77f73bfa 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -901,6 +901,8 @@ impl MetadataManager { } impl MetadataManager { + /// Wait for job finishing notification in `TrackingJob::pre_finish`. + /// The progress is updated per barrier. pub(crate) async fn wait_streaming_job_finished( &self, job: &StreamingJob, diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 8062675156fe4..feb7a959083bb 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -803,6 +803,8 @@ impl DdlController { } } + /// For [`CreateType::Foreground`], the function will only return after backfilling finishes + /// ([`MetadataManager::wait_streaming_job_finished`]). async fn create_streaming_job( &self, mut stream_job: StreamingJob,