feat: track progress for SourceBackfill (blocking DDL) (#18112)
Signed-off-by: xxchan <[email protected]>
xxchan authored Sep 6, 2024
1 parent df7f54f commit 6489e9a
Showing 11 changed files with 100 additions and 12 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions src/meta/Cargo.toml
@@ -28,6 +28,7 @@ clap = { workspace = true }
comfy-table = "7"
crepe = "0.1"
easy-ext = "1"
educe = "0.6"
either = "1"
enum-as-inner = "0.6"
etcd-client = { workspace = true }
4 changes: 3 additions & 1 deletion src/meta/src/barrier/command.rs
@@ -147,8 +147,10 @@ impl ReplaceTablePlan {
}
}

#[derive(Debug, Clone)]
#[derive(educe::Educe, Clone)]
#[educe(Debug)]
pub struct CreateStreamingJobCommandInfo {
#[educe(Debug(ignore))]
pub table_fragments: TableFragments,
/// Refer to the doc on [`MetadataManager::get_upstream_root_fragments`] for the meaning of "root".
pub upstream_root_actors: HashMap<TableId, Vec<ActorId>>,
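The hunk above swaps the plain `#[derive(Debug)]` for `educe`'s derive so the bulky `table_fragments` field can be excluded from `Debug` output (useful now that the tracker logs the whole `CreateStreamingJobCommandInfo` via `tracing::trace!(?info, ...)`). A minimal standalone sketch of the same pattern, with hypothetical `JobInfo`/`fragments` names:

```rust
use educe::Educe; // educe = "0.6", as added to Cargo.toml above

#[derive(Educe, Clone)]
#[educe(Debug)]
struct JobInfo {
    /// Skipped when formatting with `{:?}`, keeping trace logs readable.
    #[educe(Debug(ignore))]
    fragments: Vec<u8>, // stand-in for the large `TableFragments` payload
    job_name: String,
}

fn main() {
    let info = JobInfo { fragments: vec![0; 4096], job_name: "mv_orders".into() };
    println!("{info:?}"); // prints something like: JobInfo { job_name: "mv_orders" }
}
```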
10 changes: 10 additions & 0 deletions src/meta/src/barrier/progress.rs
@@ -55,6 +55,7 @@ pub(super) struct Progress {
upstream_mv_count: HashMap<TableId, usize>,

/// Total key count in the upstream materialized view
/// TODO: implement this for source backfill
upstream_total_key_count: u64,

/// Consumed rows
@@ -122,6 +123,12 @@ impl Progress {

/// Returns whether all backfill executors are done.
fn is_done(&self) -> bool {
tracing::trace!(
"Progress::is_done? {}, {}, {:?}",
self.done_count,
self.states.len(),
self.states
);
self.done_count == self.states.len()
}

@@ -274,6 +281,7 @@ pub(super) struct TrackingCommand {
/// 4. With `actor_map` we can use an actor's `ActorId` to find the ID of the `StreamJob`.
#[derive(Default, Debug)]
pub(super) struct CreateMviewProgressTracker {
// TODO: add a specialized progress for source
/// Progress of the create-mview DDL indicated by the `TableId`.
progress_map: HashMap<TableId, (Progress, TrackingJob)>,

@@ -494,6 +502,7 @@ impl CreateMviewProgressTracker {
replace_table: Option<&ReplaceTablePlan>,
version_stats: &HummockVersionStats,
) -> Option<TrackingJob> {
tracing::trace!(?info, "add job to track");
let (info, actors, replace_table_info) = {
let CreateStreamingJobCommandInfo {
table_fragments, ..
@@ -596,6 +605,7 @@ impl CreateMviewProgressTracker {
progress: &CreateMviewProgress,
version_stats: &HummockVersionStats,
) -> Option<TrackingJob> {
tracing::trace!(?progress, "update progress");
let actor = progress.backfill_actor_id;
let Some(table_id) = self.actor_map.get(&actor).copied() else {
// On restart, backfill will ALWAYS notify CreateMviewProgressTracker,
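The two TODOs above exist because `upstream_total_key_count` (estimated from Hummock stats) is what turns consumed rows into the percentage shown in `rw_ddl_progress`, and source backfill has no such estimate yet. A rough, illustrative sketch of that arithmetic follows — it is not the tracker's actual code, and the exact clamping may differ:

```rust
/// Illustrative only: consumed rows vs. an (estimated) upstream key count.
fn ddl_progress(consumed_rows: u64, upstream_total_key_count: u64) -> f64 {
    if upstream_total_key_count == 0 {
        // No estimate (the source-backfill case): report "almost done" rather than
        // dividing by zero; this is why the comment later in this commit notes that
        // source-backfill progress shows up as 99.99%.
        return 0.9999;
    }
    ((consumed_rows as f64) / (upstream_total_key_count as f64)).min(0.9999)
}

fn main() {
    assert_eq!(ddl_progress(2_500, 10_000), 0.25);
    assert_eq!(ddl_progress(42, 0), 0.9999); // rendered as 99.99%
}
```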
1 change: 1 addition & 0 deletions src/meta/src/manager/metadata.rs
@@ -917,6 +917,7 @@ impl MetadataManager {
&self,
job: &StreamingJob,
) -> MetaResult<NotificationVersion> {
tracing::debug!("wait_streaming_job_finished: {job:?}");
match self {
MetadataManager::V1(mgr) => mgr.wait_streaming_job_finished(job).await,
MetadataManager::V2(mgr) => mgr.wait_streaming_job_finished(job.id() as _).await,
4 changes: 3 additions & 1 deletion src/meta/src/model/stream.rs
@@ -363,7 +363,9 @@ impl TableFragments {
return vec![];
}
if (fragment.fragment_type_mask
& (FragmentTypeFlag::Values as u32 | FragmentTypeFlag::StreamScan as u32))
& (FragmentTypeFlag::Values as u32
| FragmentTypeFlag::StreamScan as u32
| FragmentTypeFlag::SourceScan as u32))
!= 0
{
actor_ids.extend(fragment.actors.iter().map(|actor| actor.actor_id));
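The hunk above widens the fragment-type filter so that `SourceScan` (source backfill) fragments are also counted as backfill fragments whose actors report progress. A small sketch of the bitmask test, with made-up flag values standing in for the real `FragmentTypeFlag` protobuf enum:

```rust
// Made-up bit values; the real ones come from the FragmentTypeFlag proto enum.
const VALUES: u32 = 1 << 0;
const STREAM_SCAN: u32 = 1 << 1;
const SOURCE_SCAN: u32 = 1 << 2;

/// A fragment matches if its type mask shares any bit with the wanted set.
fn is_backfill_fragment(fragment_type_mask: u32) -> bool {
    fragment_type_mask & (VALUES | STREAM_SCAN | SOURCE_SCAN) != 0
}

fn main() {
    assert!(is_backfill_fragment(SOURCE_SCAN)); // newly included by this commit
    assert!(is_backfill_fragment(STREAM_SCAN | (1 << 7))); // mixed masks still match
    assert!(!is_backfill_fragment(1 << 7)); // unrelated fragment type
}
```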
2 changes: 2 additions & 0 deletions src/meta/src/stream/scale.rs
@@ -441,6 +441,8 @@ pub struct ScaleController {

pub env: MetaSrvEnv,

/// We will acquire the lock during DDL to prevent scaling operations on jobs that are in the creating state.
/// e.g., an MV cannot be rescheduled during foreground backfill.
pub reschedule_lock: RwLock<()>,
}

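The new doc comment explains why `reschedule_lock` exists: creating jobs (including blocking source-backfill DDL) take the shared side, and rescheduling takes the exclusive side, so the two never overlap. A minimal sketch of that pattern using `tokio::sync::RwLock`; the concrete lock type and call sites in meta may differ:

```rust
use std::sync::Arc;
use tokio::sync::RwLock;

#[tokio::main]
async fn main() {
    // Stand-in for `ScaleController::reschedule_lock`.
    let reschedule_lock = Arc::new(RwLock::new(()));

    // A creating job holds a read guard for its whole duration;
    // several DDLs can hold read guards concurrently.
    let ddl_lock = reschedule_lock.clone();
    let ddl = tokio::spawn(async move {
        let _guard = ddl_lock.read().await;
        // ... run the (foreground) backfill here ...
    });

    // Rescheduling needs the write guard, so it waits until no DDL is in flight.
    {
        let _exclusive = reschedule_lock.write().await;
        // ... safe to move actors around here ...
    }

    ddl.await.unwrap();
}
```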
81 changes: 71 additions & 10 deletions src/stream/src/executor/source/source_backfill_executor.rs
@@ -14,6 +14,7 @@

use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::sync::Once;
use std::time::Instant;

use anyhow::anyhow;
@@ -30,6 +31,7 @@ use risingwave_connector::source::{
BackfillInfo, BoxChunkSourceStream, SourceContext, SourceCtrlOpts, SplitId, SplitImpl,
SplitMetaData,
};
use risingwave_hummock_sdk::HummockReadEpoch;
use serde::{Deserialize, Serialize};
use thiserror_ext::AsReport;

@@ -40,6 +42,7 @@ use crate::common::rate_limit::limited_chunk_size;
use crate::executor::prelude::*;
use crate::executor::source::source_executor::WAIT_BARRIER_MULTIPLE_TIMES;
use crate::executor::UpdateMutation;
use crate::task::CreateMviewProgress;

#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
pub enum BackfillState {
@@ -88,6 +91,8 @@ pub struct SourceBackfillExecutorInner<S: StateStore> {

/// Rate limit in rows/s.
rate_limit_rps: Option<u32>,

progress: CreateMviewProgress,
}

/// Local variables used in the backfill stage.
@@ -230,6 +235,7 @@ impl BackfillStage {
}

impl<S: StateStore> SourceBackfillExecutorInner<S> {
#[expect(clippy::too_many_arguments)]
pub fn new(
actor_ctx: ActorContextRef,
info: ExecutorInfo,
@@ -238,6 +244,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
system_params: SystemParamsReaderRef,
backfill_state_store: BackfillStateTableHandler<S>,
rate_limit_rps: Option<u32>,
progress: CreateMviewProgress,
) -> Self {
let source_split_change_count = metrics
.source_split_change_count
@@ -247,6 +254,7 @@
&actor_ctx.id.to_string(),
&actor_ctx.fragment_id.to_string(),
]);

Self {
actor_ctx,
info,
@@ -256,6 +264,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
source_split_change_count,
system_params,
rate_limit_rps,
progress,
}
}

@@ -346,7 +355,6 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
splits: owned_splits,
};
backfill_stage.debug_assert_consistent();
tracing::debug!(?backfill_stage, "source backfill started");

// Return the ownership of `stream_source_core` to the source executor.
self.stream_source_core = core;
@@ -370,6 +378,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
}
}
}
tracing::debug!(?backfill_stage, "source backfill started");

fn select_strategy(_: &mut ()) -> PollNext {
futures::stream::PollNext::Left
@@ -407,9 +416,23 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
pause_reader!();
}

let state_store = self.backfill_state_store.state_store.state_store().clone();
static STATE_TABLE_INITIALIZED: Once = Once::new();
tokio::spawn(async move {
// This is needed for `self.backfill_finished()` to be safe:
// we wait for the 1st epoch's curr, i.e., the 2nd epoch's prev.
let epoch = barrier.epoch.curr;
tracing::info!("waiting for epoch: {}", epoch);
state_store
.try_wait_epoch(HummockReadEpoch::Committed(epoch))
.await
.expect("failed to wait epoch");
STATE_TABLE_INITIALIZED.call_once(|| ());
tracing::info!("finished waiting for epoch: {}", epoch);
});
yield Message::Barrier(barrier);

if !self.backfill_finished(&backfill_stage.states).await? {
{
let source_backfill_row_count = self
.metrics
.source_backfill_row_count
@@ -552,10 +575,26 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
.commit(barrier.epoch)
.await?;

yield Message::Barrier(barrier);

if self.backfill_finished(&backfill_stage.states).await? {
break 'backfill_loop;
if self.should_report_finished(&backfill_stage.states) {
// TODO: use a specialized progress for source
// Currently, `CreateMviewProgress` is designed for MV backfill, and rw_ddl_progress calculates
// progress based on the number of consumed rows and an estimated total number of rows from hummock.
// For now, we just rely on the same code path, and for source backfill, the progress will always be 99.99%.
tracing::info!("progress finish");
let epoch = barrier.epoch;
self.progress.finish(epoch, 114514);
// yield barrier after reporting progress
yield Message::Barrier(barrier);

// After reporting finished, we still don't exit the loop,
// because we need to handle split migration.
if STATE_TABLE_INITIALIZED.is_completed()
&& self.backfill_finished(&backfill_stage.states).await?
{
break 'backfill_loop;
}
} else {
yield Message::Barrier(barrier);
}
}
Message::Chunk(chunk) => {
Expand Down Expand Up @@ -665,7 +704,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
self.apply_split_change_forward_stage(
actor_splits,
&mut splits,
true,
false,
)
.await?;
}
@@ -688,11 +727,34 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
}
}

/// All splits finished backfilling.
/// Decides when we should call `progress.finish()` to let the blocking DDL return.
/// We report as soon as `SourceCachingUp`. Otherwise the DDL might be blocked until upstream messages arrive, which could take arbitrarily long.
///
/// Note: split migration (online scaling) is related to progress tracking.
/// - For foreground DDL, scaling is not allowed before progress is finished.
/// - For background DDL, scaling is skipped when progress is not finished, and can be triggered by recreating actors during recovery.
///
/// See <https://github.com/risingwavelabs/risingwave/issues/18300> for more details.
fn should_report_finished(&self, states: &BackfillStates) -> bool {
states.values().all(|state| {
matches!(
state,
BackfillState::Finished | BackfillState::SourceCachingUp(_)
)
})
}

/// All splits entered `Finished` state.
///
/// We check all splits for the source, including other actors' splits here, before going to the forward stage.
/// Otherwise if we break early, but after rescheduling, an unfinished split is migrated to
/// Otherwise if we `break` early, but after rescheduling, an unfinished split is migrated to
/// this actor, we still need to backfill it.
///
/// Note: at the beginning, the actor will only read the state written by itself.
/// It needs to _wait until it can read all actors' written data_.
/// i.e., wait until the first checkpoint has become available.
///
/// See <https://github.com/risingwavelabs/risingwave/issues/18300> for more details.
async fn backfill_finished(&self, states: &BackfillStates) -> StreamExecutorResult<bool> {
Ok(states
.values()
@@ -761,7 +823,6 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
}
Some(backfill_state) => {
// Migrated split. Backfill if unfinished.
// TODO: disallow online scaling during backfilling.
target_state.insert(split_id, backfill_state);
}
}
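The two predicates added above draw the key distinction of this commit: `should_report_finished` unblocks the DDL as soon as every split is at least caching up, while `backfill_finished` (additionally gated on the `STATE_TABLE_INITIALIZED` / `try_wait_epoch` guard earlier in the diff) decides when the backfill loop may actually exit. A simplified sketch with stand-in `BackfillState` variants:

```rust
// Simplified stand-ins for the real `BackfillState` variants.
#[derive(Debug, PartialEq)]
enum BackfillState {
    Backfilling(Option<String>), // still reading historical data (split offset)
    SourceCachingUp(String),     // caught up; waiting for upstream to advance
    Finished,
}

/// Report `progress.finish()` once every split is at least caching up,
/// so the blocking DDL is not held hostage by an idle upstream.
fn should_report_finished(states: &[BackfillState]) -> bool {
    states
        .iter()
        .all(|s| matches!(s, BackfillState::Finished | BackfillState::SourceCachingUp(_)))
}

/// Only leave the backfill loop once every split is fully `Finished`,
/// because a split migrated in later may still need backfilling.
fn backfill_finished(states: &[BackfillState]) -> bool {
    states.iter().all(|s| matches!(s, BackfillState::Finished))
}

fn main() {
    let states = vec![
        BackfillState::Finished,
        BackfillState::SourceCachingUp("offset-42".into()),
    ];
    assert!(should_report_finished(&states)); // the DDL can return now...
    assert!(!backfill_finished(&states)); // ...but the loop keeps handling split migration
}
```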
@@ -76,6 +76,7 @@ impl<S: StateStore> BackfillStateTableHandler<S> {
};
ret.push(state);
}
tracing::trace!("scan SourceBackfill state table: {:?}", ret);
Ok(ret)
}

Expand Down
4 changes: 4 additions & 0 deletions src/stream/src/from_proto/source_backfill.rs
@@ -72,6 +72,9 @@ impl ExecutorBuilder for SourceBackfillExecutorBuilder {
source_desc_builder,
state_table_handler,
);
let progress = params
.local_barrier_manager
.register_create_mview_progress(params.actor_context.id);

let exec = SourceBackfillExecutorInner::new(
params.actor_context.clone(),
@@ -81,6 +84,7 @@ impl ExecutorBuilder for SourceBackfillExecutorBuilder {
params.env.system_params_manager_ref().get_params(),
backfill_state_table,
node.rate_limit,
progress,
);
let [input]: [_; 1] = params.input.try_into().unwrap();

3 changes: 3 additions & 0 deletions src/stream/src/task/barrier_manager/managed_state.rs
@@ -372,6 +372,9 @@ pub(super) struct PartialGraphManagedBarrierState {
prev_barrier_table_ids: Option<(EpochPair, HashSet<TableId>)>,

/// Record the progress updates of creating mviews for each epoch of concurrent checkpoints.
///
/// This is updated by [`super::CreateMviewProgress::update`] and will be reported to meta
/// in [`BarrierCompleteResult`].
pub(super) create_mview_progress: HashMap<u64, HashMap<ActorId, BackfillState>>,

pub(super) state_store: StateStoreImpl,
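For context on the documented field: each in-flight checkpoint epoch keeps the latest state reported by every backfilling actor, and the per-epoch map is handed to meta when the barrier completes, which is how both MV backfill and (after this commit) source backfill surface progress. A small sketch of that shape with simplified stand-in types:

```rust
use std::collections::HashMap;

type ActorId = u32;
type ConsumedRows = u64;

// Simplified stand-in for the per-actor state reported to meta.
#[derive(Debug, Clone)]
enum ReportedBackfillState {
    ConsumingUpstream(u64, ConsumedRows), // (consumed epoch, rows so far)
    Done(ConsumedRows),
}

fn main() {
    // epoch -> (actor -> latest reported state), as in `create_mview_progress`.
    let mut create_mview_progress: HashMap<u64, HashMap<ActorId, ReportedBackfillState>> =
        HashMap::new();

    // An executor updates its entry under the epoch of the barrier it just yielded.
    create_mview_progress
        .entry(65_536)
        .or_default()
        .insert(42, ReportedBackfillState::ConsumingUpstream(65_536, 1_000));

    // Another actor on the same epoch reports that it has finished.
    create_mview_progress
        .entry(65_536)
        .or_default()
        .insert(43, ReportedBackfillState::Done(2_000));

    // When the barrier completes, the epoch's map is drained and reported to meta.
    if let Some(per_actor) = create_mview_progress.remove(&65_536) {
        println!("report to meta: {per_actor:?}");
    }
}
```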
