diff --git a/src/meta/src/barrier/progress.rs b/src/meta/src/barrier/progress.rs index 2fc012c031a9b..67dc3eb48863d 100644 --- a/src/meta/src/barrier/progress.rs +++ b/src/meta/src/barrier/progress.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_common::util::epoch::Epoch; +use risingwave_pb::catalog::DdlType; use risingwave_pb::ddl_service::DdlProgress; use risingwave_pb::hummock::HummockVersionStats; use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress; @@ -368,12 +369,13 @@ impl CreateMviewProgressTracker { return Some(TrackingJob::New(command)); } - let (creating_mv_id, upstream_mv_count, upstream_total_key_count, definition) = + let (creating_mv_id, upstream_mv_count, upstream_total_key_count, definition, ddl_type) = if let Command::CreateStreamingJob { table_fragments, dispatchers, upstream_mview_actors, definition, + ddl_type, .. } = &command.context.command { @@ -404,6 +406,7 @@ impl CreateMviewProgressTracker { upstream_mv_count, upstream_total_key_count, definition.to_string(), + ddl_type, ) } else { unreachable!("Must be CreateStreamingJob."); @@ -419,11 +422,23 @@ impl CreateMviewProgressTracker { upstream_total_key_count, definition, ); - let old = self - .progress_map - .insert(creating_mv_id, (progress, TrackingJob::New(command))); - assert!(old.is_none()); - None + if *ddl_type == DdlType::Sink { + let tracking_job = TrackingJob::New(TrackingCommand { + context: command.context.clone(), + notifiers: vec![], + }); + let old = self + .progress_map + .insert(creating_mv_id, (progress, tracking_job)); + assert!(old.is_none()); + Some(TrackingJob::New(command)) + } else { + let old = self + .progress_map + .insert(creating_mv_id, (progress, TrackingJob::New(command))); + assert!(old.is_none()); + None + } } /// Update the progress of `actor` according to the Pb struct.