Skip to content

Commit

Permalink
dont track sink progress
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Jan 5, 2024
1 parent 75735e2 commit d0b22c0
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 33 deletions.
4 changes: 1 addition & 3 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1177,9 +1177,7 @@ impl GlobalBarrierManager {
// Update the progress of all commands.
for progress in resps.iter().flat_map(|r| &r.create_mview_progress) {
// Those with actors complete can be finished immediately.
if let Some(command) = tracker.update(progress, &version_stats)
&& !command.tracks_sink()
{
if let Some(command) = tracker.update(progress, &version_stats) {
tracing::trace!(?progress, "finish progress");
commands.push(command);
} else {
Expand Down
30 changes: 0 additions & 30 deletions src/meta/src/barrier/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,13 +223,6 @@ impl TrackingJob {
TrackingJob::Recovered(recovered) => Some(recovered.fragments.table_id()),
}
}

pub(crate) fn tracks_sink(&self) -> bool {
match self {
TrackingJob::New(command) => command.tracks_sink(),
TrackingJob::Recovered(_) => false,
}
}
}

pub struct RecoveredTrackingJob {
Expand All @@ -247,15 +240,6 @@ pub(super) struct TrackingCommand {
pub notifiers: Vec<Notifier>,
}

impl TrackingCommand {
pub fn tracks_sink(&self) -> bool {
match &self.context.command {
Command::CreateStreamingJob { ddl_type, .. } => *ddl_type == DdlType::Sink,
_ => false,
}
}
}

/// Track the progress of all creating mviews. When creation is done, `notify_finished` will be
/// called on registered notifiers.
///
Expand Down Expand Up @@ -443,20 +427,6 @@ impl CreateMviewProgressTracker {
definition,
);
if *ddl_type == DdlType::Sink {
// First we duplicate a separate tracking job for sink.
// This does not need notifiers, it is solely used for
// tracking the backfill progress of sink.
// It will still be removed from progress map when
// backfill completes.
let tracking_job = TrackingJob::New(TrackingCommand {
context: command.context.clone(),
notifiers: vec![],
});
let old = self
.progress_map
.insert(creating_mv_id, (progress, tracking_job));
assert!(old.is_none());

// We return the original tracking job immediately.
// This is because sink can be decoupled with backfill progress.
// We don't need to wait for sink to finish backfill.
Expand Down

0 comments on commit d0b22c0

Please sign in to comment.