Skip to content

Commit

Permalink
return finished if tracking job complete
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Nov 27, 2023
1 parent eb9b821 commit 0b98b69
Showing 1 changed file with 21 additions and 6 deletions.
27 changes: 21 additions & 6 deletions src/meta/src/barrier/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::Arc;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::util::epoch::Epoch;
use risingwave_pb::catalog::DdlType;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;
Expand Down Expand Up @@ -368,12 +369,13 @@ impl CreateMviewProgressTracker {
return Some(TrackingJob::New(command));
}

let (creating_mv_id, upstream_mv_count, upstream_total_key_count, definition) =
let (creating_mv_id, upstream_mv_count, upstream_total_key_count, definition, ddl_type) =
if let Command::CreateStreamingJob {
table_fragments,
dispatchers,
upstream_mview_actors,
definition,
ddl_type,
..
} = &command.context.command
{
Expand Down Expand Up @@ -404,6 +406,7 @@ impl CreateMviewProgressTracker {
upstream_mv_count,
upstream_total_key_count,
definition.to_string(),
ddl_type,
)
} else {
unreachable!("Must be CreateStreamingJob.");
Expand All @@ -419,11 +422,23 @@ impl CreateMviewProgressTracker {
upstream_total_key_count,
definition,
);
let old = self
.progress_map
.insert(creating_mv_id, (progress, TrackingJob::New(command)));
assert!(old.is_none());
None
if *ddl_type == DdlType::Sink {
let tracking_job = TrackingJob::New(TrackingCommand {
context: command.context.clone(),
notifiers: vec![],
});
let old = self
.progress_map
.insert(creating_mv_id, (progress, tracking_job));
assert!(old.is_none());
Some(TrackingJob::New(command))
} else {
let old = self
.progress_map
.insert(creating_mv_id, (progress, TrackingJob::New(command)));
assert!(old.is_none());
None
}
}

/// Update the progress of `actor` according to the Pb struct.
Expand Down

0 comments on commit 0b98b69

Please sign in to comment.