diff --git a/Cargo.lock b/Cargo.lock
index 4e648e08a3fe..99423896f89f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -11213,6 +11213,7 @@ dependencies = [
  "comfy-table",
  "crepe",
  "easy-ext",
+ "educe",
  "either",
  "enum-as-inner 0.6.0",
  "expect-test",
diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml
index 4511e9f61d89..a7f37bf50591 100644
--- a/src/meta/Cargo.toml
+++ b/src/meta/Cargo.toml
@@ -28,6 +28,7 @@ clap = { workspace = true }
 comfy-table = "7"
 crepe = "0.1"
 easy-ext = "1"
+educe = "0.6"
 either = "1"
 enum-as-inner = "0.6"
 etcd-client = { workspace = true }
diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs
index 6e4ebe40b93b..a1c82ccd8db8 100644
--- a/src/meta/src/barrier/command.rs
+++ b/src/meta/src/barrier/command.rs
@@ -147,8 +147,10 @@ impl ReplaceTablePlan {
     }
 }
 
-#[derive(Debug, Clone)]
+#[derive(educe::Educe, Clone)]
+#[educe(Debug)]
 pub struct CreateStreamingJobCommandInfo {
+    #[educe(Debug(ignore))]
     pub table_fragments: TableFragments,
     /// Refer to the doc on [`MetadataManager::get_upstream_root_fragments`] for the meaning of "root".
     pub upstream_root_actors: HashMap>,
diff --git a/src/meta/src/barrier/progress.rs b/src/meta/src/barrier/progress.rs
index 5754e4c60e36..2e1b6f9dc397 100644
--- a/src/meta/src/barrier/progress.rs
+++ b/src/meta/src/barrier/progress.rs
@@ -55,6 +55,7 @@ pub(super) struct Progress {
     upstream_mv_count: HashMap,
 
     /// Total key count in the upstream materialized view
+    /// TODO: implement this for source backfill
     upstream_total_key_count: u64,
 
     /// Consumed rows
@@ -122,6 +123,12 @@ impl Progress {
 
     /// Returns whether all backfill executors are done.
     fn is_done(&self) -> bool {
+        tracing::trace!(
+            "Progress::is_done? {}, {}, {:?}",
+            self.done_count,
+            self.states.len(),
+            self.states
+        );
         self.done_count == self.states.len()
     }
 
@@ -274,6 +281,7 @@ pub(super) struct TrackingCommand {
 /// 4. With `actor_map` we can use an actor's `ActorId` to find the ID of the `StreamJob`.
 #[derive(Default, Debug)]
 pub(super) struct CreateMviewProgressTracker {
+    // TODO: add a specialized progress for source
     /// Progress of the create-mview DDL indicated by the `TableId`.
     progress_map: HashMap,
 
@@ -494,6 +502,7 @@ impl CreateMviewProgressTracker {
         replace_table: Option<&ReplaceTablePlan>,
         version_stats: &HummockVersionStats,
     ) -> Option {
+        tracing::trace!(?info, "add job to track");
         let (info, actors, replace_table_info) = {
             let CreateStreamingJobCommandInfo {
                 table_fragments, ..
@@ -596,6 +605,7 @@ impl CreateMviewProgressTracker {
         progress: &CreateMviewProgress,
         version_stats: &HummockVersionStats,
     ) -> Option {
+        tracing::trace!(?progress, "update progress");
         let actor = progress.backfill_actor_id;
         let Some(table_id) = self.actor_map.get(&actor).copied() else {
             // On restart, backfill will ALWAYS notify CreateMviewProgressTracker,
diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs
index 52fc811787d3..935d4773865e 100644
--- a/src/meta/src/manager/metadata.rs
+++ b/src/meta/src/manager/metadata.rs
@@ -917,6 +917,7 @@ impl MetadataManager {
         &self,
         job: &StreamingJob,
     ) -> MetaResult {
+        tracing::debug!("wait_streaming_job_finished: {job:?}");
         match self {
             MetadataManager::V1(mgr) => mgr.wait_streaming_job_finished(job).await,
             MetadataManager::V2(mgr) => mgr.wait_streaming_job_finished(job.id() as _).await,
diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs
index 447cf5cf8564..aaff07668878 100644
--- a/src/meta/src/model/stream.rs
+++ b/src/meta/src/model/stream.rs
@@ -363,7 +363,9 @@ impl TableFragments {
             return vec![];
         }
         if (fragment.fragment_type_mask
-            & (FragmentTypeFlag::Values as u32 | FragmentTypeFlag::StreamScan as u32))
+            & (FragmentTypeFlag::Values as u32
+                | FragmentTypeFlag::StreamScan as u32
+                | FragmentTypeFlag::SourceScan as u32))
             != 0
         {
             actor_ids.extend(fragment.actors.iter().map(|actor| actor.actor_id));
diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs
index d10fa83710d8..8e9b5be69043 100644
--- a/src/meta/src/stream/scale.rs
+++ b/src/meta/src/stream/scale.rs
@@ -441,6 +441,8 @@ pub struct ScaleController {
     pub env: MetaSrvEnv,
 
+    /// We will acquire this lock during DDL to prevent scaling operations on jobs that are in the creating state,
+    /// e.g., an MV cannot be rescheduled during foreground backfill.
     pub reschedule_lock: RwLock<()>,
 }
 
diff --git a/src/stream/src/executor/source/source_backfill_executor.rs b/src/stream/src/executor/source/source_backfill_executor.rs
index b28c707bdedd..09a4d0a40f1c 100644
--- a/src/stream/src/executor/source/source_backfill_executor.rs
+++ b/src/stream/src/executor/source/source_backfill_executor.rs
@@ -14,6 +14,7 @@
 use std::cmp::Ordering;
 use std::collections::{HashMap, HashSet};
+use std::sync::Once;
 use std::time::Instant;
 
 use anyhow::anyhow;
@@ -30,6 +31,7 @@ use risingwave_connector::source::{
     BackfillInfo, BoxChunkSourceStream, SourceContext, SourceCtrlOpts, SplitId, SplitImpl,
     SplitMetaData,
 };
+use risingwave_hummock_sdk::HummockReadEpoch;
 use serde::{Deserialize, Serialize};
 use thiserror_ext::AsReport;
 
@@ -40,6 +42,7 @@ use crate::common::rate_limit::limited_chunk_size;
 use crate::executor::prelude::*;
 use crate::executor::source::source_executor::WAIT_BARRIER_MULTIPLE_TIMES;
 use crate::executor::UpdateMutation;
+use crate::task::CreateMviewProgress;
 
 #[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
 pub enum BackfillState {
@@ -88,6 +91,8 @@ pub struct SourceBackfillExecutorInner {
 
     /// Rate limit in rows/s.
     rate_limit_rps: Option,
+
+    progress: CreateMviewProgress,
 }
 
 /// Local variables used in the backfill stage.
@@ -230,6 +235,7 @@ impl BackfillStage {
 }
 
 impl SourceBackfillExecutorInner {
+    #[expect(clippy::too_many_arguments)]
     pub fn new(
         actor_ctx: ActorContextRef,
         info: ExecutorInfo,
@@ -238,6 +244,7 @@
         system_params: SystemParamsReaderRef,
         backfill_state_store: BackfillStateTableHandler,
         rate_limit_rps: Option,
+        progress: CreateMviewProgress,
     ) -> Self {
         let source_split_change_count = metrics
             .source_split_change_count
@@ -247,6 +254,7 @@
                 &actor_ctx.id.to_string(),
                 &actor_ctx.fragment_id.to_string(),
             ]);
+
         Self {
             actor_ctx,
             info,
@@ -256,6 +264,7 @@
             source_split_change_count,
             system_params,
             rate_limit_rps,
+            progress,
         }
     }
 
@@ -346,7 +355,6 @@ impl SourceBackfillExecutorInner {
             splits: owned_splits,
         };
         backfill_stage.debug_assert_consistent();
-        tracing::debug!(?backfill_stage, "source backfill started");
 
         // Return the ownership of `stream_source_core` to the source executor.
         self.stream_source_core = core;
@@ -370,6 +378,7 @@ impl SourceBackfillExecutorInner {
                 }
             }
         }
+        tracing::debug!(?backfill_stage, "source backfill started");
 
         fn select_strategy(_: &mut ()) -> PollNext {
             futures::stream::PollNext::Left
@@ -407,9 +416,23 @@
             pause_reader!();
         }
 
+        let state_store = self.backfill_state_store.state_store.state_store().clone();
+        static STATE_TABLE_INITIALIZED: Once = Once::new();
+        tokio::spawn(async move {
+            // This is for `self.backfill_finished()` to be safe:
+            // we wait for the 1st epoch's curr, i.e., the 2nd epoch's prev.
+            let epoch = barrier.epoch.curr;
+            tracing::info!("waiting for epoch: {}", epoch);
+            state_store
+                .try_wait_epoch(HummockReadEpoch::Committed(epoch))
+                .await
+                .expect("failed to wait epoch");
+            STATE_TABLE_INITIALIZED.call_once(|| ());
+            tracing::info!("finished waiting for epoch: {}", epoch);
+        });
+
         yield Message::Barrier(barrier);
 
-        if !self.backfill_finished(&backfill_stage.states).await? {
+        {
             let source_backfill_row_count = self
                 .metrics
                 .source_backfill_row_count
@@ -552,10 +575,26 @@
                         .commit(barrier.epoch)
                         .await?;
 
-                    yield Message::Barrier(barrier);
-
-                    if self.backfill_finished(&backfill_stage.states).await? {
-                        break 'backfill_loop;
+                    if self.should_report_finished(&backfill_stage.states) {
+                        // TODO: use a specialized progress for source
+                        // Currently, `CreateMviewProgress` is designed for MV backfill, and rw_ddl_progress calculates
+                        // progress based on the number of consumed rows and an estimated total number of rows from hummock.
+                        // For now, we just rely on the same code path, and for source backfill, the progress will always be 99.99%.
+                        tracing::info!("progress finish");
+                        let epoch = barrier.epoch;
+                        self.progress.finish(epoch, 114514);
+                        // yield barrier after reporting progress
+                        yield Message::Barrier(barrier);
+
+                        // After we have reported finished, we still don't exit the loop,
+                        // because we need to handle split migration.
+                        if STATE_TABLE_INITIALIZED.is_completed()
+                            && self.backfill_finished(&backfill_stage.states).await?
+                        {
+                            break 'backfill_loop;
+                        }
+                    } else {
+                        yield Message::Barrier(barrier);
                     }
                 }
                 Message::Chunk(chunk) => {
@@ -665,7 +704,7 @@
                             self.apply_split_change_forward_stage(
                                 actor_splits,
                                 &mut splits,
-                                true,
+                                false,
                             )
                             .await?;
                         }
@@ -688,11 +727,34 @@
         }
     }
 
-    /// All splits finished backfilling.
+    /// When we should call `progress.finish()` to let the blocking DDL return.
+    /// We report as soon as the state becomes `SourceCachingUp`. Otherwise the DDL might be blocked forever until upstream messages come.
+    ///
+    /// Note: split migration (online scaling) is related to progress tracking.
+    /// - For foreground DDL, scaling is not allowed before the progress is finished.
+    /// - For background DDL, scaling is skipped when the progress is not finished, and can be triggered by recreating actors during recovery.
+    ///
+    /// See for more details.
+    fn should_report_finished(&self, states: &BackfillStates) -> bool {
+        states.values().all(|state| {
+            matches!(
+                state,
+                BackfillState::Finished | BackfillState::SourceCachingUp(_)
+            )
+        })
+    }
+
+    /// All splits have entered the `Finished` state.
     ///
     /// We check all splits for the source, including other actors' splits here, before going to the forward stage.
-    /// Otherwise if we break early, but after rescheduling, an unfinished split is migrated to
+    /// Otherwise if we `break` early, but after rescheduling, an unfinished split is migrated to
     /// this actor, we still need to backfill it.
+    ///
+    /// Note: at the beginning, the actor will only read the state written by itself.
+    /// It needs to _wait until it can read all actors' written data_,
+    /// i.e., wait until the first checkpoint is available.
+    ///
+    /// See for more details.
     async fn backfill_finished(&self, states: &BackfillStates) -> StreamExecutorResult {
         Ok(states
             .values()
@@ -761,7 +823,6 @@
             }
             Some(backfill_state) => {
                 // Migrated split. Backfill if unfinished.
-                // TODO: disallow online scaling during backfilling.
                 target_state.insert(split_id, backfill_state);
             }
         }
diff --git a/src/stream/src/executor/source/source_backfill_state_table.rs b/src/stream/src/executor/source/source_backfill_state_table.rs
index be9abe8490e6..3579aff2ec4f 100644
--- a/src/stream/src/executor/source/source_backfill_state_table.rs
+++ b/src/stream/src/executor/source/source_backfill_state_table.rs
@@ -76,6 +76,7 @@ impl BackfillStateTableHandler {
             };
             ret.push(state);
         }
+        tracing::trace!("scan SourceBackfill state table: {:?}", ret);
         Ok(ret)
     }
 
diff --git a/src/stream/src/from_proto/source_backfill.rs b/src/stream/src/from_proto/source_backfill.rs
index ba3ab599af70..65329a26bd40 100644
--- a/src/stream/src/from_proto/source_backfill.rs
+++ b/src/stream/src/from_proto/source_backfill.rs
@@ -72,6 +72,9 @@ impl ExecutorBuilder for SourceBackfillExecutorBuilder {
             source_desc_builder,
             state_table_handler,
         );
+        let progress = params
+            .local_barrier_manager
+            .register_create_mview_progress(params.actor_context.id);
 
         let exec = SourceBackfillExecutorInner::new(
             params.actor_context.clone(),
@@ -81,6 +84,7 @@
             params.env.system_params_manager_ref().get_params(),
             backfill_state_table,
             node.rate_limit,
+            progress,
         );
 
         let [input]: [_; 1] = params.input.try_into().unwrap();
diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs
index 5ccde5004801..6f21e32adc10 100644
--- a/src/stream/src/task/barrier_manager/managed_state.rs
+++ b/src/stream/src/task/barrier_manager/managed_state.rs
@@ -372,6 +372,9 @@ pub(super) struct PartialGraphManagedBarrierState {
     prev_barrier_table_ids: Option<(EpochPair, HashSet)>,
 
     /// Record the progress updates of creating mviews for each epoch of concurrent checkpoints.
+    ///
+    /// This is updated by [`super::CreateMviewProgress::update`] and will be reported to meta
+    /// in [`BarrierCompleteResult`].
     pub(super) create_mview_progress: HashMap>,
 
     pub(super) state_store: StateStoreImpl,
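
Aside (not part of the patch): the new `educe` dependency added in `src/meta/Cargo.toml` above is what lets `CreateStreamingJobCommandInfo` keep a derived `Debug` implementation while hiding its bulky `table_fragments` field, so the `?info` tracing added in `progress.rs` stays readable. Below is a minimal, self-contained sketch of that same derive pattern; the struct and field names (`JobInfo`, `payload`, `name`) are hypothetical and chosen only for illustration.

```rust
use educe::Educe; // educe = "0.6"

#[derive(Educe, Clone)]
#[educe(Debug)]
struct JobInfo {
    /// Hypothetical heavy payload, analogous to `table_fragments`;
    /// excluded from the `Debug` output so logs stay small.
    #[educe(Debug(ignore))]
    payload: Vec<u8>,
    name: String,
}

fn main() {
    let info = JobInfo {
        payload: vec![0; 1 << 20],
        name: "demo".into(),
    };
    // Prints the struct without the ignored field, e.g. `JobInfo { name: "demo" }`.
    println!("{info:?}");
}
```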