
Commit 80ff0b1

xxchan committed Oct 21, 2024
1 parent b7f1b41 commit 80ff0b1
Showing 9 changed files with 194 additions and 84 deletions.
4 changes: 4 additions & 0 deletions proto/stream_service.proto
@@ -24,9 +24,13 @@ message InjectBarrierRequest {

message BarrierCompleteResponse {
message CreateMviewProgress {
// Note: ideally we should use `executor_id`, but `actor_id` is ok-ish.
// See <https://github.com/risingwavelabs/risingwave/issues/6236>.
uint32 backfill_actor_id = 1;
bool done = 2;
// Snapshot read epoch of the MV backfill (0 when done, and always 0 for source backfill)
uint64 consumed_epoch = 3;
// Rows read from the upstream snapshot (MV backfill) or rows backfilled (source backfill)
uint64 consumed_rows = 4;
uint32 pending_barrier_num = 5;
}
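For illustration, a backfill executor might populate the new fields roughly as follows. This is a hypothetical Rust sketch, not part of this commit: `CreateMviewProgress` is the prost-generated type for the message above, and all concrete values are assumptions.

```rust
// Hypothetical sketch: filling in the new progress fields.
let progress = CreateMviewProgress {
    backfill_actor_id: 42,   // the reporting actor (ideally an executor id, see note above)
    done: false,
    consumed_epoch: 65_536,  // MV backfill snapshot read epoch; 0 for Done / source backfill
    consumed_rows: 10_000,   // snapshot rows read (MV) or rows backfilled (source)
    pending_barrier_num: 2,  // barriers buffered while backfilling
};
```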
199 changes: 126 additions & 73 deletions src/meta/src/barrier/progress.rs
@@ -28,36 +28,37 @@ use crate::barrier::{
Command, CreateStreamingJobCommandInfo, CreateStreamingJobType, EpochNode, ReplaceTablePlan,
};
use crate::manager::{DdlType, MetadataManager};
use crate::model::{ActorId, TableFragments};
use crate::model::{ActorId, BackfillUpstreamType, TableFragments};
use crate::MetaResult;

type ConsumedRows = u64;

#[derive(Clone, Copy, Debug)]
enum BackfillState {
Init,
ConsumingUpstream(#[allow(dead_code)] Epoch, ConsumedRows),
ConsumingUpstream(#[expect(dead_code)] Epoch, ConsumedRows),
Done(ConsumedRows),
}
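The intended per-actor transitions, as enforced by `update` below:

```rust
// Per-actor state machine (derived from `update` below):
//   Init -> ConsumingUpstream(epoch, rows) -> ... -> Done(rows)
// Consumed rows must be non-decreasing, and reporting Done twice panics.
```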

/// Progress of all actors containing backfill executors while creating mview.
#[derive(Debug)]
pub(super) struct Progress {
// `states` and `done_count` decide whether the progress is done. See `is_done`.
states: HashMap<ActorId, BackfillState>,

done_count: usize,

/// Tells whether the backfill is from source or mv.
backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,

// The following row counts are used to calculate the progress. See `calculate_progress`.
/// Upstream MV count.
/// Tracks how many times each upstream MV appears in this stream job.
upstream_mv_count: HashMap<TableId, usize>,

/// Total key count in the upstream materialized view
/// TODO: implement this for source backfill
upstream_total_key_count: u64,

/// Consumed rows
consumed_rows: u64,
/// Total key count of all the upstream materialized views
upstream_mvs_total_key_count: u64,
mv_backfill_consumed_rows: u64,
source_backfill_consumed_rows: u64,

/// DDL definition
definition: String,
@@ -66,47 +67,55 @@ pub(super) struct Progress {
impl Progress {
/// Create a [`Progress`] for some creating mview, with all `actors` containing the backfill executors.
fn new(
actors: impl IntoIterator<Item = ActorId>,
actors: impl IntoIterator<Item = (ActorId, BackfillUpstreamType)>,
upstream_mv_count: HashMap<TableId, usize>,
upstream_total_key_count: u64,
definition: String,
) -> Self {
let states = actors
.into_iter()
.map(|a| (a, BackfillState::Init))
.collect::<HashMap<_, _>>();
let mut states = HashMap::new();
let mut backfill_upstream_types = HashMap::new();
for (actor, backfill_upstream_type) in actors {
states.insert(actor, BackfillState::Init);
backfill_upstream_types.insert(actor, backfill_upstream_type);
}
assert!(!states.is_empty());

Self {
states,
backfill_upstream_types,
done_count: 0,
upstream_mv_count,
upstream_total_key_count,
consumed_rows: 0,
upstream_mvs_total_key_count: upstream_total_key_count,
mv_backfill_consumed_rows: 0,
source_backfill_consumed_rows: 0,
definition,
}
}

/// Update the progress of `actor`.
fn update(&mut self, actor: ActorId, new_state: BackfillState, upstream_total_key_count: u64) {
self.upstream_total_key_count = upstream_total_key_count;
self.upstream_mvs_total_key_count = upstream_total_key_count;
let total_actors = self.states.len();
let backfill_upstream_type = self.backfill_upstream_types.get(&actor).unwrap();
tracing::debug!(?actor, states = ?self.states, "update progress for actor");

let mut old = 0;
let mut new = 0;
match self.states.remove(&actor).unwrap() {
BackfillState::Init => {}
BackfillState::ConsumingUpstream(_, old_consumed_rows) => {
self.consumed_rows -= old_consumed_rows;
old = old_consumed_rows;
}
BackfillState::Done(_) => panic!("should not report done multiple times"),
};
match &new_state {
BackfillState::Init => {}
BackfillState::ConsumingUpstream(_, new_consumed_rows) => {
self.consumed_rows += new_consumed_rows;
new = *new_consumed_rows;
}
BackfillState::Done(new_consumed_rows) => {
tracing::debug!("actor {} done", actor);
self.consumed_rows += new_consumed_rows;
new = *new_consumed_rows;
self.done_count += 1;
tracing::debug!(
"{} actors out of {} complete",
@@ -115,8 +124,19 @@
);
}
};
debug_assert!(new >= old, "backfill progress should not go backward");
match backfill_upstream_type {
BackfillUpstreamType::MView => {
self.mv_backfill_consumed_rows += new - old;
}
BackfillUpstreamType::Source => {
self.source_backfill_consumed_rows += new - old;
}
BackfillUpstreamType::Values => {
// do not consider progress for values
}
}
self.states.insert(actor, new_state);
self.calculate_progress();
}
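A minimal sketch of the delta accounting above (not part of the commit; `actor_id`, `upstream_mv_count`, and the row counts are assumed):

```rust
// Sketch: consumed rows are accumulated as deltas per upstream type.
let mut progress = Progress::new(
    [(actor_id, BackfillUpstreamType::MView)],
    upstream_mv_count,
    1_000, // assumed upstream key count
    "CREATE MATERIALIZED VIEW mv AS ...".to_owned(),
);
// First report: old = 0, new = 100 -> mv_backfill_consumed_rows += 100.
progress.update(actor_id, BackfillState::ConsumingUpstream(Epoch(1), 100), 1_000);
// Later report: old = 100, new = 250 -> += 150 (the old count is subtracted).
progress.update(actor_id, BackfillState::ConsumingUpstream(Epoch(2), 250), 1_000);
// Done(300): the final delta of 50 is added and `done_count` is bumped.
progress.update(actor_id, BackfillState::Done(300), 1_000);
```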

/// Returns whether all backfill executors are done.
Expand All @@ -137,19 +157,52 @@ impl Progress {
}

/// Reports progress as a formatted string. For MV backfill, the percentage is
/// `consumed_rows` / `upstream_total_key_count`; for source backfill, only the
/// number of consumed rows is reported.
fn calculate_progress(&self) -> f64 {
fn calculate_progress(&self) -> String {
if self.is_done() || self.states.is_empty() {
return 1.0;
return "100%".to_string();
}
let mut upstream_total_key_count = self.upstream_total_key_count as f64;
if upstream_total_key_count == 0.0 {
upstream_total_key_count = 1.0
let mut mv_count = 0;
let mut source_count = 0;
for backfill_upstream_type in self.backfill_upstream_types.values() {
match backfill_upstream_type {
BackfillUpstreamType::MView => mv_count += 1,
BackfillUpstreamType::Source => source_count += 1,
BackfillUpstreamType::Values => (),
}
}
let mut progress = self.consumed_rows as f64 / upstream_total_key_count;
if progress >= 1.0 {
progress = 0.99;

let mv_progress = (mv_count > 0).then(|| {
if self.upstream_mvs_total_key_count == 0 {
"99.99%".to_string()
} else {
let mut progress = self.mv_backfill_consumed_rows as f64
/ (self.upstream_mvs_total_key_count as f64);
if progress > 1.0 {
progress = 0.9999;
}
format!(
"{:.2}% ({}/{})",
progress * 100.0,
self.mv_backfill_consumed_rows,
self.upstream_mvs_total_key_count
)
}
});
let source_progress = (source_count > 0).then_some(format!(
"{} rows consumed",
self.source_backfill_consumed_rows
));
match (mv_progress, source_progress) {
(Some(mv_progress), Some(source_progress)) => {
format!(
"MView Backfill: {}, Source Backfill: {}",
mv_progress, source_progress
)
}
(Some(mv_progress), None) => mv_progress,
(None, Some(source_progress)) => source_progress,
(None, None) => "Unknown".to_string(),
}
progress
}
}
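Under assumed inputs, the strings produced above look like this. This is an illustration only, not a test from the commit; it assumes a `progress` tracking one MV-backfill actor (250 of 1000 upstream keys) and one source-backfill actor (42 rows), neither done yet:

```rust
assert_eq!(
    progress.calculate_progress(),
    "MView Backfill: 25.00% (250/1000), Source Backfill: 42 rows consumed",
);
// MV-only job whose upstream stats report 0 keys -> "99.99%"; all done -> "100%".
```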

@@ -231,11 +284,9 @@ pub(super) struct TrackingCommand {
/// 4. With `actor_map` we can use an actor's `ActorId` to find the ID of the `StreamJob`.
#[derive(Default, Debug)]
pub(super) struct CreateMviewProgressTracker {
// TODO: add a specialized progress for source
/// Progress of the create-mview DDL indicated by the `TableId`.
progress_map: HashMap<TableId, (Progress, TrackingJob)>,

/// Find the `TableId` of the create-mview DDL by the actor containing the MV/source backfill executors.
actor_map: HashMap<ActorId, TableId>,

/// Stash of finished jobs. They will be finally finished on checkpoint.
@@ -259,14 +310,17 @@ impl CreateMviewProgressTracker {
let mut progress_map = HashMap::new();
for (creating_table_id, (definition, table_fragments)) in mview_map {
let mut states = HashMap::new();
let actors = table_fragments.backfill_actor_ids();
for actor in actors {
let mut backfill_upstream_types = HashMap::new();
let actors = table_fragments.tracking_progress_actor_ids();
for (actor, backfill_upstream_type) in actors {
actor_map.insert(actor, creating_table_id);
states.insert(actor, BackfillState::ConsumingUpstream(Epoch(0), 0));
backfill_upstream_types.insert(actor, backfill_upstream_type);
}

let progress = Self::recover_progress(
states,
backfill_upstream_types,
table_fragments.dependent_table_ids(),
definition,
&version_stats,
@@ -284,28 +338,28 @@
}
}

/// ## How recovery works
///
/// The progress (number of rows consumed) is persisted in state tables.
/// During recovery, the backfill executor will restore the number of rows consumed,
/// and then it will just report progress like newly created executors.
fn recover_progress(
states: HashMap<ActorId, BackfillState>,
backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,
upstream_mv_count: HashMap<TableId, usize>,
definition: String,
version_stats: &HummockVersionStats,
) -> Progress {
let upstream_total_key_count = upstream_mv_count
.iter()
.map(|(upstream_mv, count)| {
*count as u64
* version_stats
.table_stats
.get(&upstream_mv.table_id)
.map_or(0, |stat| stat.total_key_count as u64)
})
.sum();
let upstream_mvs_total_key_count =
calculate_total_key_count(&upstream_mv_count, version_stats);
Progress {
states,
backfill_upstream_types,
done_count: 0, // Fill only after first barrier pass
upstream_mv_count,
upstream_total_key_count,
consumed_rows: 0, // Fill only after first barrier pass
upstream_mvs_total_key_count,
mv_backfill_consumed_rows: 0, // Fill only after first barrier pass
source_backfill_consumed_rows: 0, // Fill only after first barrier pass
definition,
}
}
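A sketch of the recovered state, under the assumption (seen in `recover` above) that every tracked actor restarts as `ConsumingUpstream(Epoch(0), 0)`:

```rust
// Sketch: right after recovery every counter starts at zero; executors then
// restore their persisted row counts and re-report, so the totals catch up.
let progress = Self::recover_progress(
    states, // all ConsumingUpstream(Epoch(0), 0)
    backfill_upstream_types,
    upstream_mv_count,
    definition,
    &version_stats,
);
assert_eq!(progress.mv_backfill_consumed_rows, 0);
assert!(!progress.is_done()); // done only once every actor reports Done
```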
@@ -318,7 +372,7 @@ impl CreateMviewProgressTracker {
let ddl_progress = DdlProgress {
id: table_id as u64,
statement: x.definition.clone(),
progress: format!("{:.2}%", x.calculate_progress() * 100.0),
progress: x.calculate_progress(),
};
(table_id, ddl_progress)
})
@@ -377,7 +431,7 @@ impl CreateMviewProgressTracker {

/// Finish stashed jobs on checkpoint.
pub(super) fn take_finished_jobs(&mut self) -> Vec<TrackingJob> {
tracing::trace!(finished_jobs=?self.pending_finished_jobs, progress_map=?self.progress_map, "finishing jobs");
tracing::trace!(finished_jobs=?self.pending_finished_jobs, progress_map=?self.progress_map, "take_finished_jobs");
take(&mut self.pending_finished_jobs)
}

@@ -449,16 +503,8 @@ impl CreateMviewProgressTracker {
upstream_mv_count.insert(*table_id, dispatch_count / actors.len());
}

let upstream_total_key_count: u64 = upstream_mv_count
.iter()
.map(|(upstream_mv, count)| {
*count as u64
* version_stats
.table_stats
.get(&upstream_mv.table_id)
.map_or(0, |stat| stat.total_key_count as u64)
})
.sum();
let upstream_total_key_count: u64 =
calculate_total_key_count(&upstream_mv_count, version_stats);
(
upstream_mv_count,
upstream_total_key_count,
Expand All @@ -467,8 +513,8 @@ impl CreateMviewProgressTracker {
)
};

for &actor in &actors {
self.actor_map.insert(actor, creating_mv_id);
for (actor, _backfill_upstream_type) in &actors {
self.actor_map.insert(*actor, creating_mv_id);
}

let progress = Progress::new(
@@ -537,18 +583,8 @@ impl CreateMviewProgressTracker {
Entry::Occupied(mut o) => {
let progress = &mut o.get_mut().0;

let upstream_total_key_count: u64 = progress
.upstream_mv_count
.iter()
.map(|(upstream_mv, count)| {
assert_ne!(*count, 0);
*count as u64
* version_stats
.table_stats
.get(&upstream_mv.table_id)
.map_or(0, |stat| stat.total_key_count as u64)
})
.sum();
let upstream_total_key_count: u64 =
calculate_total_key_count(&progress.upstream_mv_count, version_stats);

tracing::debug!(?table_id, "updating progress for table");
progress.update(actor, new_state, upstream_total_key_count);
Expand Down Expand Up @@ -577,3 +613,20 @@ impl CreateMviewProgressTracker {
}
}
}

fn calculate_total_key_count(
table_count: &HashMap<TableId, usize>,
version_stats: &HummockVersionStats,
) -> u64 {
table_count
.iter()
.map(|(table_id, count)| {
assert_ne!(*count, 0);
*count as u64
* version_stats
.table_stats
.get(&table_id.table_id)
.map_or(0, |stat| stat.total_key_count as u64)
})
.sum()
}
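A quick worked example of this helper with assumed stats (illustration only; the table ids, multiplicities, and key counts are made up):

```rust
// Assumed: table 1 appears twice in the job and has 1000 keys in the version
// stats; table 2 appears once and has 500 keys.
let table_count = HashMap::from([(TableId::new(1), 2), (TableId::new(2), 1)]);
// total = 2 * 1000 + 1 * 500 = 2500
let total = calculate_total_key_count(&table_count, &version_stats);
assert_eq!(total, 2500);
```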
1 change: 1 addition & 0 deletions src/meta/src/barrier/recovery.rs
@@ -87,6 +87,7 @@ impl GlobalBarrierManagerContext {
Ok(())
}

// FIXME: Values backfill is not considered here
async fn recover_background_mv_progress(&self) -> MetaResult<CreateMviewProgressTracker> {
let mgr = &self.metadata_manager;
let mviews = mgr