Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve progress msg in SHOW JOBS for source backfill #18925

Merged
merged 3 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,13 @@ message InjectBarrierRequest {

message BarrierCompleteResponse {
message CreateMviewProgress {
// Note: ideally we should use `executor_id`, but `actor_id` is ok-ish.
// See <https://github.com/risingwavelabs/risingwave/issues/6236>.
uint32 backfill_actor_id = 1;
bool done = 2;
// MV backfill snapshot read epoch (0 for Done / Source backfill)
uint64 consumed_epoch = 3;
// MV backfill snapshot read rows / Source backfilled rows
uint64 consumed_rows = 4;
uint32 pending_barrier_num = 5;
}
Expand Down
199 changes: 126 additions & 73 deletions src/meta/src/barrier/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,36 +28,37 @@ use crate::barrier::{
Command, CreateStreamingJobCommandInfo, CreateStreamingJobType, EpochNode, ReplaceTablePlan,
};
use crate::manager::{DdlType, MetadataManager};
use crate::model::{ActorId, TableFragments};
use crate::model::{ActorId, BackfillUpstreamType, TableFragments};
use crate::MetaResult;

type ConsumedRows = u64;

#[derive(Clone, Copy, Debug)]
enum BackfillState {
Init,
ConsumingUpstream(#[allow(dead_code)] Epoch, ConsumedRows),
ConsumingUpstream(#[expect(dead_code)] Epoch, ConsumedRows),
Done(ConsumedRows),
}

/// Progress of all actors containing backfill executors while creating mview.
#[derive(Debug)]
pub(super) struct Progress {
// `states` and `done_count` decides whether the progress is done. See `is_done`.
states: HashMap<ActorId, BackfillState>,

done_count: usize,

/// Tells whether the backfill is from source or mv.
backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,

// The following row counts are used to calculate the progress. See `calculate_progress`.
/// Upstream mv count.
/// Keep track of how many times each upstream MV
/// appears in this stream job.
upstream_mv_count: HashMap<TableId, usize>,

/// Total key count in the upstream materialized view
/// TODO: implement this for source backfill
upstream_total_key_count: u64,

/// Consumed rows
consumed_rows: u64,
/// Total key count of all the upstream materialized views
upstream_mvs_total_key_count: u64,
mv_backfill_consumed_rows: u64,
source_backfill_consumed_rows: u64,

/// DDL definition
definition: String,
Expand All @@ -66,47 +67,55 @@ pub(super) struct Progress {
impl Progress {
/// Create a [`Progress`] for some creating mview, with all `actors` containing the backfill executors.
fn new(
actors: impl IntoIterator<Item = ActorId>,
actors: impl IntoIterator<Item = (ActorId, BackfillUpstreamType)>,
upstream_mv_count: HashMap<TableId, usize>,
upstream_total_key_count: u64,
definition: String,
) -> Self {
let states = actors
.into_iter()
.map(|a| (a, BackfillState::Init))
.collect::<HashMap<_, _>>();
let mut states = HashMap::new();
let mut backfill_upstream_types = HashMap::new();
for (actor, backfill_upstream_type) in actors {
states.insert(actor, BackfillState::Init);
backfill_upstream_types.insert(actor, backfill_upstream_type);
}
assert!(!states.is_empty());

Self {
states,
backfill_upstream_types,
done_count: 0,
upstream_mv_count,
upstream_total_key_count,
consumed_rows: 0,
upstream_mvs_total_key_count: upstream_total_key_count,
mv_backfill_consumed_rows: 0,
source_backfill_consumed_rows: 0,
definition,
}
}

/// Update the progress of `actor`.
fn update(&mut self, actor: ActorId, new_state: BackfillState, upstream_total_key_count: u64) {
self.upstream_total_key_count = upstream_total_key_count;
self.upstream_mvs_total_key_count = upstream_total_key_count;
let total_actors = self.states.len();
let backfill_upstream_type = self.backfill_upstream_types.get(&actor).unwrap();
tracing::debug!(?actor, states = ?self.states, "update progress for actor");

let mut old = 0;
let mut new = 0;
match self.states.remove(&actor).unwrap() {
BackfillState::Init => {}
BackfillState::ConsumingUpstream(_, old_consumed_rows) => {
self.consumed_rows -= old_consumed_rows;
old = old_consumed_rows;
}
BackfillState::Done(_) => panic!("should not report done multiple times"),
};
match &new_state {
BackfillState::Init => {}
BackfillState::ConsumingUpstream(_, new_consumed_rows) => {
self.consumed_rows += new_consumed_rows;
new = *new_consumed_rows;
}
BackfillState::Done(new_consumed_rows) => {
tracing::debug!("actor {} done", actor);
self.consumed_rows += new_consumed_rows;
new = *new_consumed_rows;
self.done_count += 1;
tracing::debug!(
"{} actors out of {} complete",
Expand All @@ -115,8 +124,19 @@ impl Progress {
);
}
};
debug_assert!(new >= old, "backfill progress should not go backward");
match backfill_upstream_type {
BackfillUpstreamType::MView => {
self.mv_backfill_consumed_rows += new - old;
}
BackfillUpstreamType::Source => {
self.source_backfill_consumed_rows += new - old;
}
BackfillUpstreamType::Values => {
// do not consider progress for values
}
}
self.states.insert(actor, new_state);
self.calculate_progress();
}

/// Returns whether all backfill executors are done.
Expand All @@ -137,19 +157,52 @@ impl Progress {
}

/// `progress` = `consumed_rows` / `upstream_total_key_count`
fn calculate_progress(&self) -> f64 {
fn calculate_progress(&self) -> String {
if self.is_done() || self.states.is_empty() {
return 1.0;
return "100%".to_string();
}
let mut upstream_total_key_count = self.upstream_total_key_count as f64;
if upstream_total_key_count == 0.0 {
upstream_total_key_count = 1.0
let mut mv_count = 0;
let mut source_count = 0;
for backfill_upstream_type in self.backfill_upstream_types.values() {
match backfill_upstream_type {
BackfillUpstreamType::MView => mv_count += 1,
BackfillUpstreamType::Source => source_count += 1,
BackfillUpstreamType::Values => (),
}
}
let mut progress = self.consumed_rows as f64 / upstream_total_key_count;
if progress >= 1.0 {
progress = 0.99;

let mv_progress = (mv_count > 0).then_some({
if self.upstream_mvs_total_key_count == 0 {
"99.99%".to_string()
} else {
let mut progress = self.mv_backfill_consumed_rows as f64
/ (self.upstream_mvs_total_key_count as f64);
if progress > 1.0 {
progress = 0.9999;
}
format!(
"{:.2}% ({}/{})",
progress * 100.0,
self.mv_backfill_consumed_rows,
self.upstream_mvs_total_key_count
)
}
});
let source_progress = (source_count > 0).then_some(format!(
"{} rows consumed",
self.source_backfill_consumed_rows
));
match (mv_progress, source_progress) {
(Some(mv_progress), Some(source_progress)) => {
format!(
"MView Backfill: {}, Source Backfill: {}",
mv_progress, source_progress
)
}
(Some(mv_progress), None) => mv_progress,
(None, Some(source_progress)) => source_progress,
(None, None) => "Unknown".to_string(),
Comment on lines +174 to +204
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

core logic here

}
progress
}
}

Expand Down Expand Up @@ -231,11 +284,9 @@ pub(super) struct TrackingCommand {
/// 4. With `actor_map` we can use an actor's `ActorId` to find the ID of the `StreamJob`.
#[derive(Default, Debug)]
pub(super) struct CreateMviewProgressTracker {
// TODO: add a specialized progress for source
/// Progress of the create-mview DDL indicated by the `TableId`.
progress_map: HashMap<TableId, (Progress, TrackingJob)>,

/// Find the epoch of the create-mview DDL by the actor containing the MV/source backfill executors.
actor_map: HashMap<ActorId, TableId>,

/// Stash of finished jobs. They will be finally finished on checkpoint.
Expand All @@ -259,14 +310,17 @@ impl CreateMviewProgressTracker {
let mut progress_map = HashMap::new();
for (creating_table_id, (definition, table_fragments)) in mview_map {
let mut states = HashMap::new();
let actors = table_fragments.backfill_actor_ids();
for actor in actors {
let mut backfill_upstream_types = HashMap::new();
let actors = table_fragments.tracking_progress_actor_ids();
for (actor, backfill_upstream_type) in actors {
actor_map.insert(actor, creating_table_id);
states.insert(actor, BackfillState::ConsumingUpstream(Epoch(0), 0));
backfill_upstream_types.insert(actor, backfill_upstream_type);
}

let progress = Self::recover_progress(
states,
backfill_upstream_types,
table_fragments.dependent_table_ids(),
definition,
&version_stats,
Expand All @@ -284,28 +338,28 @@ impl CreateMviewProgressTracker {
}
}

/// ## How recovery works
///
/// The progress (number of rows consumed) is persisted in state tables.
/// During recovery, the backfill executor will restore the number of rows consumed,
/// and then it will just report progress like newly created executors.
fn recover_progress(
states: HashMap<ActorId, BackfillState>,
backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,
upstream_mv_count: HashMap<TableId, usize>,
definition: String,
version_stats: &HummockVersionStats,
) -> Progress {
let upstream_total_key_count = upstream_mv_count
.iter()
.map(|(upstream_mv, count)| {
*count as u64
* version_stats
.table_stats
.get(&upstream_mv.table_id)
.map_or(0, |stat| stat.total_key_count as u64)
})
.sum();
let upstream_mvs_total_key_count =
calculate_total_key_count(&upstream_mv_count, version_stats);
Progress {
states,
backfill_upstream_types,
done_count: 0, // Fill only after first barrier pass
upstream_mv_count,
upstream_total_key_count,
consumed_rows: 0, // Fill only after first barrier pass
upstream_mvs_total_key_count,
mv_backfill_consumed_rows: 0, // Fill only after first barrier pass
source_backfill_consumed_rows: 0, // Fill only after first barrier pass
definition,
}
}
Expand All @@ -318,7 +372,7 @@ impl CreateMviewProgressTracker {
let ddl_progress = DdlProgress {
id: table_id as u64,
statement: x.definition.clone(),
progress: format!("{:.2}%", x.calculate_progress() * 100.0),
progress: x.calculate_progress(),
};
(table_id, ddl_progress)
})
Expand Down Expand Up @@ -377,7 +431,7 @@ impl CreateMviewProgressTracker {

/// Finish stashed jobs on checkpoint.
pub(super) fn take_finished_jobs(&mut self) -> Vec<TrackingJob> {
tracing::trace!(finished_jobs=?self.pending_finished_jobs, progress_map=?self.progress_map, "finishing jobs");
tracing::trace!(finished_jobs=?self.pending_finished_jobs, progress_map=?self.progress_map, "take_finished_jobs");
take(&mut self.pending_finished_jobs)
}

Expand Down Expand Up @@ -449,16 +503,8 @@ impl CreateMviewProgressTracker {
upstream_mv_count.insert(*table_id, dispatch_count / actors.len());
}

let upstream_total_key_count: u64 = upstream_mv_count
.iter()
.map(|(upstream_mv, count)| {
*count as u64
* version_stats
.table_stats
.get(&upstream_mv.table_id)
.map_or(0, |stat| stat.total_key_count as u64)
})
.sum();
let upstream_total_key_count: u64 =
calculate_total_key_count(&upstream_mv_count, version_stats);
(
upstream_mv_count,
upstream_total_key_count,
Expand All @@ -467,8 +513,8 @@ impl CreateMviewProgressTracker {
)
};

for &actor in &actors {
self.actor_map.insert(actor, creating_mv_id);
for (actor, _backfill_upstream_type) in &actors {
self.actor_map.insert(*actor, creating_mv_id);
}

let progress = Progress::new(
Expand Down Expand Up @@ -537,18 +583,8 @@ impl CreateMviewProgressTracker {
Entry::Occupied(mut o) => {
let progress = &mut o.get_mut().0;

let upstream_total_key_count: u64 = progress
.upstream_mv_count
.iter()
.map(|(upstream_mv, count)| {
assert_ne!(*count, 0);
*count as u64
* version_stats
.table_stats
.get(&upstream_mv.table_id)
.map_or(0, |stat| stat.total_key_count as u64)
})
.sum();
let upstream_total_key_count: u64 =
calculate_total_key_count(&progress.upstream_mv_count, version_stats);

tracing::debug!(?table_id, "updating progress for table");
progress.update(actor, new_state, upstream_total_key_count);
Expand Down Expand Up @@ -577,3 +613,20 @@ impl CreateMviewProgressTracker {
}
}
}

/// Computes the combined key count across all tracked upstream tables.
///
/// For each entry in `table_count`, the table's current `total_key_count`
/// (looked up in `version_stats`; `0` if the table has no stats yet) is
/// weighted by how many times that table appears as an upstream (`count`),
/// and the weighted counts are summed.
///
/// Panics (debug assertion style, via `assert_ne!`) if any multiplicity
/// is `0`, since a tracked upstream table must appear at least once.
fn calculate_total_key_count(
    table_count: &HashMap<TableId, usize>,
    version_stats: &HummockVersionStats,
) -> u64 {
    let mut total: u64 = 0;
    for (table_id, &count) in table_count {
        // A tracked upstream must be referenced at least once.
        assert_ne!(count, 0);
        let key_count = version_stats
            .table_stats
            .get(&table_id.table_id)
            .map_or(0, |stat| stat.total_key_count as u64);
        total += count as u64 * key_count;
    }
    total
}
1 change: 1 addition & 0 deletions src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ impl GlobalBarrierManagerContext {
Ok(())
}

// FIXME: didn't consider Values here
async fn recover_background_mv_progress(&self) -> MetaResult<CreateMviewProgressTracker> {
let mgr = &self.metadata_manager;
let mviews = mgr
Expand Down
Loading
Loading