diff --git a/Cargo.lock b/Cargo.lock
index c8bb3bb7afa8..90bc30c5f742 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -11207,6 +11207,7 @@ dependencies = [
  "comfy-table",
  "crepe",
  "easy-ext",
+ "educe",
  "either",
  "enum-as-inner 0.6.0",
  "expect-test",
diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml
index 4511e9f61d89..a7f37bf50591 100644
--- a/src/meta/Cargo.toml
+++ b/src/meta/Cargo.toml
@@ -28,6 +28,7 @@ clap = { workspace = true }
 comfy-table = "7"
 crepe = "0.1"
 easy-ext = "1"
+educe = "0.6"
 either = "1"
 enum-as-inner = "0.6"
 etcd-client = { workspace = true }
diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs
index 0bea5f37940d..ffdf8d3bad8b 100644
--- a/src/meta/src/barrier/command.rs
+++ b/src/meta/src/barrier/command.rs
@@ -146,8 +146,10 @@ impl ReplaceTablePlan {
     }
 }
 
-#[derive(Debug, Clone)]
+#[derive(educe::Educe, Clone)]
+#[educe(Debug)]
 pub struct CreateStreamingJobCommandInfo {
+    #[educe(Debug(ignore))]
     pub table_fragments: TableFragments,
     /// Refer to the doc on [`MetadataManager::get_upstream_root_fragments`] for the meaning of "root".
     pub upstream_root_actors: HashMap<TableId, Vec<ActorId>>,
diff --git a/src/meta/src/barrier/progress.rs b/src/meta/src/barrier/progress.rs
index 5754e4c60e36..e48fb924512a 100644
--- a/src/meta/src/barrier/progress.rs
+++ b/src/meta/src/barrier/progress.rs
@@ -122,6 +122,12 @@ impl Progress {
     /// Returns whether all backfill executors are done.
     fn is_done(&self) -> bool {
+        tracing::trace!(
+            "Progress::is_done? {}, {}, {:?}",
+            self.done_count,
+            self.states.len(),
+            self.states
+        );
         self.done_count == self.states.len()
     }
@@ -274,6 +280,7 @@ pub(super) struct TrackingCommand {
 /// 4. With `actor_map` we can use an actor's `ActorId` to find the ID of the `StreamJob`.
 #[derive(Default, Debug)]
 pub(super) struct CreateMviewProgressTracker {
+    // XXX: which one should contain source backfill?
     /// Progress of the create-mview DDL indicated by the `TableId`.
     progress_map: HashMap<TableId, (Progress, TrackingJob)>,
@@ -494,6 +501,7 @@ impl CreateMviewProgressTracker {
         replace_table: Option<&ReplaceTablePlan>,
         version_stats: &HummockVersionStats,
     ) -> Option<TrackingJob> {
+        tracing::trace!(?info, "add job to track");
         let (info, actors, replace_table_info) = {
             let CreateStreamingJobCommandInfo {
                 table_fragments, ..
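
A note on the check traced above: `Progress::is_done` is a plain counter comparison. The tracker keeps one state per backfill actor and bumps `done_count` on each actor's first transition to done. A minimal, runnable sketch of that bookkeeping (simplified, hypothetical types; only `states`, `done_count`, and the final comparison mirror the real `Progress`):

    use std::collections::HashMap;

    type ActorId = u32;

    /// Simplified stand-in for meta's per-actor backfill state.
    enum BackfillState {
        ConsumingUpstream { consumed_rows: u64 },
        Done { consumed_rows: u64 },
    }

    struct Progress {
        states: HashMap<ActorId, BackfillState>, // one entry per backfill actor
        done_count: usize,
    }

    impl Progress {
        /// Record that `actor` finished backfilling; `done_count` only
        /// advances on the first transition to `Done`.
        fn mark_done(&mut self, actor: ActorId, consumed_rows: u64) {
            let state = self.states.get_mut(&actor).expect("untracked actor");
            if !matches!(state, BackfillState::Done { .. }) {
                *state = BackfillState::Done { consumed_rows };
                self.done_count += 1;
            }
        }

        /// The same comparison as the traced `is_done` above.
        fn is_done(&self) -> bool {
            self.done_count == self.states.len()
        }
    }

    fn main() {
        let mut progress = Progress {
            states: HashMap::from([
                (1, BackfillState::ConsumingUpstream { consumed_rows: 0 }),
                (2, BackfillState::ConsumingUpstream { consumed_rows: 0 }),
            ]),
            done_count: 0,
        };
        progress.mark_done(1, 500);
        assert!(!progress.is_done());
        progress.mark_done(2, 800);
        assert!(progress.is_done());
    }
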
@@ -596,6 +604,7 @@ impl CreateMviewProgressTracker {
         progress: &CreateMviewProgress,
         version_stats: &HummockVersionStats,
     ) -> Option<TrackingJob> {
+        tracing::trace!(?progress, "update progress");
         let actor = progress.backfill_actor_id;
         let Some(table_id) = self.actor_map.get(&actor).copied() else {
             // On restart, backfill will ALWAYS notify CreateMviewProgressTracker,
diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs
index 52fc811787d3..935d4773865e 100644
--- a/src/meta/src/manager/metadata.rs
+++ b/src/meta/src/manager/metadata.rs
@@ -917,6 +917,7 @@ impl MetadataManager {
         &self,
         job: &StreamingJob,
     ) -> MetaResult<NotificationVersion> {
+        tracing::debug!("wait_streaming_job_finished: {job:?}");
         match self {
             MetadataManager::V1(mgr) => mgr.wait_streaming_job_finished(job).await,
             MetadataManager::V2(mgr) => mgr.wait_streaming_job_finished(job.id() as _).await,
diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs
index bec6b95cfb0f..38d704ab0f6d 100644
--- a/src/meta/src/model/stream.rs
+++ b/src/meta/src/model/stream.rs
@@ -362,7 +362,9 @@ impl TableFragments {
             return vec![];
         }
         if (fragment.fragment_type_mask
-            & (FragmentTypeFlag::Values as u32 | FragmentTypeFlag::StreamScan as u32))
+            & (FragmentTypeFlag::Values as u32
+                | FragmentTypeFlag::StreamScan as u32
+                | FragmentTypeFlag::SourceScan as u32))
             != 0
         {
             actor_ids.extend(fragment.actors.iter().map(|actor| actor.actor_id));
diff --git a/src/stream/src/executor/source/source_backfill_executor.rs b/src/stream/src/executor/source/source_backfill_executor.rs
index 6182dd7b14da..d85fc5a922dc 100644
--- a/src/stream/src/executor/source/source_backfill_executor.rs
+++ b/src/stream/src/executor/source/source_backfill_executor.rs
@@ -40,6 +40,7 @@ use crate::common::rate_limit::limited_chunk_size;
 use crate::executor::prelude::*;
 use crate::executor::source::source_executor::WAIT_BARRIER_MULTIPLE_TIMES;
 use crate::executor::{AddMutation, UpdateMutation};
+use crate::task::CreateMviewProgress;
 
 #[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
 pub enum BackfillState {
@@ -88,6 +89,8 @@ pub struct SourceBackfillExecutorInner<S: StateStore> {
     /// Rate limit in rows/s.
     rate_limit_rps: Option<u32>,
+
+    progress: CreateMviewProgress,
 }
 
 /// Local variables used in the backfill stage.
@@ -238,6 +241,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
         system_params: SystemParamsReaderRef,
         backfill_state_store: BackfillStateTableHandler<S>,
         rate_limit_rps: Option<u32>,
+        progress: CreateMviewProgress,
     ) -> Self {
         let source_split_change_count = metrics
             .source_split_change_count
@@ -247,6 +251,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
             &actor_ctx.id.to_string(),
             &actor_ctx.fragment_id.to_string(),
         ]);
+
         Self {
             actor_ctx,
             info,
@@ -256,6 +261,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
             source_split_change_count,
             system_params,
             rate_limit_rps,
+            progress,
         }
     }
@@ -554,6 +560,13 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
                             );
                         }
 
+                        // TODO: use a specialized progress for source?
+                        // self.progress.update(
+                        //     barrier.epoch,
+                        //     snapshot_read_epoch,
+                        //     total_snapshot_processed_rows,
+                        // );
+
                         self.backfill_state_store
                             .set_states(backfill_stage.states.clone())
                             .await?;
@@ -637,6 +650,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
             }
         }
 
+        let mut first_barrier_after_finish = true;
        let mut splits: HashSet<SplitId> = backfill_stage.states.keys().cloned().collect();
 
         // All splits finished backfilling. Now we only forward the source data.
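
Once all splits are finished the executor only forwards source data, so completion must be reported exactly once. A small runnable model of the `first_barrier_after_finish` one-shot guard introduced above (`Progress` here is a hypothetical stand-in for `CreateMviewProgress`; only the call pattern is taken from this diff):

    /// Stand-in for `CreateMviewProgress`; only the call pattern matches the diff.
    struct Progress {
        finished: bool,
    }

    impl Progress {
        fn finish(&mut self, epoch: u64, consumed_rows: u64) {
            // Reporting twice would be a bug; the guard below prevents it.
            assert!(!self.finished, "finish must be reported exactly once");
            self.finished = true;
            println!("finish reported at epoch {epoch} ({consumed_rows} rows)");
        }
    }

    fn main() {
        let mut progress = Progress { finished: false };
        let mut first_barrier_after_finish = true;

        // Backfill is already done; we only forward data and observe barriers.
        for epoch in [100u64, 101, 102] {
            if first_barrier_after_finish {
                // This is also where the Finished split states are persisted,
                // keeping the state write and the progress report on one barrier.
                progress.finish(epoch, 0);
            }
            first_barrier_after_finish = false;
            // ... commit the state store for `epoch` ...
        }
    }
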
@@ -673,14 +687,22 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
                 _ => {}
             }
         }
-        self.backfill_state_store
-            .set_states(
-                splits
-                    .iter()
-                    .map(|s| (s.clone(), BackfillState::Finished))
-                    .collect(),
-            )
-            .await?;
+
+        if first_barrier_after_finish {
+            // Update state and report progress after the first barrier.
+            // TODO: use a specialized progress for source
+            tracing::error!("progress finish");
+            self.progress.finish(barrier.epoch, 114514);
+            self.backfill_state_store
+                .set_states(
+                    splits
+                        .iter()
+                        .map(|s| (s.clone(), BackfillState::Finished))
+                        .collect(),
+                )
+                .await?;
+        }
+        first_barrier_after_finish = false;
         self.backfill_state_store
             .state_store
             .commit(barrier.epoch)
diff --git a/src/stream/src/from_proto/source_backfill.rs b/src/stream/src/from_proto/source_backfill.rs
index ba3ab599af70..65329a26bd40 100644
--- a/src/stream/src/from_proto/source_backfill.rs
+++ b/src/stream/src/from_proto/source_backfill.rs
@@ -72,6 +72,9 @@ impl ExecutorBuilder for SourceBackfillExecutorBuilder {
             source_desc_builder,
             state_table_handler,
         );
+        let progress = params
+            .local_barrier_manager
+            .register_create_mview_progress(params.actor_context.id);
 
         let exec = SourceBackfillExecutorInner::new(
             params.actor_context.clone(),
@@ -81,6 +84,7 @@ impl ExecutorBuilder for SourceBackfillExecutorBuilder {
             params.env.system_params_manager_ref().get_params(),
             backfill_state_table,
             node.rate_limit,
+            progress,
         );
 
         let [input]: [_; 1] = params.input.try_into().unwrap();
diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs
index 5ccde5004801..6f21e32adc10 100644
--- a/src/stream/src/task/barrier_manager/managed_state.rs
+++ b/src/stream/src/task/barrier_manager/managed_state.rs
@@ -372,6 +372,9 @@ pub(super) struct PartialGraphManagedBarrierState {
     prev_barrier_table_ids: Option<(EpochPair, HashSet<TableId>)>,
 
     /// Record the progress updates of creating mviews for each epoch of concurrent checkpoints.
+    ///
+    /// This is updated by [`super::CreateMviewProgress::update`] and will be reported to meta
+    /// in [`BarrierCompleteResult`].
     pub(super) create_mview_progress: HashMap<u64, HashMap<ActorId, BackfillState>>,
 
     pub(super) state_store: StateStoreImpl,
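
For context on how these reports travel: each `update`/`finish` call from an executor lands in the epoch-keyed `create_mview_progress` map documented above and is shipped to meta once that barrier completes, where `CreateMviewProgressTracker::update` consumes it. A minimal sketch of that bookkeeping, with simplified stand-in types rather than RisingWave's real ones:

    use std::collections::HashMap;

    type ActorId = u32;

    /// Simplified stand-in for the per-actor progress reported to meta.
    #[derive(Debug)]
    enum BackfillState {
        ConsumingUpstream { consumed_epoch: u64, consumed_rows: u64 },
        Done { consumed_rows: u64 },
    }

    #[derive(Default)]
    struct PartialGraphManagedBarrierState {
        /// epoch -> progress of each backfill actor while that barrier is in flight.
        create_mview_progress: HashMap<u64, HashMap<ActorId, BackfillState>>,
    }

    impl PartialGraphManagedBarrierState {
        /// Roughly what a `CreateMviewProgress::update`/`finish` call records.
        fn record(&mut self, epoch: u64, actor: ActorId, state: BackfillState) {
            self.create_mview_progress
                .entry(epoch)
                .or_default()
                .insert(actor, state);
        }

        /// On barrier completion the epoch's entries are drained and sent to
        /// meta (in the real code, as part of the barrier-complete response).
        fn drain(&mut self, epoch: u64) -> HashMap<ActorId, BackfillState> {
            self.create_mview_progress.remove(&epoch).unwrap_or_default()
        }
    }

    fn main() {
        let mut state = PartialGraphManagedBarrierState::default();
        state.record(100, 1, BackfillState::ConsumingUpstream { consumed_epoch: 99, consumed_rows: 500 });
        state.record(100, 2, BackfillState::Done { consumed_rows: 800 });
        println!("{:?}", state.drain(100));
    }
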