Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(meta): remove stream job progress for sink #14388

Merged
merged 1 commit into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1177,9 +1177,7 @@ impl GlobalBarrierManager {
// Update the progress of all commands.
for progress in resps.iter().flat_map(|r| &r.create_mview_progress) {
// Those with actors complete can be finished immediately.
if let Some(command) = tracker.update(progress, &version_stats)
&& !command.tracks_sink()
{
if let Some(command) = tracker.update(progress, &version_stats) {
tracing::trace!(?progress, "finish progress");
commands.push(command);
} else {
Expand Down
30 changes: 0 additions & 30 deletions src/meta/src/barrier/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,13 +223,6 @@ impl TrackingJob {
TrackingJob::Recovered(recovered) => Some(recovered.fragments.table_id()),
}
}

pub(crate) fn tracks_sink(&self) -> bool {
match self {
TrackingJob::New(command) => command.tracks_sink(),
TrackingJob::Recovered(_) => false,
}
}
}

pub struct RecoveredTrackingJob {
Expand All @@ -247,15 +240,6 @@ pub(super) struct TrackingCommand {
pub notifiers: Vec<Notifier>,
}

impl TrackingCommand {
pub fn tracks_sink(&self) -> bool {
match &self.context.command {
Command::CreateStreamingJob { ddl_type, .. } => *ddl_type == DdlType::Sink,
_ => false,
}
}
}

/// Track the progress of all creating mviews. When creation is done, `notify_finished` will be
/// called on registered notifiers.
///
Expand Down Expand Up @@ -443,20 +427,6 @@ impl CreateMviewProgressTracker {
definition,
);
if *ddl_type == DdlType::Sink {
// First we duplicate a separate tracking job for sink.
// This does not need notifiers, it is solely used for
// tracking the backfill progress of sink.
// It will still be removed from progress map when
// backfill completes.
let tracking_job = TrackingJob::New(TrackingCommand {
context: command.context.clone(),
notifiers: vec![],
});
let old = self
.progress_map
.insert(creating_mv_id, (progress, tracking_job));
assert!(old.is_none());

// We return the original tracking job immediately.
// This is because sink can be decoupled with backfill progress.
// We don't need to wait for sink to finish backfill.
Expand Down
Loading