refactor: add some comments for MV progress tracking (#18110)
xxchan authored Aug 23, 2024
1 parent 9587945 commit 321610d
Showing 6 changed files with 34 additions and 33 deletions.
3 changes: 1 addition & 2 deletions src/meta/src/barrier/mod.rs
@@ -1199,7 +1199,6 @@ impl GlobalBarrierManagerContext
         Ok(())
     }
 
-    /// Try to commit this node. If err, returns
     async fn complete_barrier(
         self,
         node: EpochNode,
@@ -1254,7 +1253,7 @@ impl GlobalBarrierManagerContext
         });
         try_join_all(finished_jobs.into_iter().map(|finished_job| {
             let metadata_manager = &self.metadata_manager;
-            async move { finished_job.pre_finish(metadata_manager).await }
+            async move { finished_job.finish(metadata_manager).await }
         }))
         .await?;
         let duration_sec = enqueue_time.stop_and_record();
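For context on the second hunk: the renamed `finish` consumes the tracking job, and all finished jobs are awaited at once via `try_join_all`. Below is a minimal, runnable sketch of that pattern; `TrackingJob`, `MetadataManager`, and `MetaResult` here are simplified stand-ins, not the actual meta-service types.

```rust
use futures::executor::block_on;
use futures::future::try_join_all;

// Simplified stand-ins for the meta-service types used in the hunk above.
struct MetadataManager;
struct TrackingJob(u32);

type MetaResult<T> = Result<T, String>;

impl TrackingJob {
    /// Mirrors the renamed `TrackingJob::finish`: consumes the job and
    /// notifies the metadata manager that it is done.
    async fn finish(self, _metadata_manager: &MetadataManager) -> MetaResult<()> {
        println!("job {} finished", self.0);
        Ok(())
    }
}

fn main() -> MetaResult<()> {
    block_on(async {
        let metadata_manager = MetadataManager;
        let finished_jobs = vec![TrackingJob(1), TrackingJob(2), TrackingJob(3)];

        // Finish all collected jobs concurrently; bail out on the first error.
        try_join_all(finished_jobs.into_iter().map(|finished_job| {
            let metadata_manager = &metadata_manager;
            async move { finished_job.finish(metadata_manager).await }
        }))
        .await?;
        Ok(())
    })
}
```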
34 changes: 16 additions & 18 deletions src/meta/src/barrier/progress.rs
@@ -161,8 +161,9 @@ pub enum TrackingJob
 }
 
 impl TrackingJob {
-    pub(crate) async fn pre_finish(&self, metadata_manager: &MetadataManager) -> MetaResult<()> {
-        match &self {
+    /// Notify metadata manager that the job is finished.
+    pub(crate) async fn finish(self, metadata_manager: &MetadataManager) -> MetaResult<()> {
+        match self {
             TrackingJob::New(command) => {
                 let CreateStreamingJobCommandInfo {
                     table_fragments,
@@ -276,11 +277,11 @@ pub(super) struct CreateMviewProgressTracker {
     /// Progress of the create-mview DDL indicated by the `TableId`.
     progress_map: HashMap<TableId, (Progress, TrackingJob)>,
 
-    /// Find the epoch of the create-mview DDL by the actor containing the backfill executors.
+    /// Find the epoch of the create-mview DDL by the actor containing the MV/source backfill executors.
     actor_map: HashMap<ActorId, TableId>,
 
-    /// Get notified when we finished Create MV and collect a barrier(checkpoint = true)
-    finished_jobs: Vec<TrackingJob>,
+    /// Stash of finished jobs. They will be finally finished on checkpoint.
+    pending_finished_jobs: Vec<TrackingJob>,
 }
 
 impl CreateMviewProgressTracker {
@@ -331,7 +332,7 @@ impl CreateMviewProgressTracker {
         Self {
             progress_map,
             actor_map,
-            finished_jobs: Vec::new(),
+            pending_finished_jobs: Vec::new(),
         }
     }
 
@@ -365,7 +366,7 @@ impl CreateMviewProgressTracker {
         Self {
             progress_map,
             actor_map,
-            finished_jobs: Vec::new(),
+            pending_finished_jobs: Vec::new(),
         }
     }
 
@@ -457,33 +458,30 @@ impl CreateMviewProgressTracker {
 
     /// Stash a command to finish later.
     pub(super) fn stash_command_to_finish(&mut self, finished_job: TrackingJob) {
-        self.finished_jobs.push(finished_job);
+        self.pending_finished_jobs.push(finished_job);
     }
 
-    /// Finish stashed jobs.
-    /// If checkpoint, means all jobs can be finished.
-    /// If not checkpoint, jobs which do not require checkpoint can be finished.
-    ///
-    /// Returns whether there are still remaining stashed jobs to finish.
+    /// Finish stashed jobs on checkpoint.
    pub(super) fn take_finished_jobs(&mut self) -> Vec<TrackingJob> {
-        tracing::trace!(finished_jobs=?self.finished_jobs, progress_map=?self.progress_map, "finishing jobs");
-        take(&mut self.finished_jobs)
+        tracing::trace!(finished_jobs=?self.pending_finished_jobs, progress_map=?self.progress_map, "finishing jobs");
+        take(&mut self.pending_finished_jobs)
     }
 
     pub(super) fn has_pending_finished_jobs(&self) -> bool {
-        !self.finished_jobs.is_empty()
+        !self.pending_finished_jobs.is_empty()
     }
 
     pub(super) fn cancel_command(&mut self, id: TableId) {
         let _ = self.progress_map.remove(&id);
-        self.finished_jobs.retain(|x| x.table_to_create() != id);
+        self.pending_finished_jobs
+            .retain(|x| x.table_to_create() != id);
         self.actor_map.retain(|_, table_id| *table_id != id);
     }
 
     /// Notify all tracked commands that error encountered and clear them.
     pub fn abort_all(&mut self) {
         self.actor_map.clear();
-        self.finished_jobs.clear();
+        self.pending_finished_jobs.clear();
         self.progress_map.clear();
     }
 
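The net effect of the `progress.rs` changes is a rename plus doc comments: finished jobs are stashed until a checkpoint barrier completes, then drained and finished. A stripped-down sketch of that stash/drain life cycle is below; it omits `progress_map` and the real metadata calls, and the types are simplified stand-ins.

```rust
use std::collections::HashMap;
use std::mem::take;

type TableId = u32;
type ActorId = u32;

// Simplified stand-in for the real tracking job.
struct TrackingJob {
    table_to_create: TableId,
}

#[derive(Default)]
struct CreateMviewProgressTracker {
    // Maps backfill actors to the job they belong to.
    actor_map: HashMap<ActorId, TableId>,
    // Jobs whose backfill is done; they are finally finished on the next checkpoint.
    pending_finished_jobs: Vec<TrackingJob>,
}

impl CreateMviewProgressTracker {
    /// Stash a finished job until the next checkpoint barrier.
    fn stash_command_to_finish(&mut self, finished_job: TrackingJob) {
        self.pending_finished_jobs.push(finished_job);
    }

    /// Drain the stash; called when a checkpoint barrier completes.
    fn take_finished_jobs(&mut self) -> Vec<TrackingJob> {
        take(&mut self.pending_finished_jobs)
    }

    fn has_pending_finished_jobs(&self) -> bool {
        !self.pending_finished_jobs.is_empty()
    }

    /// Drop all state belonging to a cancelled job.
    fn cancel_command(&mut self, id: TableId) {
        self.pending_finished_jobs
            .retain(|x| x.table_to_create != id);
        self.actor_map.retain(|_, table_id| *table_id != id);
    }
}

fn main() {
    let mut tracker = CreateMviewProgressTracker::default();
    tracker.actor_map.insert(7, 42);
    tracker.stash_command_to_finish(TrackingJob { table_to_create: 42 });
    assert!(tracker.has_pending_finished_jobs());

    // On the next checkpoint, the stash is drained and the jobs get finished.
    assert_eq!(tracker.take_finished_jobs().len(), 1);
    assert!(!tracker.has_pending_finished_jobs());

    // Cancelling removes the job from every map.
    tracker.cancel_command(42);
    assert!(tracker.actor_map.is_empty());
}
```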
12 changes: 6 additions & 6 deletions src/meta/src/manager/catalog/database.rs
@@ -93,7 +93,7 @@ pub struct DatabaseManager {
     pub(super) in_progress_creation_tracker: HashSet<RelationKey>,
     // In-progress creating streaming job tracker: this is a temporary workaround to avoid clean up
     // creating streaming jobs.
-    pub(super) in_progress_creation_streaming_job: HashMap<TableId, RelationKey>,
+    pub(super) in_progress_creating_streaming_job: HashMap<TableId, RelationKey>,
     // In-progress creating tables, including internal tables.
     pub(super) in_progress_creating_tables: HashMap<TableId, Table>,
 
@@ -193,7 +193,7 @@ impl DatabaseManager {
             secrets,
             secret_ref_count,
             in_progress_creation_tracker: HashSet::default(),
-            in_progress_creation_streaming_job: HashMap::default(),
+            in_progress_creating_streaming_job: HashMap::default(),
             in_progress_creating_tables: HashMap::default(),
             creating_table_finish_notifier: Default::default(),
         })
@@ -575,7 +575,7 @@ impl DatabaseManager {
 
     /// Only for streaming DDL
     pub fn mark_creating_streaming_job(&mut self, table_id: TableId, key: RelationKey) {
-        self.in_progress_creation_streaming_job
+        self.in_progress_creating_streaming_job
             .insert(table_id, key);
     }
 
@@ -584,7 +584,7 @@
     }
 
     pub fn unmark_creating_streaming_job(&mut self, table_id: TableId) {
-        self.in_progress_creation_streaming_job.remove(&table_id);
+        self.in_progress_creating_streaming_job.remove(&table_id);
         for tx in self
             .creating_table_finish_notifier
             .remove(&table_id)
@@ -598,7 +598,7 @@
         }
     }
 
     pub fn find_creating_streaming_job_id(&self, key: &RelationKey) -> Option<TableId> {
-        self.in_progress_creation_streaming_job
+        self.in_progress_creating_streaming_job
             .iter()
             .find(|(_, v)| *v == key)
             .map(|(k, _)| *k)
@@ -617,7 +617,7 @@
     }
 
     pub fn all_creating_streaming_jobs(&self) -> impl Iterator<Item = TableId> + '_ {
-        self.in_progress_creation_streaming_job.keys().cloned()
+        self.in_progress_creating_streaming_job.keys().cloned()
     }
 
     pub fn mark_creating_tables(&mut self, tables: &[Table]) {
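The `database.rs` hunks are a pure rename of the tracker map (`in_progress_creation_streaming_job` to `in_progress_creating_streaming_job`). As a reading aid, here is a self-contained sketch of how such a map is used to mark, look up, and unmark creating streaming jobs; `RelationKey` is simplified to a plain `String`, so this is not the real `DatabaseManager` API.

```rust
use std::collections::HashMap;

type TableId = u32;
// In the real code this is a richer name key; a plain String is enough here.
type RelationKey = String;

#[derive(Default)]
struct DatabaseManager {
    // Streaming jobs that are still being created
    // (the field renamed to `in_progress_creating_streaming_job` in this commit).
    in_progress_creating_streaming_job: HashMap<TableId, RelationKey>,
}

impl DatabaseManager {
    fn mark_creating_streaming_job(&mut self, table_id: TableId, key: RelationKey) {
        self.in_progress_creating_streaming_job.insert(table_id, key);
    }

    fn unmark_creating_streaming_job(&mut self, table_id: TableId) {
        self.in_progress_creating_streaming_job.remove(&table_id);
    }

    fn find_creating_streaming_job_id(&self, key: &RelationKey) -> Option<TableId> {
        self.in_progress_creating_streaming_job
            .iter()
            .find(|(_, v)| *v == key)
            .map(|(k, _)| *k)
    }
}

fn main() {
    let mut mgr = DatabaseManager::default();
    mgr.mark_creating_streaming_job(1, "public.mv1".to_string());
    assert_eq!(mgr.find_creating_streaming_job_id(&"public.mv1".to_string()), Some(1));

    mgr.unmark_creating_streaming_job(1);
    assert_eq!(mgr.find_creating_streaming_job_id(&"public.mv1".to_string()), None);
}
```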
14 changes: 7 additions & 7 deletions src/meta/src/manager/catalog/mod.rs
@@ -221,7 +221,7 @@ impl CatalogManagerCore
             .or_else(|| {
                 if self
                     .database
-                    .in_progress_creation_streaming_job
+                    .in_progress_creating_streaming_job
                     .contains_key(&job.id())
                 {
                     Some(false)
@@ -253,7 +253,7 @@
         }
         // Clear in progress creation streaming job. Note that background job is not tracked here, so that
         // it won't affect background jobs.
-        self.database.in_progress_creation_streaming_job.clear();
+        self.database.in_progress_creating_streaming_job.clear();
     }
 }
 
@@ -1435,7 +1435,7 @@ impl CatalogManager {
         );
         database_core.in_progress_creation_tracker.remove(&key);
         database_core
-            .in_progress_creation_streaming_job
+            .in_progress_creating_streaming_job
             .remove(&table.id);
 
         table.stream_job_status = PbStreamJobStatus::Created.into();
@@ -3373,7 +3373,7 @@ impl CatalogManager {
             .in_progress_creation_tracker
             .remove(&mview_key);
         database_core
-            .in_progress_creation_streaming_job
+            .in_progress_creating_streaming_job
             .remove(&mview.id);
 
         sources.insert(source.id, source.clone());
@@ -3508,7 +3508,7 @@ impl CatalogManager {
 
         database_core.in_progress_creation_tracker.remove(&key);
         database_core
-            .in_progress_creation_streaming_job
+            .in_progress_creating_streaming_job
             .remove(&table.id);
 
         index.stream_job_status = PbStreamJobStatus::Created.into();
@@ -3594,7 +3594,7 @@ impl CatalogManager {
 
         database_core.in_progress_creation_tracker.remove(&key);
         database_core
-            .in_progress_creation_streaming_job
+            .in_progress_creating_streaming_job
             .remove(&sink.id);
 
         sink.stream_job_status = PbStreamJobStatus::Created.into();
@@ -3713,7 +3713,7 @@ impl CatalogManager {
 
         database_core.in_progress_creation_tracker.remove(&key);
         database_core
-            .in_progress_creation_streaming_job
+            .in_progress_creating_streaming_job
             .remove(&subscription.id);
 
         subscription.subscription_state = PbSubscriptionState::Created.into();
2 changes: 2 additions & 0 deletions src/meta/src/manager/metadata.rs
@@ -901,6 +901,8 @@ impl MetadataManager {
 }
 
 impl MetadataManager {
+    /// Wait for job finishing notification in `TrackingJob::pre_finish`.
+    /// The progress is updated per barrier.
     pub(crate) async fn wait_streaming_job_finished(
         &self,
         job: &StreamingJob,
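The new doc comment points at a notification hand-off: the DDL side registers a waiter for a job id, and the catalog side wakes it up once the job is unmarked as creating. Below is a hypothetical, stripped-down version of that notifier registry (the real code keeps a similar `creating_table_finish_notifier` map inside the database manager); the names here are illustrative, not the actual API.

```rust
use std::collections::HashMap;

use futures::channel::oneshot;
use futures::executor::block_on;

type TableId = u32;

// Hypothetical, simplified notifier registry.
#[derive(Default)]
struct FinishNotifier {
    waiters: HashMap<TableId, Vec<oneshot::Sender<()>>>,
}

impl FinishNotifier {
    /// Register interest in a job and get a future that resolves when it finishes.
    fn wait_streaming_job_finished(&mut self, id: TableId) -> oneshot::Receiver<()> {
        let (tx, rx) = oneshot::channel();
        self.waiters.entry(id).or_default().push(tx);
        rx
    }

    /// Called once the job is finished: wake up everyone waiting on it.
    fn notify_finished(&mut self, id: TableId) {
        for tx in self.waiters.remove(&id).into_iter().flatten() {
            let _ = tx.send(());
        }
    }
}

fn main() {
    let mut notifier = FinishNotifier::default();
    let rx = notifier.wait_streaming_job_finished(42);
    notifier.notify_finished(42);
    // The waiter resumes once the finish notification arrives.
    block_on(rx).expect("job was finished, not dropped");
}
```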
2 changes: 2 additions & 0 deletions src/meta/src/rpc/ddl_controller.rs
@@ -803,6 +803,8 @@ impl DdlController {
         }
     }
 
+    /// For [`CreateType::Foreground`], the function will only return after backfilling finishes
+    /// ([`MetadataManager::wait_streaming_job_finished`]).
     async fn create_streaming_job(
         &self,
         mut stream_job: StreamingJob,
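The added comment documents the foreground/background split: a foreground create only returns to the user after backfill finishes, while a background create returns immediately. Here is a hypothetical sketch of just that control flow, with a oneshot channel standing in for `MetadataManager::wait_streaming_job_finished`; it is not the real `DdlController` signature.

```rust
use futures::channel::oneshot;
use futures::executor::block_on;

// Hypothetical, simplified stand-in for the real create type enum.
enum CreateType {
    Foreground,
    Background,
}

async fn create_streaming_job(
    create_type: CreateType,
    finished: oneshot::Receiver<()>,
) -> Result<(), String> {
    // ... submit the job to the barrier manager here ...
    match create_type {
        // Foreground: only return once the "backfill finished" notification arrives.
        CreateType::Foreground => finished.await.map_err(|e| e.to_string()),
        // Background: return immediately; the job keeps backfilling.
        CreateType::Background => Ok(()),
    }
}

fn main() -> Result<(), String> {
    // Background create returns right away, even though nothing has finished yet.
    let (_tx_bg, rx_bg) = oneshot::channel::<()>();
    block_on(create_streaming_job(CreateType::Background, rx_bg))?;

    // Foreground create waits for the finish notification.
    let (tx, rx) = oneshot::channel();
    tx.send(()).unwrap();
    block_on(create_streaming_job(CreateType::Foreground, rx))
}
```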
