address comment
wenym1 committed Oct 17, 2024
1 parent 8d2cde2 commit 80df807
Showing 2 changed files with 116 additions and 119 deletions.
130 changes: 42 additions & 88 deletions src/meta/src/barrier/creating_job/mod.rs
@@ -48,6 +48,8 @@ pub(super) struct CreatingStreamingJobControl {
pub(super) snapshot_backfill_info: SnapshotBackfillInfo,
backfill_epoch: u64,

graph_info: InflightGraphInfo,

barrier_control: CreatingStreamingJobBarrierControl,
status: CreatingStreamingJobStatus,

@@ -87,13 +89,13 @@ impl CreatingStreamingJobControl {
metrics,
),
backfill_epoch,
graph_info: InflightGraphInfo::new(fragment_info),
status: CreatingStreamingJobStatus::ConsumingSnapshot {
prev_epoch_fake_physical_time: 0,
pending_commands: vec![],
pending_upstream_barriers: vec![],
version_stats: version_stat.clone(),
create_mview_tracker,
snapshot_backfill_actors,
graph_info: InflightGraphInfo::new(fragment_info),
backfill_epoch,
pending_non_checkpoint_barriers: vec![],
initial_barrier_info: Some((actors_to_create, initial_mutation)),
@@ -106,17 +108,11 @@ impl CreatingStreamingJobControl {

pub(super) fn is_wait_on_worker(&self, worker_id: WorkerId) -> bool {
self.barrier_control.is_wait_on_worker(worker_id)
|| self
.status
.active_graph_info()
.map(|info| info.contains_worker(worker_id))
.unwrap_or(false)
|| (self.status.is_finishing() && self.graph_info.contains_worker(worker_id))
}

pub(super) fn on_new_worker_node_map(&self, node_map: &HashMap<WorkerId, WorkerNode>) {
if let Some(info) = self.status.active_graph_info() {
info.on_new_worker_node_map(node_map)
}
self.graph_info.on_new_worker_node_map(node_map)
}

pub(super) fn gen_ddl_progress(&self) -> DdlProgress {
Expand Down Expand Up @@ -159,14 +155,15 @@ impl CreatingStreamingJobControl {
}

pub(super) fn pinned_upstream_log_epoch(&self) -> Option<u64> {
if matches!(&self.status, CreatingStreamingJobStatus::Finishing(_)) {
return None;
if self.status.is_finishing() {
None
} else {
// TODO: when supporting recoverable snapshot backfill, we should use the max epoch that has committed
Some(max(
self.barrier_control.max_collected_epoch().unwrap_or(0),
self.backfill_epoch,
))
}
// TODO: when supporting recoverable snapshot backfill, we should use the max epoch that has committed
Some(max(
self.barrier_control.max_collected_epoch().unwrap_or(0),
self.backfill_epoch,
))
}

fn inject_barrier(
@@ -212,6 +209,13 @@ impl CreatingStreamingJobControl {
} else {
false
};
if start_consume_upstream {
info!(
table_id = self.info.table_fragments.table_id().table_id,
prev_epoch = command_ctx.prev_epoch.value().0,
"start consuming upstream"
);
}
let progress_epoch =
if let Some(max_collected_epoch) = self.barrier_control.max_collected_epoch() {
max(max_collected_epoch, self.backfill_epoch)
@@ -225,71 +229,23 @@ impl CreatingStreamingJobControl {
.0
.saturating_sub(progress_epoch) as _,
);
match &mut self.status {
CreatingStreamingJobStatus::ConsumingSnapshot {
pending_commands,
prev_epoch_fake_physical_time,
pending_non_checkpoint_barriers,
initial_barrier_info,
ref graph_info,
..
} => {
assert!(
!start_consume_upstream,
"should not start consuming upstream for a job that are consuming snapshot"
);
let new_barrier = CreatingStreamingJobStatus::new_fake_barrier(
prev_epoch_fake_physical_time,
pending_non_checkpoint_barriers,
initial_barrier_info,
command_ctx.kind.is_checkpoint(),
);
pending_commands.push(command_ctx.clone());
Self::inject_barrier(
self.info.table_fragments.table_id(),
control_stream_manager,
&mut self.barrier_control,
graph_info,
Some(graph_info),
new_barrier,
)?;
}
CreatingStreamingJobStatus::ConsumingLogStore { graph_info, .. } => {
Self::inject_barrier(
self.info.table_fragments.table_id(),
control_stream_manager,
&mut self.barrier_control,
graph_info,
if start_consume_upstream {
None
} else {
Some(graph_info)
},
CreatingJobInjectBarrierInfo {
curr_epoch: command_ctx.curr_epoch.clone(),
prev_epoch: command_ctx.prev_epoch.clone(),
kind: command_ctx.kind.clone(),
new_actors: None,
mutation: None,
},
)?;
let prev_epoch = command_ctx.prev_epoch.value().0;
if let Some(barrier_to_inject) = self
.status
.on_new_upstream_epoch(command_ctx, start_consume_upstream)
{
Self::inject_barrier(
self.info.table_fragments.table_id(),
control_stream_manager,
&mut self.barrier_control,
&self.graph_info,
if start_consume_upstream {
info!(
table_id = self.info.table_fragments.table_id().table_id,
prev_epoch, "start consuming upstream"
);
assert!(command_ctx.kind.is_checkpoint());
self.status = CreatingStreamingJobStatus::Finishing(prev_epoch);
}
}
CreatingStreamingJobStatus::Finishing { .. } => {
assert!(
!start_consume_upstream,
"should not start consuming upstream for a job again"
);
}
};
None
} else {
Some(&self.graph_info)
},
barrier_to_inject,
)?;
}
Ok(())
}

@@ -302,15 +258,15 @@ impl CreatingStreamingJobControl {
) -> MetaResult<()> {
let prev_barriers_to_inject = self.status.update_progress(&resp.create_mview_progress);
self.barrier_control.collect(epoch, worker_id, resp);
if let Some((prev_barriers_to_inject, graph_info)) = prev_barriers_to_inject {
if let Some(prev_barriers_to_inject) = prev_barriers_to_inject {
let table_id = self.info.table_fragments.table_id();
for info in prev_barriers_to_inject {
Self::inject_barrier(
table_id,
control_stream_manager,
&mut self.barrier_control,
graph_info,
Some(graph_info),
&self.graph_info,
Some(&self.graph_info),
info,
)?;
}
Expand All @@ -320,12 +276,11 @@ impl CreatingStreamingJobControl {

pub(super) fn should_merge_to_upstream(&self) -> Option<InflightGraphInfo> {
if let CreatingStreamingJobStatus::ConsumingLogStore {
graph_info,
ref log_store_progress_tracker,
} = &self.status
&& log_store_progress_tracker.is_finished()
{
Some(graph_info.clone())
Some(self.graph_info.clone())
} else {
None
}
@@ -392,7 +347,6 @@ impl CreatingStreamingJobControl {
}

pub(super) fn is_finished(&self) -> bool {
self.barrier_control.is_empty()
&& matches!(&self.status, CreatingStreamingJobStatus::Finishing { .. })
self.barrier_control.is_empty() && self.status.is_finishing()
}
}
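
Note: the mod.rs hunks above are easier to follow as a whole. The struct now owns a single graph_info, and the per-status match that used to drive barrier injection is replaced by a call to self.status.on_new_upstream_epoch(...), whose return value is passed to inject_barrier. The toy program below only illustrates the Option<&InflightGraphInfo> argument choice at that call site; the types and the inject_barrier / on_new_upstream_epoch bodies here are stand-ins, not the real ones.

// Stand-in types: the real code uses InflightGraphInfo and
// CreatingJobInjectBarrierInfo from the meta crate.
#[derive(Debug)]
struct GraphInfo(&'static str);

// Mirrors the shape of the (graph_info, new_graph_info) arguments that
// CreatingStreamingJobControl::inject_barrier takes in the hunk above.
fn inject_barrier(graph_info: &GraphInfo, new_graph_info: Option<&GraphInfo>) {
    println!("inject over {graph_info:?}, new graph info: {new_graph_info:?}");
}

// Mirrors the call site: once the job starts consuming upstream, its graph has
// already been merged into the upstream graph, so no separate "new" graph info
// is passed alongside the barrier.
fn on_new_upstream_epoch(graph_info: &GraphInfo, start_consume_upstream: bool) {
    inject_barrier(
        graph_info,
        if start_consume_upstream {
            None
        } else {
            Some(graph_info)
        },
    );
}

fn main() {
    let graph_info = GraphInfo("creating job graph");
    on_new_upstream_epoch(&graph_info, false); // still catching up via log store
    on_new_upstream_epoch(&graph_info, true); // switching to consume upstream directly
}
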
105 changes: 74 additions & 31 deletions src/meta/src/barrier/creating_job/status.rs
@@ -18,7 +18,6 @@ use std::mem::take;
use std::sync::Arc;

use risingwave_common::hash::ActorId;
use risingwave_common::must_match;
use risingwave_common::util::epoch::Epoch;
use risingwave_meta_model_v2::WorkerId;
use risingwave_pb::hummock::HummockVersionStats;
@@ -30,7 +29,6 @@ use risingwave_pb::stream_service::barrier_complete_response::{
use tracing::warn;

use crate::barrier::command::CommandContext;
use crate::barrier::info::InflightGraphInfo;
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::{BarrierKind, TracedEpoch};

@@ -100,12 +98,14 @@ impl CreateMviewLogStoreProgressTracker {

#[derive(Debug)]
pub(super) enum CreatingStreamingJobStatus {
/// The creating job is consuming the upstream snapshot.
/// Will transition to `ConsumingLogStore` in `update_progress` once
/// the snapshot has been fully consumed.
ConsumingSnapshot {
prev_epoch_fake_physical_time: u64,
pending_commands: Vec<Arc<CommandContext>>,
pending_upstream_barriers: Vec<(TracedEpoch, TracedEpoch, BarrierKind)>,
version_stats: HummockVersionStats,
create_mview_tracker: CreateMviewProgressTracker,
graph_info: InflightGraphInfo,
snapshot_backfill_actors: HashSet<ActorId>,
backfill_epoch: u64,
/// The `prev_epoch` of pending non checkpoint barriers
@@ -114,8 +114,10 @@ pub(super) enum CreatingStreamingJobStatus {
/// Take the mutation out when injecting the first barrier
initial_barrier_info: Option<(HashMap<WorkerId, Vec<StreamActor>>, Mutation)>,
},
/// The creating job is consuming the log store.
///
/// Will transition to `Finishing` in `on_new_upstream_epoch` when `start_consume_upstream` is `true`.
ConsumingLogStore {
graph_info: InflightGraphInfo,
log_store_progress_tracker: CreateMviewLogStoreProgressTracker,
},
/// All backfill actors have started consuming upstream, and the job
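
Note: the new doc comments above describe a three-stage lifecycle. As a quick mental model, here is a self-contained toy version of those transitions; the variant payloads, progress trackers, and epochs of the real enum are deliberately left out.

// Toy model of the CreatingStreamingJobStatus lifecycle (payloads elided).
#[derive(Debug, PartialEq)]
enum Status {
    ConsumingSnapshot, // backfilling from the upstream snapshot
    ConsumingLogStore, // replaying the log store to catch up with upstream
    Finishing,         // actors consume upstream directly; waiting to finish
}

impl Status {
    /// In the real code this transition happens inside `update_progress`,
    /// once the snapshot has been fully consumed.
    fn on_snapshot_consumed(&mut self) {
        if *self == Status::ConsumingSnapshot {
            *self = Status::ConsumingLogStore;
        }
    }

    /// In the real code this transition happens inside `on_new_upstream_epoch`
    /// when `start_consume_upstream` is `true`.
    fn on_new_upstream_epoch(&mut self, start_consume_upstream: bool) {
        if *self == Status::ConsumingLogStore && start_consume_upstream {
            *self = Status::Finishing;
        }
    }
}

fn main() {
    let mut status = Status::ConsumingSnapshot;
    status.on_snapshot_consumed();
    assert_eq!(status, Status::ConsumingLogStore);
    status.on_new_upstream_epoch(true);
    assert_eq!(status, Status::Finishing);
    println!("final status: {status:?}");
}
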
@@ -133,29 +135,16 @@ pub(super) struct CreatingJobInjectBarrierInfo {
}

impl CreatingStreamingJobStatus {
pub(super) fn active_graph_info(&self) -> Option<&InflightGraphInfo> {
match self {
CreatingStreamingJobStatus::ConsumingSnapshot { graph_info, .. }
| CreatingStreamingJobStatus::ConsumingLogStore { graph_info, .. } => Some(graph_info),
CreatingStreamingJobStatus::Finishing(_) => {
// when entering `Finishing`, the graph will have been added to the upstream graph,
// and therefore the separate graph info is inactive.
None
}
}
}

pub(super) fn update_progress(
&mut self,
create_mview_progress: impl IntoIterator<Item = &CreateMviewProgress>,
) -> Option<(Vec<CreatingJobInjectBarrierInfo>, &InflightGraphInfo)> {
) -> Option<Vec<CreatingJobInjectBarrierInfo>> {
match self {
Self::ConsumingSnapshot {
create_mview_tracker,
ref version_stats,
prev_epoch_fake_physical_time,
pending_commands,
ref graph_info,
pending_upstream_barriers,
pending_non_checkpoint_barriers,
ref backfill_epoch,
initial_barrier_info,
@@ -184,27 +173,24 @@ impl CreatingStreamingJobStatus {
mutation,
}]
.into_iter()
.chain(pending_commands.drain(..).map(|command_ctx| {
CreatingJobInjectBarrierInfo {
curr_epoch: command_ctx.curr_epoch.clone(),
prev_epoch: command_ctx.prev_epoch.clone(),
kind: command_ctx.kind.clone(),
.chain(pending_upstream_barriers.drain(..).map(
|(prev_epoch, curr_epoch, kind)| CreatingJobInjectBarrierInfo {
curr_epoch,
prev_epoch,
kind,
new_actors: None,
mutation: None,
}
}))
},
))
.collect();

*self = CreatingStreamingJobStatus::ConsumingLogStore {
graph_info: graph_info.clone(),
log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
snapshot_backfill_actors.iter().cloned(),
barriers_to_inject.len(),
),
};
let graph_info = must_match!(self,
CreatingStreamingJobStatus::ConsumingLogStore {graph_info, ..} => graph_info);
Some((barriers_to_inject, graph_info))
Some(barriers_to_inject)
} else {
None
}
@@ -220,6 +206,59 @@ impl CreatingStreamingJobStatus {
}
}

pub(super) fn on_new_upstream_epoch(
&mut self,
command_ctx: &Arc<CommandContext>,
start_consume_upstream: bool,
) -> Option<CreatingJobInjectBarrierInfo> {
match self {
CreatingStreamingJobStatus::ConsumingSnapshot {
pending_upstream_barriers,
prev_epoch_fake_physical_time,
pending_non_checkpoint_barriers,
initial_barrier_info,
..
} => {
assert!(
!start_consume_upstream,
"should not start consuming upstream for a job that are consuming snapshot"
);
pending_upstream_barriers.push((
command_ctx.prev_epoch.clone(),
command_ctx.curr_epoch.clone(),
command_ctx.kind.clone(),
));
Some(CreatingStreamingJobStatus::new_fake_barrier(
prev_epoch_fake_physical_time,
pending_non_checkpoint_barriers,
initial_barrier_info,
command_ctx.kind.is_checkpoint(),
))
}
CreatingStreamingJobStatus::ConsumingLogStore { .. } => {
let prev_epoch = command_ctx.prev_epoch.value().0;
if start_consume_upstream {
assert!(command_ctx.kind.is_checkpoint());
*self = CreatingStreamingJobStatus::Finishing(prev_epoch);
}
Some(CreatingJobInjectBarrierInfo {
curr_epoch: command_ctx.curr_epoch.clone(),
prev_epoch: command_ctx.prev_epoch.clone(),
kind: command_ctx.kind.clone(),
new_actors: None,
mutation: None,
})
}
CreatingStreamingJobStatus::Finishing { .. } => {
assert!(
!start_consume_upstream,
"should not start consuming upstream for a job again"
);
None
}
}
}

pub(super) fn new_fake_barrier(
prev_epoch_fake_physical_time: &mut u64,
pending_non_checkpoint_barriers: &mut Vec<u64>,
@@ -255,4 +294,8 @@ impl CreatingStreamingJobStatus {
}
}
}

pub(super) fn is_finishing(&self) -> bool {
matches!(self, Self::Finishing(_))
}
}
