diff --git a/src/meta/src/barrier/creating_job/mod.rs b/src/meta/src/barrier/creating_job/mod.rs index 783f74275cde9..0598cd319c590 100644 --- a/src/meta/src/barrier/creating_job/mod.rs +++ b/src/meta/src/barrier/creating_job/mod.rs @@ -48,6 +48,8 @@ pub(super) struct CreatingStreamingJobControl { pub(super) snapshot_backfill_info: SnapshotBackfillInfo, backfill_epoch: u64, + graph_info: InflightGraphInfo, + barrier_control: CreatingStreamingJobBarrierControl, status: CreatingStreamingJobStatus, @@ -87,13 +89,13 @@ impl CreatingStreamingJobControl { metrics, ), backfill_epoch, + graph_info: InflightGraphInfo::new(fragment_info), status: CreatingStreamingJobStatus::ConsumingSnapshot { prev_epoch_fake_physical_time: 0, - pending_commands: vec![], + pending_upstream_barriers: vec![], version_stats: version_stat.clone(), create_mview_tracker, snapshot_backfill_actors, - graph_info: InflightGraphInfo::new(fragment_info), backfill_epoch, pending_non_checkpoint_barriers: vec![], initial_barrier_info: Some((actors_to_create, initial_mutation)), @@ -106,17 +108,11 @@ impl CreatingStreamingJobControl { pub(super) fn is_wait_on_worker(&self, worker_id: WorkerId) -> bool { self.barrier_control.is_wait_on_worker(worker_id) - || self - .status - .active_graph_info() - .map(|info| info.contains_worker(worker_id)) - .unwrap_or(false) + || (self.status.is_finishing() && self.graph_info.contains_worker(worker_id)) } pub(super) fn on_new_worker_node_map(&self, node_map: &HashMap) { - if let Some(info) = self.status.active_graph_info() { - info.on_new_worker_node_map(node_map) - } + self.graph_info.on_new_worker_node_map(node_map) } pub(super) fn gen_ddl_progress(&self) -> DdlProgress { @@ -159,14 +155,15 @@ impl CreatingStreamingJobControl { } pub(super) fn pinned_upstream_log_epoch(&self) -> Option { - if matches!(&self.status, CreatingStreamingJobStatus::Finishing(_)) { - return None; + if self.status.is_finishing() { + None + } else { + // TODO: when supporting recoverable snapshot backfill, we should use the max epoch that has committed + Some(max( + self.barrier_control.max_collected_epoch().unwrap_or(0), + self.backfill_epoch, + )) } - // TODO: when supporting recoverable snapshot backfill, we should use the max epoch that has committed - Some(max( - self.barrier_control.max_collected_epoch().unwrap_or(0), - self.backfill_epoch, - )) } fn inject_barrier( @@ -212,6 +209,13 @@ impl CreatingStreamingJobControl { } else { false }; + if start_consume_upstream { + info!( + table_id = self.info.table_fragments.table_id().table_id, + prev_epoch = command_ctx.prev_epoch.value().0, + "start consuming upstream" + ); + } let progress_epoch = if let Some(max_collected_epoch) = self.barrier_control.max_collected_epoch() { max(max_collected_epoch, self.backfill_epoch) @@ -225,71 +229,23 @@ impl CreatingStreamingJobControl { .0 .saturating_sub(progress_epoch) as _, ); - match &mut self.status { - CreatingStreamingJobStatus::ConsumingSnapshot { - pending_commands, - prev_epoch_fake_physical_time, - pending_non_checkpoint_barriers, - initial_barrier_info, - ref graph_info, - .. - } => { - assert!( - !start_consume_upstream, - "should not start consuming upstream for a job that are consuming snapshot" - ); - let new_barrier = CreatingStreamingJobStatus::new_fake_barrier( - prev_epoch_fake_physical_time, - pending_non_checkpoint_barriers, - initial_barrier_info, - command_ctx.kind.is_checkpoint(), - ); - pending_commands.push(command_ctx.clone()); - Self::inject_barrier( - self.info.table_fragments.table_id(), - control_stream_manager, - &mut self.barrier_control, - graph_info, - Some(graph_info), - new_barrier, - )?; - } - CreatingStreamingJobStatus::ConsumingLogStore { graph_info, .. } => { - Self::inject_barrier( - self.info.table_fragments.table_id(), - control_stream_manager, - &mut self.barrier_control, - graph_info, - if start_consume_upstream { - None - } else { - Some(graph_info) - }, - CreatingJobInjectBarrierInfo { - curr_epoch: command_ctx.curr_epoch.clone(), - prev_epoch: command_ctx.prev_epoch.clone(), - kind: command_ctx.kind.clone(), - new_actors: None, - mutation: None, - }, - )?; - let prev_epoch = command_ctx.prev_epoch.value().0; + if let Some(barrier_to_inject) = self + .status + .on_new_upstream_epoch(command_ctx, start_consume_upstream) + { + Self::inject_barrier( + self.info.table_fragments.table_id(), + control_stream_manager, + &mut self.barrier_control, + &self.graph_info, if start_consume_upstream { - info!( - table_id = self.info.table_fragments.table_id().table_id, - prev_epoch, "start consuming upstream" - ); - assert!(command_ctx.kind.is_checkpoint()); - self.status = CreatingStreamingJobStatus::Finishing(prev_epoch); - } - } - CreatingStreamingJobStatus::Finishing { .. } => { - assert!( - !start_consume_upstream, - "should not start consuming upstream for a job again" - ); - } - }; + None + } else { + Some(&self.graph_info) + }, + barrier_to_inject, + )?; + } Ok(()) } @@ -302,15 +258,15 @@ impl CreatingStreamingJobControl { ) -> MetaResult<()> { let prev_barriers_to_inject = self.status.update_progress(&resp.create_mview_progress); self.barrier_control.collect(epoch, worker_id, resp); - if let Some((prev_barriers_to_inject, graph_info)) = prev_barriers_to_inject { + if let Some(prev_barriers_to_inject) = prev_barriers_to_inject { let table_id = self.info.table_fragments.table_id(); for info in prev_barriers_to_inject { Self::inject_barrier( table_id, control_stream_manager, &mut self.barrier_control, - graph_info, - Some(graph_info), + &self.graph_info, + Some(&self.graph_info), info, )?; } @@ -320,12 +276,11 @@ impl CreatingStreamingJobControl { pub(super) fn should_merge_to_upstream(&self) -> Option { if let CreatingStreamingJobStatus::ConsumingLogStore { - graph_info, ref log_store_progress_tracker, } = &self.status && log_store_progress_tracker.is_finished() { - Some(graph_info.clone()) + Some(self.graph_info.clone()) } else { None } @@ -392,7 +347,6 @@ impl CreatingStreamingJobControl { } pub(super) fn is_finished(&self) -> bool { - self.barrier_control.is_empty() - && matches!(&self.status, CreatingStreamingJobStatus::Finishing { .. }) + self.barrier_control.is_empty() && self.status.is_finishing() } } diff --git a/src/meta/src/barrier/creating_job/status.rs b/src/meta/src/barrier/creating_job/status.rs index 5a4967a9192d9..093747249f1df 100644 --- a/src/meta/src/barrier/creating_job/status.rs +++ b/src/meta/src/barrier/creating_job/status.rs @@ -18,7 +18,6 @@ use std::mem::take; use std::sync::Arc; use risingwave_common::hash::ActorId; -use risingwave_common::must_match; use risingwave_common::util::epoch::Epoch; use risingwave_meta_model_v2::WorkerId; use risingwave_pb::hummock::HummockVersionStats; @@ -30,7 +29,6 @@ use risingwave_pb::stream_service::barrier_complete_response::{ use tracing::warn; use crate::barrier::command::CommandContext; -use crate::barrier::info::InflightGraphInfo; use crate::barrier::progress::CreateMviewProgressTracker; use crate::barrier::{BarrierKind, TracedEpoch}; @@ -100,12 +98,14 @@ impl CreateMviewLogStoreProgressTracker { #[derive(Debug)] pub(super) enum CreatingStreamingJobStatus { + /// The creating job is consuming upstream snapshot. + /// Will transit to `ConsumingLogStore` on `update_progress` when + /// the snapshot has been fully consumed after `update_progress`. ConsumingSnapshot { prev_epoch_fake_physical_time: u64, - pending_commands: Vec>, + pending_upstream_barriers: Vec<(TracedEpoch, TracedEpoch, BarrierKind)>, version_stats: HummockVersionStats, create_mview_tracker: CreateMviewProgressTracker, - graph_info: InflightGraphInfo, snapshot_backfill_actors: HashSet, backfill_epoch: u64, /// The `prev_epoch` of pending non checkpoint barriers @@ -114,8 +114,10 @@ pub(super) enum CreatingStreamingJobStatus { /// Take the mutation out when injecting the first barrier initial_barrier_info: Option<(HashMap>, Mutation)>, }, + /// The creating job is consuming log store. + /// + /// Will transit to `Finishing` on `on_new_upstream_epoch` when `start_consume_upstream` is `true`. ConsumingLogStore { - graph_info: InflightGraphInfo, log_store_progress_tracker: CreateMviewLogStoreProgressTracker, }, /// All backfill actors have started consuming upstream, and the job @@ -133,29 +135,16 @@ pub(super) struct CreatingJobInjectBarrierInfo { } impl CreatingStreamingJobStatus { - pub(super) fn active_graph_info(&self) -> Option<&InflightGraphInfo> { - match self { - CreatingStreamingJobStatus::ConsumingSnapshot { graph_info, .. } - | CreatingStreamingJobStatus::ConsumingLogStore { graph_info, .. } => Some(graph_info), - CreatingStreamingJobStatus::Finishing(_) => { - // when entering `Finishing`, the graph will have been added to the upstream graph, - // and therefore the separate graph info is inactive. - None - } - } - } - pub(super) fn update_progress( &mut self, create_mview_progress: impl IntoIterator, - ) -> Option<(Vec, &InflightGraphInfo)> { + ) -> Option> { match self { Self::ConsumingSnapshot { create_mview_tracker, ref version_stats, prev_epoch_fake_physical_time, - pending_commands, - ref graph_info, + pending_upstream_barriers, pending_non_checkpoint_barriers, ref backfill_epoch, initial_barrier_info, @@ -184,27 +173,24 @@ impl CreatingStreamingJobStatus { mutation, }] .into_iter() - .chain(pending_commands.drain(..).map(|command_ctx| { - CreatingJobInjectBarrierInfo { - curr_epoch: command_ctx.curr_epoch.clone(), - prev_epoch: command_ctx.prev_epoch.clone(), - kind: command_ctx.kind.clone(), + .chain(pending_upstream_barriers.drain(..).map( + |(prev_epoch, curr_epoch, kind)| CreatingJobInjectBarrierInfo { + curr_epoch, + prev_epoch, + kind, new_actors: None, mutation: None, - } - })) + }, + )) .collect(); *self = CreatingStreamingJobStatus::ConsumingLogStore { - graph_info: graph_info.clone(), log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new( snapshot_backfill_actors.iter().cloned(), barriers_to_inject.len(), ), }; - let graph_info = must_match!(self, - CreatingStreamingJobStatus::ConsumingLogStore {graph_info, ..} => graph_info); - Some((barriers_to_inject, graph_info)) + Some(barriers_to_inject) } else { None } @@ -220,6 +206,59 @@ impl CreatingStreamingJobStatus { } } + pub(super) fn on_new_upstream_epoch( + &mut self, + command_ctx: &Arc, + start_consume_upstream: bool, + ) -> Option { + match self { + CreatingStreamingJobStatus::ConsumingSnapshot { + pending_upstream_barriers, + prev_epoch_fake_physical_time, + pending_non_checkpoint_barriers, + initial_barrier_info, + .. + } => { + assert!( + !start_consume_upstream, + "should not start consuming upstream for a job that are consuming snapshot" + ); + pending_upstream_barriers.push(( + command_ctx.prev_epoch.clone(), + command_ctx.curr_epoch.clone(), + command_ctx.kind.clone(), + )); + Some(CreatingStreamingJobStatus::new_fake_barrier( + prev_epoch_fake_physical_time, + pending_non_checkpoint_barriers, + initial_barrier_info, + command_ctx.kind.is_checkpoint(), + )) + } + CreatingStreamingJobStatus::ConsumingLogStore { .. } => { + let prev_epoch = command_ctx.prev_epoch.value().0; + if start_consume_upstream { + assert!(command_ctx.kind.is_checkpoint()); + *self = CreatingStreamingJobStatus::Finishing(prev_epoch); + } + Some(CreatingJobInjectBarrierInfo { + curr_epoch: command_ctx.curr_epoch.clone(), + prev_epoch: command_ctx.prev_epoch.clone(), + kind: command_ctx.kind.clone(), + new_actors: None, + mutation: None, + }) + } + CreatingStreamingJobStatus::Finishing { .. } => { + assert!( + !start_consume_upstream, + "should not start consuming upstream for a job again" + ); + None + } + } + } + pub(super) fn new_fake_barrier( prev_epoch_fake_physical_time: &mut u64, pending_non_checkpoint_barriers: &mut Vec, @@ -255,4 +294,8 @@ impl CreatingStreamingJobStatus { } } } + + pub(super) fn is_finishing(&self) -> bool { + matches!(self, Self::Finishing(_)) + } }