diff --git a/proto/hummock.proto b/proto/hummock.proto index 6d3d7b0d32d7d..ec0e20f6013ce 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -63,6 +63,10 @@ message InputLevel { repeated SstableInfo table_infos = 3; } +message NewL0SubLevel { + repeated SstableInfo inserted_table_infos = 1; +} + message IntraLevelDelta { uint32 level_idx = 1; uint64 l0_sub_level_id = 2; @@ -112,6 +116,7 @@ message GroupDelta { GroupConstruct group_construct = 2; GroupDestroy group_destroy = 3; GroupMerge group_merge = 6; + NewL0SubLevel new_l0_sub_level = 7; } } @@ -528,7 +533,6 @@ message ReportCompactionTaskResponse { message ValidationTask { repeated SstableInfo sst_infos = 1; map sst_id_to_worker_id = 2; - uint64 epoch = 3; } // Delete SSTs in object store diff --git a/proto/stream_service.proto b/proto/stream_service.proto index 45703554c2367..9fc0da18d7d73 100644 --- a/proto/stream_service.proto +++ b/proto/stream_service.proto @@ -28,6 +28,7 @@ message BarrierCompleteResponse { bool done = 2; uint64 consumed_epoch = 3; uint64 consumed_rows = 4; + uint32 pending_barrier_num = 5; } string request_id = 1; common.Status status = 2; diff --git a/src/ctl/src/cmd_impl/hummock/validate_version.rs b/src/ctl/src/cmd_impl/hummock/validate_version.rs index b6ab7f111aaac..62e988f42f1cf 100644 --- a/src/ctl/src/cmd_impl/hummock/validate_version.rs +++ b/src/ctl/src/cmd_impl/hummock/validate_version.rs @@ -206,14 +206,22 @@ pub async fn print_version_delta_in_archive( } fn match_delta(delta: &DeltaType, sst_id: HummockSstableObjectId) -> bool { - let DeltaType::IntraLevel(delta) = delta else { - return false; - }; - delta - .inserted_table_infos - .iter() - .any(|sst| sst.sst_id == sst_id) - || delta.removed_table_ids.iter().any(|sst| *sst == sst_id) + match delta { + DeltaType::GroupConstruct(_) | DeltaType::GroupDestroy(_) | DeltaType::GroupMerge(_) => { + false + } + DeltaType::IntraLevel(delta) => { + delta + .inserted_table_infos + .iter() + .any(|sst| sst.sst_id == sst_id) + || delta.removed_table_ids.iter().any(|sst| *sst == sst_id) + } + DeltaType::NewL0SubLevel(delta) => delta + .inserted_table_infos + .iter() + .any(|sst| sst.sst_id == sst_id), + } } fn print_delta(delta: &DeltaType) { diff --git a/src/meta/src/barrier/creating_job/barrier_control.rs b/src/meta/src/barrier/creating_job/barrier_control.rs index b0aca04645003..90ac3119f4c37 100644 --- a/src/meta/src/barrier/creating_job/barrier_control.rs +++ b/src/meta/src/barrier/creating_job/barrier_control.rs @@ -12,9 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::Bound::{Excluded, Unbounded}; use std::collections::{BTreeMap, HashSet, VecDeque}; use std::mem::take; +use std::ops::Bound::Unbounded; +use std::ops::{Bound, RangeBounds}; use std::time::Instant; use prometheus::HistogramTimer; @@ -26,22 +27,13 @@ use tracing::debug; use crate::rpc::metrics::MetaMetrics; -#[derive(Debug)] -pub(super) enum CreatingStreamingJobBarrierType { - Snapshot, - LogStore, - Upstream, -} - #[derive(Debug)] struct CreatingStreamingJobEpochState { epoch: u64, node_to_collect: HashSet, resps: Vec, - upstream_epoch_to_notify: Option, is_checkpoint: bool, enqueue_time: Instant, - barrier_type: CreatingStreamingJobBarrierType, } #[derive(Debug)] @@ -49,31 +41,30 @@ pub(super) struct CreatingStreamingJobBarrierControl { table_id: TableId, // key is prev_epoch of barrier inflight_barrier_queue: BTreeMap, + backfill_epoch: u64, initial_epoch: Option, max_collected_epoch: Option, - max_attached_epoch: Option, - // newer epoch at the front. should all be checkpoint barrier + // newer epoch at the front. pending_barriers_to_complete: VecDeque, completing_barrier: Option<(CreatingStreamingJobEpochState, HistogramTimer)>, // metrics consuming_snapshot_barrier_latency: LabelGuardedHistogram<2>, consuming_log_store_barrier_latency: LabelGuardedHistogram<2>, - consuming_upstream_barrier_latency: LabelGuardedHistogram<2>, wait_commit_latency: LabelGuardedHistogram<1>, inflight_barrier_num: LabelGuardedIntGauge<1>, } impl CreatingStreamingJobBarrierControl { - pub(super) fn new(table_id: TableId, metrics: &MetaMetrics) -> Self { + pub(super) fn new(table_id: TableId, backfill_epoch: u64, metrics: &MetaMetrics) -> Self { let table_id_str = format!("{}", table_id.table_id); Self { table_id, inflight_barrier_queue: Default::default(), + backfill_epoch, initial_epoch: None, max_collected_epoch: None, - max_attached_epoch: None, pending_barriers_to_complete: Default::default(), completing_barrier: None, @@ -83,9 +74,6 @@ impl CreatingStreamingJobBarrierControl { consuming_log_store_barrier_latency: metrics .snapshot_backfill_barrier_latency .with_guarded_label_values(&[&table_id_str, "consuming_log_store"]), - consuming_upstream_barrier_latency: metrics - .snapshot_backfill_barrier_latency - .with_guarded_label_values(&[&table_id_str, "consuming_upstream"]), wait_commit_latency: metrics .snapshot_backfill_wait_commit_latency .with_guarded_label_values(&[&table_id_str]), @@ -127,7 +115,6 @@ impl CreatingStreamingJobBarrierControl { epoch: u64, node_to_collect: HashSet, is_checkpoint: bool, - barrier_type: CreatingStreamingJobBarrierType, ) { debug!( epoch, @@ -142,17 +129,12 @@ impl CreatingStreamingJobBarrierControl { if let Some(latest_epoch) = self.latest_epoch() { assert!(epoch > latest_epoch, "{} {}", epoch, latest_epoch); } - if let Some(max_attached_epoch) = self.max_attached_epoch { - assert!(epoch > max_attached_epoch); - } let epoch_state = CreatingStreamingJobEpochState { epoch, node_to_collect, resps: vec![], - upstream_epoch_to_notify: None, is_checkpoint, enqueue_time: Instant::now(), - barrier_type, }; if epoch_state.node_to_collect.is_empty() && self.inflight_barrier_queue.is_empty() { self.add_collected(epoch_state); @@ -163,41 +145,6 @@ impl CreatingStreamingJobBarrierControl { .set(self.inflight_barrier_queue.len() as _); } - pub(super) fn unattached_epochs(&self) -> impl Iterator + '_ { - let range_start = if let Some(max_attached_epoch) = self.max_attached_epoch { - Excluded(max_attached_epoch) - } else { - Unbounded - }; - self.inflight_barrier_queue - .range((range_start, Unbounded)) - .map(|(epoch, state)| (*epoch, state.is_checkpoint)) - } - - /// Attach an `upstream_epoch` to the `epoch` of the creating job. - /// - /// The `upstream_epoch` won't be completed until the `epoch` of the creating job is completed so that - /// the `upstream_epoch` should wait for the progress of creating job, and we can ensure that the downstream - /// creating job can eventually catch up with the upstream. - pub(super) fn attach_upstream_epoch(&mut self, epoch: u64, upstream_epoch: u64) { - debug!( - epoch, - upstream_epoch, - table_id = ?self.table_id.table_id, - "attach epoch" - ); - if let Some(max_attached_epoch) = self.max_attached_epoch { - assert!(epoch > max_attached_epoch); - } - self.max_attached_epoch = Some(epoch); - let epoch_state = self - .inflight_barrier_queue - .get_mut(&epoch) - .expect("should exist"); - assert!(epoch_state.upstream_epoch_to_notify.is_none()); - epoch_state.upstream_epoch_to_notify = Some(upstream_epoch); - } - pub(super) fn collect( &mut self, epoch: u64, @@ -228,46 +175,47 @@ impl CreatingStreamingJobBarrierControl { .set(self.inflight_barrier_queue.len() as _); } - #[expect(clippy::type_complexity)] - /// Return (`upstream_epochs_to_notify`, Some((epoch, resps, `is_first_commit`))) + /// Return Some((epoch, resps, `is_first_commit`)) /// - /// `upstream_epochs_to_notify` is the upstream epochs of non-checkpoint barriers to be notified about barrier completing. - /// These non-checkpoint barriers does not need to call `commit_epoch` and therefore can be completed as long as collected. + /// Only epoch within the `epoch_end_bound` can be started. + /// Usually `epoch_end_bound` is the upstream committed epoch. This is to ensure that + /// the creating job won't have higher committed epoch than the upstream. pub(super) fn start_completing( &mut self, - ) -> (Vec, Option<(u64, Vec, bool)>) { - if self.completing_barrier.is_some() { - return (vec![], None); - } - let mut upstream_epochs_to_notify = vec![]; - while let Some(mut epoch_state) = self.pending_barriers_to_complete.pop_back() { + epoch_end_bound: Bound, + ) -> Option<(u64, Vec, bool)> { + assert!(self.completing_barrier.is_none()); + let epoch_range: (Bound, Bound) = (Unbounded, epoch_end_bound); + while let Some(epoch_state) = self.pending_barriers_to_complete.back() + && epoch_range.contains(&epoch_state.epoch) + { + let mut epoch_state = self + .pending_barriers_to_complete + .pop_back() + .expect("non-empty"); let epoch = epoch_state.epoch; let is_first = self.initial_epoch.expect("should have set") == epoch; if is_first { assert!(epoch_state.is_checkpoint); } else if !epoch_state.is_checkpoint { - if let Some(upstream_epoch) = epoch_state.upstream_epoch_to_notify { - upstream_epochs_to_notify.push(upstream_epoch); - } continue; } let resps = take(&mut epoch_state.resps); self.completing_barrier = Some((epoch_state, self.wait_commit_latency.start_timer())); - return (upstream_epochs_to_notify, Some((epoch, resps, is_first))); + return Some((epoch, resps, is_first)); } - (upstream_epochs_to_notify, None) + None } /// Ack on completing a checkpoint barrier. /// /// Return the upstream epoch to be notified when there is any. - pub(super) fn ack_completed(&mut self, completed_epoch: u64) -> Option { + pub(super) fn ack_completed(&mut self, completed_epoch: u64) { let (epoch_state, wait_commit_timer) = self.completing_barrier.take().expect("should exist"); wait_commit_timer.observe_duration(); assert_eq!(epoch_state.epoch, completed_epoch); - epoch_state.upstream_epoch_to_notify } fn add_collected(&mut self, epoch_state: CreatingStreamingJobEpochState) { @@ -280,10 +228,10 @@ impl CreatingStreamingJobBarrierControl { } self.max_collected_epoch = Some(epoch_state.epoch); let barrier_latency = epoch_state.enqueue_time.elapsed().as_secs_f64(); - let barrier_latency_metrics = match &epoch_state.barrier_type { - CreatingStreamingJobBarrierType::Snapshot => &self.consuming_snapshot_barrier_latency, - CreatingStreamingJobBarrierType::LogStore => &self.consuming_log_store_barrier_latency, - CreatingStreamingJobBarrierType::Upstream => &self.consuming_upstream_barrier_latency, + let barrier_latency_metrics = if epoch_state.epoch < self.backfill_epoch { + &self.consuming_snapshot_barrier_latency + } else { + &self.consuming_log_store_barrier_latency }; barrier_latency_metrics.observe(barrier_latency); self.pending_barriers_to_complete.push_front(epoch_state); diff --git a/src/meta/src/barrier/creating_job/mod.rs b/src/meta/src/barrier/creating_job/mod.rs index 08b82148676f8..0598cd319c590 100644 --- a/src/meta/src/barrier/creating_job/mod.rs +++ b/src/meta/src/barrier/creating_job/mod.rs @@ -17,25 +17,21 @@ mod status; use std::cmp::max; use std::collections::HashMap; -use std::mem::take; +use std::ops::Bound::{Excluded, Unbounded}; use std::sync::Arc; -use std::time::Duration; -use prometheus::HistogramTimer; -use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge}; -use risingwave_common::util::epoch::Epoch; +use risingwave_common::catalog::TableId; +use risingwave_common::metrics::LabelGuardedIntGauge; use risingwave_meta_model_v2::WorkerId; use risingwave_pb::common::WorkerNode; use risingwave_pb::ddl_service::DdlProgress; use risingwave_pb::hummock::HummockVersionStats; use risingwave_pb::stream_plan::barrier_mutation::Mutation; use risingwave_pb::stream_service::BarrierCompleteResponse; -use tracing::{debug, info}; +use tracing::info; use crate::barrier::command::CommandContext; -use crate::barrier::creating_job::barrier_control::{ - CreatingStreamingJobBarrierControl, CreatingStreamingJobBarrierType, -}; +use crate::barrier::creating_job::barrier_control::CreatingStreamingJobBarrierControl; use crate::barrier::creating_job::status::{ CreatingJobInjectBarrierInfo, CreatingStreamingJobStatus, }; @@ -52,11 +48,12 @@ pub(super) struct CreatingStreamingJobControl { pub(super) snapshot_backfill_info: SnapshotBackfillInfo, backfill_epoch: u64, + graph_info: InflightGraphInfo, + barrier_control: CreatingStreamingJobBarrierControl, status: CreatingStreamingJobStatus, upstream_lag: LabelGuardedIntGauge<1>, - upstream_wait_progress_latency: LabelGuardedHistogram<1>, } impl CreatingStreamingJobControl { @@ -73,6 +70,7 @@ impl CreatingStreamingJobControl { definition = info.definition, "new creating job" ); + let snapshot_backfill_actors = info.table_fragments.snapshot_backfill_actor_ids(); let mut create_mview_tracker = CreateMviewProgressTracker::default(); create_mview_tracker.update_tracking_jobs(Some((&info, None)), [], version_stat); let fragment_info: HashMap<_, _> = info.new_fragment_info().collect(); @@ -85,14 +83,19 @@ impl CreatingStreamingJobControl { Self { info, snapshot_backfill_info, - barrier_control: CreatingStreamingJobBarrierControl::new(table_id, metrics), + barrier_control: CreatingStreamingJobBarrierControl::new( + table_id, + backfill_epoch, + metrics, + ), backfill_epoch, + graph_info: InflightGraphInfo::new(fragment_info), status: CreatingStreamingJobStatus::ConsumingSnapshot { prev_epoch_fake_physical_time: 0, - pending_commands: vec![], + pending_upstream_barriers: vec![], version_stats: version_stat.clone(), create_mview_tracker, - graph_info: InflightGraphInfo::new(fragment_info), + snapshot_backfill_actors, backfill_epoch, pending_non_checkpoint_barriers: vec![], initial_barrier_info: Some((actors_to_create, initial_mutation)), @@ -100,29 +103,16 @@ impl CreatingStreamingJobControl { upstream_lag: metrics .snapshot_backfill_lag .with_guarded_label_values(&[&table_id_str]), - upstream_wait_progress_latency: metrics - .snapshot_backfill_upstream_wait_progress_latency - .with_guarded_label_values(&[&table_id_str]), } } - pub(super) fn start_wait_progress_timer(&self) -> HistogramTimer { - self.upstream_wait_progress_latency.start_timer() - } - pub(super) fn is_wait_on_worker(&self, worker_id: WorkerId) -> bool { self.barrier_control.is_wait_on_worker(worker_id) - || self - .status - .active_graph_info() - .map(|info| info.contains_worker(worker_id)) - .unwrap_or(false) + || (self.status.is_finishing() && self.graph_info.contains_worker(worker_id)) } pub(super) fn on_new_worker_node_map(&self, node_map: &HashMap) { - if let Some(info) = self.status.active_graph_info() { - info.on_new_worker_node_map(node_map) - } + self.graph_info.on_new_worker_node_map(node_map) } pub(super) fn gen_ddl_progress(&self) -> DdlProgress { @@ -142,32 +132,15 @@ impl CreatingStreamingJobControl { } } CreatingStreamingJobStatus::ConsumingLogStore { - start_consume_log_store_epoch, + log_store_progress_tracker, .. } => { - let max_collected_epoch = max( - self.barrier_control.max_collected_epoch().unwrap_or(0), - self.backfill_epoch, - ); - let lag = Duration::from_millis( - Epoch(*start_consume_log_store_epoch) - .physical_time() - .saturating_sub(Epoch(max_collected_epoch).physical_time()), - ); format!( - "LogStore [remain lag: {:?}, epoch cnt: {}]", - lag, - self.barrier_control.inflight_barrier_count() + "LogStore [{}]", + log_store_progress_tracker.gen_ddl_progress() ) } - CreatingStreamingJobStatus::ConsumingUpstream { .. } => { - format!( - "Upstream [unattached: {}, epoch cnt: {}]", - self.barrier_control.unattached_epochs().count(), - self.barrier_control.inflight_barrier_count(), - ) - } - CreatingStreamingJobStatus::Finishing { .. } => { + CreatingStreamingJobStatus::Finishing(_) => { format!( "Finishing [epoch count: {}]", self.barrier_control.inflight_barrier_count() @@ -182,84 +155,43 @@ impl CreatingStreamingJobControl { } pub(super) fn pinned_upstream_log_epoch(&self) -> Option { - let stop_consume_log_store_epoch = match &self.status { - CreatingStreamingJobStatus::ConsumingSnapshot { .. } - | CreatingStreamingJobStatus::ConsumingLogStore { .. } => None, - CreatingStreamingJobStatus::ConsumingUpstream { - start_consume_upstream_epoch, - .. - } - | CreatingStreamingJobStatus::Finishing { - start_consume_upstream_epoch, - .. - } => Some(*start_consume_upstream_epoch), - }; - if let Some(max_collected_epoch) = self.barrier_control.max_collected_epoch() { - if max_collected_epoch < self.backfill_epoch { - Some(self.backfill_epoch) - } else if let Some(stop_consume_log_store_epoch) = stop_consume_log_store_epoch - && max_collected_epoch >= stop_consume_log_store_epoch - { - None - } else { - Some(max_collected_epoch) - } + if self.status.is_finishing() { + None } else { - Some(self.backfill_epoch) + // TODO: when supporting recoverable snapshot backfill, we should use the max epoch that has committed + Some(max( + self.barrier_control.max_collected_epoch().unwrap_or(0), + self.backfill_epoch, + )) } } - pub(super) fn may_inject_fake_barrier( - &mut self, + fn inject_barrier( + table_id: TableId, control_stream_manager: &mut ControlStreamManager, - upstream_prev_epoch: u64, - is_checkpoint: bool, + barrier_control: &mut CreatingStreamingJobBarrierControl, + pre_applied_graph_info: &InflightGraphInfo, + applied_graph_info: Option<&InflightGraphInfo>, + CreatingJobInjectBarrierInfo { + curr_epoch, + prev_epoch, + kind, + new_actors, + mutation, + }: CreatingJobInjectBarrierInfo, ) -> MetaResult<()> { - if let Some((barriers_to_inject, graph_info)) = - self.status.may_inject_fake_barrier(is_checkpoint) - { - if let Some(graph_info) = graph_info { - info!( - table_id = self.info.table_fragments.table_id().table_id, - upstream_prev_epoch, "start consuming log store" - ); - self.status = CreatingStreamingJobStatus::ConsumingLogStore { - graph_info, - start_consume_log_store_epoch: upstream_prev_epoch, - }; - } - let graph_info = self - .status - .active_graph_info() - .expect("must exist when having barriers to inject"); - let table_id = self.info.table_fragments.table_id(); - for CreatingJobInjectBarrierInfo { - curr_epoch, - prev_epoch, - kind, - new_actors, - mutation, - } in barriers_to_inject - { - let node_to_collect = control_stream_manager.inject_barrier( - Some(table_id), - mutation, - (&curr_epoch, &prev_epoch), - &kind, - graph_info, - Some(graph_info), - new_actors, - vec![], - vec![], - )?; - self.barrier_control.enqueue_epoch( - prev_epoch.value().0, - node_to_collect, - kind.is_checkpoint(), - CreatingStreamingJobBarrierType::Snapshot, - ); - } - } + let node_to_collect = control_stream_manager.inject_barrier( + Some(table_id), + mutation, + (&curr_epoch, &prev_epoch), + &kind, + pre_applied_graph_info, + applied_graph_info, + new_actors, + vec![], + vec![], + )?; + barrier_control.enqueue_epoch(prev_epoch.value().0, node_to_collect, kind.is_checkpoint()); Ok(()) } @@ -267,7 +199,7 @@ impl CreatingStreamingJobControl { &mut self, control_stream_manager: &mut ControlStreamManager, command_ctx: &Arc, - ) -> MetaResult>> { + ) -> MetaResult<()> { let table_id = self.info.table_fragments.table_id(); let start_consume_upstream = if let Command::MergeSnapshotBackfillStreamingJobs( jobs_to_merge, @@ -277,6 +209,13 @@ impl CreatingStreamingJobControl { } else { false }; + if start_consume_upstream { + info!( + table_id = self.info.table_fragments.table_id().table_id, + prev_epoch = command_ctx.prev_epoch.value().0, + "start consuming upstream" + ); + } let progress_epoch = if let Some(max_collected_epoch) = self.barrier_control.max_collected_epoch() { max(max_collected_epoch, self.backfill_epoch) @@ -290,125 +229,24 @@ impl CreatingStreamingJobControl { .0 .saturating_sub(progress_epoch) as _, ); - let graph_to_finish = match &mut self.status { - CreatingStreamingJobStatus::ConsumingSnapshot { - pending_commands, .. - } => { - assert!( - !start_consume_upstream, - "should not start consuming upstream for a job that are consuming snapshot" - ); - pending_commands.push(command_ctx.clone()); - None - } - CreatingStreamingJobStatus::ConsumingLogStore { graph_info, .. } => { - let node_to_collect = control_stream_manager.inject_barrier( - Some(table_id), - if start_consume_upstream { - // erase the mutation on upstream except the last command - command_ctx.to_mutation() - } else { - None - }, - (&command_ctx.curr_epoch, &command_ctx.prev_epoch), - &command_ctx.kind, - graph_info, - Some(graph_info), - None, - vec![], - vec![], - )?; - self.barrier_control.enqueue_epoch( - command_ctx.prev_epoch.value().0, - node_to_collect, - command_ctx.kind.is_checkpoint(), - CreatingStreamingJobBarrierType::LogStore, - ); - let prev_epoch = command_ctx.prev_epoch.value().0; + if let Some(barrier_to_inject) = self + .status + .on_new_upstream_epoch(command_ctx, start_consume_upstream) + { + Self::inject_barrier( + self.info.table_fragments.table_id(), + control_stream_manager, + &mut self.barrier_control, + &self.graph_info, if start_consume_upstream { - let graph_info = take(graph_info); - info!( - table_id = self.info.table_fragments.table_id().table_id, - prev_epoch, "start consuming upstream" - ); - self.status = CreatingStreamingJobStatus::ConsumingUpstream { - start_consume_upstream_epoch: prev_epoch, - graph_info, - }; - } - None - } - CreatingStreamingJobStatus::ConsumingUpstream { - start_consume_upstream_epoch, - graph_info, - } => { - assert!( - !start_consume_upstream, - "should not start consuming upstream for a job again" - ); - - let should_finish = command_ctx.kind.is_checkpoint() - && self.barrier_control.unattached_epochs().next().is_none(); - let node_to_collect = control_stream_manager.inject_barrier( - Some(table_id), - // do not send the upstream barrier mutation because in `ConsumingUpstream` stage, - // barriers are still injected and collected independently on the creating jobs. - None, - (&command_ctx.curr_epoch, &command_ctx.prev_epoch), - &command_ctx.kind, - graph_info, - if should_finish { - None - } else { - Some(graph_info) - }, - None, - vec![], - vec![], - )?; - let prev_epoch = command_ctx.prev_epoch.value().0; - self.barrier_control.enqueue_epoch( - prev_epoch, - node_to_collect, - command_ctx.kind.is_checkpoint(), - CreatingStreamingJobBarrierType::Upstream, - ); - let graph_info = if should_finish { - info!(prev_epoch, table_id = ?self.info.table_fragments.table_id(), "mark as finishing"); - self.barrier_control - .attach_upstream_epoch(prev_epoch, prev_epoch); - let graph_info = take(graph_info); - self.status = CreatingStreamingJobStatus::Finishing { - start_consume_upstream_epoch: *start_consume_upstream_epoch, - }; - Some(Some(graph_info)) + None } else { - let mut unattached_epochs_iter = self.barrier_control.unattached_epochs(); - let mut epoch_to_attach = unattached_epochs_iter.next().expect("non-empty").0; - let mut remain_count = 5; - while remain_count > 0 - && let Some((epoch, _)) = unattached_epochs_iter.next() - { - remain_count -= 1; - epoch_to_attach = epoch; - } - drop(unattached_epochs_iter); - self.barrier_control - .attach_upstream_epoch(epoch_to_attach, prev_epoch); - Some(None) - }; - - graph_info - } - CreatingStreamingJobStatus::Finishing { .. } => { - assert!( - !start_consume_upstream, - "should not start consuming upstream for a job again" - ); - None - } - }; - Ok(graph_to_finish) + Some(&self.graph_info) + }, + barrier_to_inject, + )?; + } + Ok(()) } pub(super) fn collect( @@ -416,57 +254,99 @@ impl CreatingStreamingJobControl { epoch: u64, worker_id: WorkerId, resp: BarrierCompleteResponse, - ) { - self.status.update_progress(&resp.create_mview_progress); + control_stream_manager: &mut ControlStreamManager, + ) -> MetaResult<()> { + let prev_barriers_to_inject = self.status.update_progress(&resp.create_mview_progress); self.barrier_control.collect(epoch, worker_id, resp); + if let Some(prev_barriers_to_inject) = prev_barriers_to_inject { + let table_id = self.info.table_fragments.table_id(); + for info in prev_barriers_to_inject { + Self::inject_barrier( + table_id, + control_stream_manager, + &mut self.barrier_control, + &self.graph_info, + Some(&self.graph_info), + info, + )?; + } + } + Ok(()) } pub(super) fn should_merge_to_upstream(&self) -> Option { - if let ( - CreatingStreamingJobStatus::ConsumingLogStore { - graph_info, - start_consume_log_store_epoch, - }, - Some(max_collected_epoch), - ) = (&self.status, self.barrier_control.max_collected_epoch()) + if let CreatingStreamingJobStatus::ConsumingLogStore { + ref log_store_progress_tracker, + } = &self.status + && log_store_progress_tracker.is_finished() { - if max_collected_epoch >= *start_consume_log_store_epoch { - Some(graph_info.clone()) - } else { - let lag = Duration::from_millis( - Epoch(*start_consume_log_store_epoch).physical_time() - - Epoch(max_collected_epoch).physical_time(), - ); - debug!( - ?lag, - max_collected_epoch, start_consume_log_store_epoch, "wait consuming log store" - ); - None - } + Some(self.graph_info.clone()) } else { None } } +} - #[expect(clippy::type_complexity)] +pub(super) enum CompleteJobType { + /// The first barrier + First, + Normal, + /// The last barrier to complete + Finished, +} + +impl CreatingStreamingJobControl { pub(super) fn start_completing( &mut self, - ) -> (Vec, Option<(u64, Vec, bool)>) { - self.barrier_control.start_completing() + min_upstream_inflight_epoch: Option, + ) -> Option<(u64, Vec, CompleteJobType)> { + let (finished_at_epoch, epoch_end_bound) = match &self.status { + CreatingStreamingJobStatus::Finishing(finish_at_epoch) => { + let epoch_end_bound = min_upstream_inflight_epoch + .map(|upstream_epoch| { + if upstream_epoch < *finish_at_epoch { + Excluded(upstream_epoch) + } else { + Unbounded + } + }) + .unwrap_or(Unbounded); + (Some(*finish_at_epoch), epoch_end_bound) + } + CreatingStreamingJobStatus::ConsumingSnapshot { .. } + | CreatingStreamingJobStatus::ConsumingLogStore { .. } => ( + None, + min_upstream_inflight_epoch + .map(Excluded) + .unwrap_or(Unbounded), + ), + }; + self.barrier_control.start_completing(epoch_end_bound).map( + |(epoch, resps, is_first_commit)| { + let status = if let Some(finish_at_epoch) = finished_at_epoch { + assert!(!is_first_commit); + if epoch == finish_at_epoch { + self.barrier_control.ack_completed(epoch); + assert!(self.barrier_control.is_empty()); + CompleteJobType::Finished + } else { + CompleteJobType::Normal + } + } else if is_first_commit { + CompleteJobType::First + } else { + CompleteJobType::Normal + }; + (epoch, resps, status) + }, + ) } - pub(super) fn ack_completed(&mut self, completed_epoch: u64) -> Option<(u64, bool)> { - let upstream_epoch_to_notify = self.barrier_control.ack_completed(completed_epoch); - if let Some(upstream_epoch_to_notify) = upstream_epoch_to_notify { - Some((upstream_epoch_to_notify, self.is_finished())) - } else { - assert!(!self.is_finished()); - None - } + pub(super) fn ack_completed(&mut self, completed_epoch: u64) { + self.barrier_control.ack_completed(completed_epoch); } pub(super) fn is_finished(&self) -> bool { - self.barrier_control.is_empty() - && matches!(&self.status, CreatingStreamingJobStatus::Finishing { .. }) + self.barrier_control.is_empty() && self.status.is_finishing() } } diff --git a/src/meta/src/barrier/creating_job/status.rs b/src/meta/src/barrier/creating_job/status.rs index aaae86d0a2144..093747249f1df 100644 --- a/src/meta/src/barrier/creating_job/status.rs +++ b/src/meta/src/barrier/creating_job/status.rs @@ -12,30 +12,101 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::hash_map::Entry; +use std::collections::{HashMap, HashSet}; use std::mem::take; use std::sync::Arc; +use risingwave_common::hash::ActorId; use risingwave_common::util::epoch::Epoch; use risingwave_meta_model_v2::WorkerId; use risingwave_pb::hummock::HummockVersionStats; use risingwave_pb::stream_plan::barrier_mutation::Mutation; use risingwave_pb::stream_plan::StreamActor; -use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress; +use risingwave_pb::stream_service::barrier_complete_response::{ + CreateMviewProgress, PbCreateMviewProgress, +}; +use tracing::warn; use crate::barrier::command::CommandContext; -use crate::barrier::info::InflightGraphInfo; use crate::barrier::progress::CreateMviewProgressTracker; use crate::barrier::{BarrierKind, TracedEpoch}; +#[derive(Debug)] +pub(super) struct CreateMviewLogStoreProgressTracker { + /// `actor_id` -> `pending_barrier_count` + ongoing_actors: HashMap, + finished_actors: HashSet, +} + +impl CreateMviewLogStoreProgressTracker { + fn new(actors: impl Iterator, initial_pending_count: usize) -> Self { + Self { + ongoing_actors: HashMap::from_iter(actors.map(|actor| (actor, initial_pending_count))), + finished_actors: HashSet::new(), + } + } + + pub(super) fn gen_ddl_progress(&self) -> String { + let sum = self.ongoing_actors.values().sum::() as f64; + let count = if self.ongoing_actors.is_empty() { + 1 + } else { + self.ongoing_actors.len() + } as f64; + let avg = sum / count; + format!( + "finished: {}/{}, avg epoch count {}", + self.finished_actors.len(), + self.ongoing_actors.len() + self.finished_actors.len(), + avg + ) + } + + fn update(&mut self, progress: impl IntoIterator) { + for progress in progress { + match self.ongoing_actors.entry(progress.backfill_actor_id) { + Entry::Occupied(mut entry) => { + if progress.done { + entry.remove_entry(); + assert!( + self.finished_actors.insert(progress.backfill_actor_id), + "non-duplicate" + ); + } else { + *entry.get_mut() = progress.pending_barrier_num as _; + } + } + Entry::Vacant(_) => { + if cfg!(debug_assertions) { + panic!( + "reporting progress on non-inflight actor: {:?} {:?}", + progress, self + ); + } else { + warn!(?progress, progress_tracker = ?self, "reporting progress on non-inflight actor"); + } + } + } + } + } + + pub(super) fn is_finished(&self) -> bool { + self.ongoing_actors.is_empty() + } +} + #[derive(Debug)] pub(super) enum CreatingStreamingJobStatus { + /// The creating job is consuming upstream snapshot. + /// Will transit to `ConsumingLogStore` on `update_progress` when + /// the snapshot has been fully consumed after `update_progress`. ConsumingSnapshot { prev_epoch_fake_physical_time: u64, - pending_commands: Vec>, + pending_upstream_barriers: Vec<(TracedEpoch, TracedEpoch, BarrierKind)>, version_stats: HummockVersionStats, create_mview_tracker: CreateMviewProgressTracker, - graph_info: InflightGraphInfo, + snapshot_backfill_actors: HashSet, backfill_epoch: u64, /// The `prev_epoch` of pending non checkpoint barriers pending_non_checkpoint_barriers: Vec, @@ -43,17 +114,16 @@ pub(super) enum CreatingStreamingJobStatus { /// Take the mutation out when injecting the first barrier initial_barrier_info: Option<(HashMap>, Mutation)>, }, + /// The creating job is consuming log store. + /// + /// Will transit to `Finishing` on `on_new_upstream_epoch` when `start_consume_upstream` is `true`. ConsumingLogStore { - graph_info: InflightGraphInfo, - start_consume_log_store_epoch: u64, - }, - ConsumingUpstream { - start_consume_upstream_epoch: u64, - graph_info: InflightGraphInfo, - }, - Finishing { - start_consume_upstream_epoch: u64, + log_store_progress_tracker: CreateMviewLogStoreProgressTracker, }, + /// All backfill actors have started consuming upstream, and the job + /// will be finished when all previously injected barriers have been collected + /// Store the `prev_epoch` that will finish at. + Finishing(u64), } pub(super) struct CreatingJobInjectBarrierInfo { @@ -65,79 +135,138 @@ pub(super) struct CreatingJobInjectBarrierInfo { } impl CreatingStreamingJobStatus { - pub(super) fn active_graph_info(&self) -> Option<&InflightGraphInfo> { - match self { - CreatingStreamingJobStatus::ConsumingSnapshot { graph_info, .. } - | CreatingStreamingJobStatus::ConsumingLogStore { graph_info, .. } - | CreatingStreamingJobStatus::ConsumingUpstream { graph_info, .. } => Some(graph_info), - CreatingStreamingJobStatus::Finishing { .. } => { - // when entering `Finishing`, the graph will have been added to the upstream graph, - // and therefore the separate graph info is inactive. - None - } - } - } - pub(super) fn update_progress( &mut self, create_mview_progress: impl IntoIterator, - ) { - if let Self::ConsumingSnapshot { - create_mview_tracker, - ref version_stats, - .. - } = self - { - create_mview_tracker.update_tracking_jobs(None, create_mview_progress, version_stats); - } - } - - /// return - /// - Some(vec[(`curr_epoch`, `prev_epoch`, `barrier_kind`)]) of barriers to newly inject - /// - Some(`graph_info`) when the status should transit to `ConsumingLogStore` - pub(super) fn may_inject_fake_barrier( - &mut self, - is_checkpoint: bool, - ) -> Option<(Vec, Option)> { - if let CreatingStreamingJobStatus::ConsumingSnapshot { - prev_epoch_fake_physical_time, - pending_commands, - create_mview_tracker, - graph_info, - pending_non_checkpoint_barriers, - ref backfill_epoch, - initial_barrier_info, - .. - } = self - { - if create_mview_tracker.has_pending_finished_jobs() { - assert!(initial_barrier_info.is_none()); - pending_non_checkpoint_barriers.push(*backfill_epoch); + ) -> Option> { + match self { + Self::ConsumingSnapshot { + create_mview_tracker, + ref version_stats, + prev_epoch_fake_physical_time, + pending_upstream_barriers, + pending_non_checkpoint_barriers, + ref backfill_epoch, + initial_barrier_info, + ref snapshot_backfill_actors, + .. + } => { + create_mview_tracker.update_tracking_jobs( + None, + create_mview_progress, + version_stats, + ); + if create_mview_tracker.has_pending_finished_jobs() { + let (new_actors, mutation) = match initial_barrier_info.take() { + Some((new_actors, mutation)) => (Some(new_actors), Some(mutation)), + None => (None, None), + }; + assert!(initial_barrier_info.is_none()); + pending_non_checkpoint_barriers.push(*backfill_epoch); - let prev_epoch = Epoch::from_physical_time(*prev_epoch_fake_physical_time); - let barriers_to_inject = - [CreatingJobInjectBarrierInfo { + let prev_epoch = Epoch::from_physical_time(*prev_epoch_fake_physical_time); + let barriers_to_inject: Vec<_> = [CreatingJobInjectBarrierInfo { curr_epoch: TracedEpoch::new(Epoch(*backfill_epoch)), prev_epoch: TracedEpoch::new(prev_epoch), kind: BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers)), - new_actors: None, - mutation: None, + new_actors, + mutation, }] .into_iter() - .chain(pending_commands.drain(..).map(|command_ctx| { - CreatingJobInjectBarrierInfo { - curr_epoch: command_ctx.curr_epoch.clone(), - prev_epoch: command_ctx.prev_epoch.clone(), - kind: command_ctx.kind.clone(), + .chain(pending_upstream_barriers.drain(..).map( + |(prev_epoch, curr_epoch, kind)| CreatingJobInjectBarrierInfo { + curr_epoch, + prev_epoch, + kind, new_actors: None, mutation: None, - } - })) + }, + )) .collect(); - let graph_info = take(graph_info); - Some((barriers_to_inject, Some(graph_info))) - } else { + *self = CreatingStreamingJobStatus::ConsumingLogStore { + log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new( + snapshot_backfill_actors.iter().cloned(), + barriers_to_inject.len(), + ), + }; + Some(barriers_to_inject) + } else { + None + } + } + CreatingStreamingJobStatus::ConsumingLogStore { + log_store_progress_tracker, + .. + } => { + log_store_progress_tracker.update(create_mview_progress); + None + } + CreatingStreamingJobStatus::Finishing(_) => None, + } + } + + pub(super) fn on_new_upstream_epoch( + &mut self, + command_ctx: &Arc, + start_consume_upstream: bool, + ) -> Option { + match self { + CreatingStreamingJobStatus::ConsumingSnapshot { + pending_upstream_barriers, + prev_epoch_fake_physical_time, + pending_non_checkpoint_barriers, + initial_barrier_info, + .. + } => { + assert!( + !start_consume_upstream, + "should not start consuming upstream for a job that are consuming snapshot" + ); + pending_upstream_barriers.push(( + command_ctx.prev_epoch.clone(), + command_ctx.curr_epoch.clone(), + command_ctx.kind.clone(), + )); + Some(CreatingStreamingJobStatus::new_fake_barrier( + prev_epoch_fake_physical_time, + pending_non_checkpoint_barriers, + initial_barrier_info, + command_ctx.kind.is_checkpoint(), + )) + } + CreatingStreamingJobStatus::ConsumingLogStore { .. } => { + let prev_epoch = command_ctx.prev_epoch.value().0; + if start_consume_upstream { + assert!(command_ctx.kind.is_checkpoint()); + *self = CreatingStreamingJobStatus::Finishing(prev_epoch); + } + Some(CreatingJobInjectBarrierInfo { + curr_epoch: command_ctx.curr_epoch.clone(), + prev_epoch: command_ctx.prev_epoch.clone(), + kind: command_ctx.kind.clone(), + new_actors: None, + mutation: None, + }) + } + CreatingStreamingJobStatus::Finishing { .. } => { + assert!( + !start_consume_upstream, + "should not start consuming upstream for a job again" + ); + None + } + } + } + + pub(super) fn new_fake_barrier( + prev_epoch_fake_physical_time: &mut u64, + pending_non_checkpoint_barriers: &mut Vec, + initial_barrier_info: &mut Option<(HashMap>, Mutation)>, + is_checkpoint: bool, + ) -> CreatingJobInjectBarrierInfo { + { + { let prev_epoch = TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time)); *prev_epoch_fake_physical_time += 1; @@ -155,19 +284,18 @@ impl CreatingStreamingJobStatus { } else { Default::default() }; - Some(( - vec![CreatingJobInjectBarrierInfo { - curr_epoch, - prev_epoch, - kind, - new_actors, - mutation, - }], - None, - )) + CreatingJobInjectBarrierInfo { + curr_epoch, + prev_epoch, + kind, + new_actors, + mutation, + } } - } else { - None } } + + pub(super) fn is_finishing(&self) -> bool { + matches!(self, Self::Finishing(_)) + } } diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 120c6d8518bbb..d349e7bbfe0a4 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -12,10 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::assert_matches::assert_matches; use std::collections::hash_map::Entry; use std::collections::{BTreeMap, HashMap, HashSet}; -use std::future::pending; +use std::future::{pending, Future}; use std::mem::{replace, take}; use std::sync::Arc; use std::time::Duration; @@ -51,10 +50,11 @@ use tracing::{debug, error, info, warn, Instrument}; use self::command::CommandContext; use self::notifier::Notifier; -use crate::barrier::creating_job::CreatingStreamingJobControl; +use crate::barrier::creating_job::{CompleteJobType, CreatingStreamingJobControl}; use crate::barrier::info::InflightGraphInfo; use crate::barrier::progress::{CreateMviewProgressTracker, TrackingCommand, TrackingJob}; use crate::barrier::rpc::{merge_node_rpc_errors, ControlStreamManager}; +use crate::barrier::schedule::ScheduledBarriers; use crate::barrier::state::BarrierManagerState; use crate::error::MetaErrorInner; use crate::hummock::{CommitEpochInfo, HummockManagerRef, NewTableFragmentInfo}; @@ -218,7 +218,7 @@ struct CheckpointControl { /// Command that has been collected but is still completing. /// The join handle of the completing future is stored. - completing_command: CompletingCommand, + completing_task: CompletingTask, hummock_version_stats: HummockVersionStats, @@ -235,7 +235,7 @@ impl CheckpointControl { Self { command_ctx_queue: Default::default(), creating_streaming_job_controls: Default::default(), - completing_command: CompletingCommand::None, + completing_task: CompletingTask::None, hummock_version_stats: context.hummock_manager.get_version_stats().await, create_mview_tracker, context, @@ -244,8 +244,11 @@ impl CheckpointControl { fn total_command_num(&self) -> usize { self.command_ctx_queue.len() - + match &self.completing_command { - CompletingCommand::GlobalStreamingGraph { .. } => 1, + + match &self.completing_task { + CompletingTask::Completing { + command_ctx: Some(_), + .. + } => 1, _ => 0, } } @@ -291,8 +294,7 @@ impl CheckpointControl { command_ctx: Arc, notifiers: Vec, node_to_collect: HashSet, - jobs_to_wait: HashSet, - table_ids_to_commit: HashSet, + creating_jobs_to_wait: HashSet, ) { let timer = self.context.metrics.barrier_latency.start_timer(); @@ -305,27 +307,9 @@ impl CheckpointControl { tracing::trace!( prev_epoch = command_ctx.prev_epoch.value().0, - ?jobs_to_wait, + ?creating_jobs_to_wait, "enqueue command" ); - let creating_jobs_to_wait = jobs_to_wait - .into_iter() - .map(|table_id| { - ( - table_id, - if node_to_collect.is_empty() { - Some( - self.creating_streaming_job_controls - .get(&table_id) - .expect("should exist") - .start_wait_progress_timer(), - ) - } else { - None - }, - ) - }) - .collect(); self.command_ctx_queue.insert( command_ctx.prev_epoch.value().0, EpochNode { @@ -334,8 +318,7 @@ impl CheckpointControl { node_to_collect, resps: vec![], creating_jobs_to_wait, - finished_table_ids: HashMap::new(), - table_ids_to_commit, + finished_jobs: HashMap::new(), }, command_ctx, notifiers, @@ -345,7 +328,11 @@ impl CheckpointControl { /// Change the state of this `prev_epoch` to `Completed`. Return continuous nodes /// with `Completed` starting from first node [`Completed`..`InFlight`) and remove them. - fn barrier_collected(&mut self, resp: BarrierCompleteResponse) { + fn barrier_collected( + &mut self, + resp: BarrierCompleteResponse, + control_stream_manager: &mut ControlStreamManager, + ) -> MetaResult<()> { let worker_id = resp.worker_id; let prev_epoch = resp.epoch; tracing::trace!( @@ -357,19 +344,6 @@ impl CheckpointControl { if resp.partial_graph_id == u32::MAX { if let Some(node) = self.command_ctx_queue.get_mut(&prev_epoch) { assert!(node.state.node_to_collect.remove(&(worker_id as _))); - if node.state.node_to_collect.is_empty() { - node.state - .creating_jobs_to_wait - .iter_mut() - .for_each(|(table_id, timer)| { - *timer = Some( - self.creating_streaming_job_controls - .get(table_id) - .expect("should exist") - .start_wait_progress_timer(), - ); - }); - } node.state.resps.push(resp); } else { panic!( @@ -382,8 +356,9 @@ impl CheckpointControl { self.creating_streaming_job_controls .get_mut(&creating_table_id) .expect("should exist") - .collect(prev_epoch, worker_id as _, resp); + .collect(prev_epoch, worker_id as _, resp, control_stream_manager)?; } + Ok(()) } /// Pause inject barrier until True. @@ -400,11 +375,9 @@ impl CheckpointControl { .command_ctx_queue .last_key_value() .map(|(_, x)| &x.command_ctx) - .or(match &self.completing_command { - CompletingCommand::None - | CompletingCommand::Err(_) - | CompletingCommand::CreatingStreamingJob { .. } => None, - CompletingCommand::GlobalStreamingGraph { command_ctx, .. } => Some(command_ctx), + .or(match &self.completing_task { + CompletingTask::None | CompletingTask::Err(_) => None, + CompletingTask::Completing { command_ctx, .. } => command_ctx.as_ref(), }) .map(|command_ctx| command_ctx.command.should_pause_inject_barrier()) .unwrap_or(false); @@ -413,12 +386,9 @@ impl CheckpointControl { .values() .map(|node| &node.command_ctx) .chain( - match &self.completing_command { - CompletingCommand::None - | CompletingCommand::Err(_) - | CompletingCommand::CreatingStreamingJob { .. } => None, - CompletingCommand::GlobalStreamingGraph { command_ctx, .. } => - Some(command_ctx), + match &self.completing_task { + CompletingTask::None | CompletingTask::Err(_) => None, + CompletingTask::Completing { command_ctx, .. } => command_ctx.as_ref(), } .into_iter() ) @@ -432,32 +402,10 @@ impl CheckpointControl { /// We need to make sure there are no changes when doing recovery pub async fn clear_on_err(&mut self, err: &MetaError) { // join spawned completing command to finish no matter it succeeds or not. - let is_err = match replace(&mut self.completing_command, CompletingCommand::None) { - CompletingCommand::None => false, - CompletingCommand::GlobalStreamingGraph { - command_ctx, - join_handle, - .. - } => { - info!( - prev_epoch = ?command_ctx.prev_epoch, - curr_epoch = ?command_ctx.curr_epoch, - "waiting for completing command to finish in recovery" - ); - match join_handle.await { - Err(e) => { - warn!(err = ?e.as_report(), "failed to join completing task"); - true - } - Ok(Err(e)) => { - warn!(err = ?e.as_report(), "failed to complete barrier during clear"); - true - } - Ok(Ok(_)) => false, - } - } - CompletingCommand::Err(_) => true, - CompletingCommand::CreatingStreamingJob { join_handle, .. } => { + let is_err = match replace(&mut self.completing_task, CompletingTask::None) { + CompletingTask::None => false, + CompletingTask::Completing { join_handle, .. } => { + info!("waiting for completing command to finish in recovery"); match join_handle.await { Err(e) => { warn!(err = ?e.as_report(), "failed to join completing task"); @@ -470,38 +418,19 @@ impl CheckpointControl { Ok(Ok(_)) => false, } } + CompletingTask::Err(_) => true, }; if !is_err { // continue to finish the pending collected barrier. - while let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value() - && !state.is_inflight() - { - let (_, node) = self.command_ctx_queue.pop_first().expect("non-empty"); - let (prev_epoch, curr_epoch) = ( - node.command_ctx.prev_epoch.value().0, - node.command_ctx.curr_epoch.value().0, - ); - let finished_jobs = self - .create_mview_tracker - .apply_collected_command(&node, &self.hummock_version_stats); - if let Err(e) = self - .context - .clone() - .complete_barrier(node, finished_jobs, HashMap::new()) - .await - { + while let Some(task) = self.next_complete_barrier_task(None) { + if let Err(e) = self.context.clone().complete_barrier(task).await { error!( - prev_epoch, - curr_epoch, err = ?e.as_report(), "failed to complete barrier during recovery" ); break; } else { - info!( - prev_epoch, - curr_epoch, "succeed to complete barrier during recovery" - ) + info!("succeed to complete barrier during recovery") } } } @@ -545,11 +474,9 @@ struct BarrierEpochState { resps: Vec, - creating_jobs_to_wait: HashMap>, - - finished_table_ids: HashMap, + creating_jobs_to_wait: HashSet, - table_ids_to_commit: HashSet, + finished_jobs: HashMap)>, } impl BarrierEpochState { @@ -558,22 +485,17 @@ impl BarrierEpochState { } } -enum CompletingCommand { +enum CompletingTask { None, - GlobalStreamingGraph { - command_ctx: Arc, + Completing { + command_ctx: Option>, table_ids_to_finish: HashSet, - require_next_checkpoint: bool, + creating_job_epochs: Vec<(TableId, u64)>, // The join handle of a spawned task that completes the barrier. // The return value indicate whether there is some create streaming job command // that has finished but not checkpointed. If there is any, we will force checkpoint on the next barrier - join_handle: JoinHandle>>, - }, - CreatingStreamingJob { - table_id: TableId, - epoch: u64, - join_handle: JoinHandle>, + join_handle: JoinHandle>, }, #[expect(dead_code)] Err(MetaError), @@ -841,12 +763,8 @@ impl GlobalBarrierManager { } } (worker_id, resp_result) = self.control_stream_manager.next_complete_barrier_response() => { - match resp_result { - Ok(resp) => { - self.checkpoint_control.barrier_collected(resp); - - } - Err(e) => { + if let Err(e) = resp_result.and_then(|resp| self.checkpoint_control.barrier_collected(resp, &mut self.control_stream_manager)) { + { let failed_command = self.checkpoint_control.command_wait_collect_from_worker(worker_id as _); if failed_command.is_some() || self.state.inflight_graph_info.contains_worker(worker_id as _) @@ -863,22 +781,15 @@ impl GlobalBarrierManager { } } } - complete_result = self.checkpoint_control.next_completed_barrier() => { + complete_result = self.checkpoint_control.next_completed_barrier(&mut self.scheduled_barriers) => { match complete_result { - Ok(Some(output)) => { - // If there are remaining commands (that requires checkpoint to finish), we force - // the next barrier to be a checkpoint. - if output.require_next_checkpoint { - assert_matches!(output.command_ctx.kind, BarrierKind::Barrier); - self.scheduled_barriers.force_checkpoint_in_next_barrier(); - } + Ok(output) => { if !output.table_ids_to_finish.is_empty() { self.control_stream_manager.remove_partial_graph( output.table_ids_to_finish.iter().map(|table_id| table_id.table_id).collect() ); } } - Ok(None) => {} Err(e) => { self.failure_recovery(e).await; } @@ -986,19 +897,6 @@ impl GlobalBarrierManager { ); } - // may inject fake barrier - for creating_job in self - .checkpoint_control - .creating_streaming_job_controls - .values_mut() - { - creating_job.may_inject_fake_barrier( - &mut self.control_stream_manager, - prev_epoch.value().0, - checkpoint, - )? - } - self.pending_non_checkpoint_barriers .push(prev_epoch.value().0); let kind = if checkpoint { @@ -1017,8 +915,14 @@ impl GlobalBarrierManager { command = Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge); } - let (pre_applied_graph_info, pre_applied_subscription_info) = - self.state.apply_command(&command); + let command = command; + + let ( + pre_applied_graph_info, + pre_applied_subscription_info, + table_ids_to_commit, + jobs_to_wait, + ) = self.state.apply_command(&command); // Tracing related stuff prev_epoch.span().in_scope(|| { @@ -1026,14 +930,12 @@ impl GlobalBarrierManager { }); span.record("epoch", curr_epoch.value().0); - let table_ids_to_commit: HashSet<_> = pre_applied_graph_info.existing_table_ids().collect(); - let command_ctx = Arc::new(CommandContext::new( self.active_streaming_nodes.current().clone(), pre_applied_subscription_info, prev_epoch.clone(), curr_epoch.clone(), - table_ids_to_commit.clone(), + table_ids_to_commit, self.state.paused_reason(), command, kind, @@ -1043,18 +945,12 @@ impl GlobalBarrierManager { send_latency_timer.observe_duration(); - let mut jobs_to_wait = HashSet::new(); - - for (table_id, creating_job) in &mut self.checkpoint_control.creating_streaming_job_controls + for creating_job in &mut self + .checkpoint_control + .creating_streaming_job_controls + .values_mut() { - if let Some(wait_job) = - creating_job.on_new_command(&mut self.control_stream_manager, &command_ctx)? - { - jobs_to_wait.insert(*table_id); - if let Some(graph_to_finish) = wait_job { - self.state.inflight_graph_info.extend(graph_to_finish); - } - } + creating_job.on_new_command(&mut self.control_stream_manager, &command_ctx)?; } let node_to_collect = match self.control_stream_manager.inject_command_ctx_barrier( @@ -1085,7 +981,6 @@ impl GlobalBarrierManager { notifiers, node_to_collect, jobs_to_wait, - table_ids_to_commit, ); Ok(()) @@ -1150,140 +1045,86 @@ impl GlobalBarrierManager { } } +#[derive(Default)] +struct CompleteBarrierTask { + commit_info: CommitEpochInfo, + finished_jobs: Vec, + notifiers: Vec, + /// Some((`command_ctx`, `enqueue_time`)) + command_context: Option<(Arc, HistogramTimer)>, + table_ids_to_finish: HashSet, + creating_job_epochs: Vec<(TableId, u64)>, +} + impl GlobalBarrierManagerContext { - async fn complete_creating_job_barrier( - self, + fn collect_creating_job_commit_epoch_info( + commit_info: &mut CommitEpochInfo, epoch: u64, resps: Vec, - tables_to_commit: HashSet, + tables_to_commit: impl Iterator, is_first_time: bool, - ) -> MetaResult<()> { + ) { let (sst_to_context, sstables, new_table_watermarks, old_value_sst) = collect_resp_info(resps); assert!(old_value_sst.is_empty()); - let new_table_fragment_info = if is_first_time { - NewTableFragmentInfo::NewCompactionGroup { - table_ids: tables_to_commit.clone(), - } - } else { - NewTableFragmentInfo::None - }; - let info = CommitEpochInfo { - sstables, - new_table_watermarks, - sst_to_context, - new_table_fragment_info, - change_log_delta: Default::default(), - committed_epoch: epoch, - tables_to_commit, + commit_info.sst_to_context.extend(sst_to_context); + commit_info.sstables.extend(sstables); + commit_info + .new_table_watermarks + .extend(new_table_watermarks); + let tables_to_commit: HashSet<_> = tables_to_commit.collect(); + tables_to_commit.iter().for_each(|table_id| { + commit_info + .tables_to_commit + .try_insert(*table_id, epoch) + .expect("non duplicate"); + }); + if is_first_time { + commit_info + .new_table_fragment_infos + .push(NewTableFragmentInfo::NewCompactionGroup { + table_ids: tables_to_commit, + }); }; - self.hummock_manager.commit_epoch(info).await?; - Ok(()) } - async fn complete_barrier( - self, - node: EpochNode, - mut finished_jobs: Vec, - backfill_pinned_log_epoch: HashMap)>, - ) -> MetaResult> { - tracing::trace!( - prev_epoch = node.command_ctx.prev_epoch.value().0, - kind = ?node.command_ctx.kind, - "complete barrier" - ); - let EpochNode { - command_ctx, - notifiers, - enqueue_time, - state, - .. - } = node; - assert!(state.node_to_collect.is_empty()); - assert!(state.creating_jobs_to_wait.is_empty()); - let wait_commit_timer = self.metrics.barrier_wait_commit_latency.start_timer(); - if !state.finished_table_ids.is_empty() { - assert!(command_ctx.kind.is_checkpoint()); - } - finished_jobs.extend(state.finished_table_ids.into_values().map(|info| { - TrackingJob::New(TrackingCommand { - info, - replace_table_info: None, - }) - })); - - let result = self - .update_snapshot( - &command_ctx, - state.table_ids_to_commit, - state.resps, - backfill_pinned_log_epoch, - ) - .await; - - let version_stats = match result { - Ok(version_stats) => version_stats, - Err(e) => { - for notifier in notifiers { - notifier.notify_collection_failed(e.clone()); - } - return Err(e); + async fn complete_barrier(self, task: CompleteBarrierTask) -> MetaResult { + let result: MetaResult<()> = try { + let wait_commit_timer = self.metrics.barrier_wait_commit_latency.start_timer(); + self.hummock_manager.commit_epoch(task.commit_info).await?; + if let Some((command_ctx, _)) = &task.command_context { + command_ctx.post_collect().await?; } + + wait_commit_timer.observe_duration(); }; - notifiers.into_iter().for_each(|notifier| { - notifier.notify_collected(); - }); - try_join_all(finished_jobs.into_iter().map(|finished_job| { - let metadata_manager = &self.metadata_manager; - async move { finished_job.finish(metadata_manager).await } - })) - .await?; - let duration_sec = enqueue_time.stop_and_record(); - self.report_complete_event(duration_sec, &command_ctx); - wait_commit_timer.observe_duration(); - self.metrics - .last_committed_barrier_time - .set(command_ctx.curr_epoch.value().as_unix_secs() as i64); - Ok(version_stats) - } - async fn update_snapshot( - &self, - command_ctx: &CommandContext, - tables_to_commit: HashSet, - resps: Vec, - backfill_pinned_log_epoch: HashMap)>, - ) -> MetaResult> { { - { - match &command_ctx.kind { - BarrierKind::Initial => {} - BarrierKind::Checkpoint(epochs) => { - let commit_info = collect_commit_epoch_info( - resps, - command_ctx, - epochs, - backfill_pinned_log_epoch, - tables_to_commit, - ); - self.hummock_manager.commit_epoch(commit_info).await?; - } - BarrierKind::Barrier => { - // if we collect a barrier(checkpoint = false), - // we need to ensure that command is Plain and the notifier's checkpoint is - // false - assert!(!command_ctx.command.need_checkpoint()); - } + if let Err(e) = result { + for notifier in task.notifiers { + notifier.notify_collection_failed(e.clone()); } - - command_ctx.post_collect().await?; - Ok(if command_ctx.kind.is_checkpoint() { - Some(self.hummock_manager.get_version_stats().await) - } else { - None - }) + return Err(e); + } + task.notifiers.into_iter().for_each(|notifier| { + notifier.notify_collected(); + }); + try_join_all( + task.finished_jobs + .into_iter() + .map(|finished_job| finished_job.finish(&self.metadata_manager)), + ) + .await?; + if let Some((command_ctx, enqueue_time)) = task.command_context { + let duration_sec = enqueue_time.stop_and_record(); + self.report_complete_event(duration_sec, &command_ctx); + self.metrics + .last_committed_barrier_time + .set(command_ctx.curr_epoch.value().as_unix_secs() as i64); } } + + Ok(self.hummock_manager.get_version_stats().await) } pub fn hummock_manager(&self) -> &HummockManagerRef { @@ -1350,8 +1191,6 @@ impl GlobalBarrierManagerContext { } struct BarrierCompleteOutput { - command_ctx: Arc, - require_next_checkpoint: bool, table_ids_to_finish: HashSet, } @@ -1381,84 +1220,167 @@ impl CheckpointControl { .collect() } - pub(super) async fn next_completed_barrier( + fn next_complete_barrier_task( &mut self, - ) -> MetaResult> { - if matches!(&self.completing_command, CompletingCommand::None) { - // If there is no completing barrier, try to start completing the earliest barrier if - // it has been collected. - if let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value() - && !state.is_inflight() + mut scheduled_barriers: Option<&mut ScheduledBarriers>, + ) -> Option { + // `Vec::new` is a const fn, and do not have memory allocation, and therefore is lightweight enough + let mut creating_jobs_task = vec![]; + { + // `Vec::new` is a const fn, and do not have memory allocation, and therefore is lightweight enough + let mut finished_jobs = Vec::new(); + let min_upstream_inflight_barrier = self + .command_ctx_queue + .first_key_value() + .map(|(epoch, _)| *epoch); + for (table_id, job) in &mut self.creating_streaming_job_controls { + if let Some((epoch, resps, status)) = + job.start_completing(min_upstream_inflight_barrier) + { + let is_first_time = match status { + CompleteJobType::First => true, + CompleteJobType::Normal => false, + CompleteJobType::Finished => { + finished_jobs.push((*table_id, epoch, resps)); + continue; + } + }; + creating_jobs_task.push((*table_id, epoch, resps, is_first_time)); + } + } + for (table_id, epoch, resps) in finished_jobs { + let epoch_state = &mut self + .command_ctx_queue + .get_mut(&epoch) + .expect("should exist") + .state; + assert!(epoch_state.creating_jobs_to_wait.remove(&table_id)); + debug!(epoch, ?table_id, "finish creating job"); + // It's safe to remove the creating job, because on CompleteJobType::Finished, + // all previous barriers have been collected and completed. + let creating_streaming_job = self + .creating_streaming_job_controls + .remove(&table_id) + .expect("should exist"); + assert!(creating_streaming_job.is_finished()); + assert!(epoch_state + .finished_jobs + .insert(table_id, (creating_streaming_job.info, resps)) + .is_none()); + } + } + let mut task = None; + while let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value() + && !state.is_inflight() + { { - let (_, node) = self.command_ctx_queue.pop_first().expect("non-empty"); + let (_, mut node) = self.command_ctx_queue.pop_first().expect("non-empty"); assert!(node.state.creating_jobs_to_wait.is_empty()); - let table_ids_to_finish = node.state.finished_table_ids.keys().cloned().collect(); - let finished_jobs = self + assert!(node.state.node_to_collect.is_empty()); + let mut finished_jobs = self .create_mview_tracker .apply_collected_command(&node, &self.hummock_version_stats); - let command_ctx = node.command_ctx.clone(); - let join_handle = tokio::spawn(self.context.clone().complete_barrier( - node, - finished_jobs, - self.collect_backfill_pinned_upstream_log_epoch(), - )); - let require_next_checkpoint = - if self.create_mview_tracker.has_pending_finished_jobs() { - self.command_ctx_queue + if !node.command_ctx.kind.is_checkpoint() { + assert!(finished_jobs.is_empty()); + node.notifiers.into_iter().for_each(|notifier| { + notifier.notify_collected(); + }); + if let Some(scheduled_barriers) = &mut scheduled_barriers + && self.create_mview_tracker.has_pending_finished_jobs() + && self + .command_ctx_queue .values() .all(|node| !node.command_ctx.kind.is_checkpoint()) - } else { - false - }; - self.completing_command = CompletingCommand::GlobalStreamingGraph { - command_ctx, - require_next_checkpoint, - join_handle, - table_ids_to_finish, - }; - } else { - for (table_id, job) in &mut self.creating_streaming_job_controls { - let (upstream_epochs_to_notify, commit_info) = job.start_completing(); - for upstream_epoch in upstream_epochs_to_notify { - let wait_progress_timer = self - .command_ctx_queue - .get_mut(&upstream_epoch) - .expect("should exist") - .state - .creating_jobs_to_wait - .remove(table_id) - .expect("should exist"); - if let Some(timer) = wait_progress_timer { - timer.observe_duration(); - } - } - if let Some((epoch, resps, is_first_time)) = commit_info { - let tables_to_commit = job - .info - .table_fragments - .all_table_ids() - .map(TableId::new) - .collect(); - let join_handle = - tokio::spawn(self.context.clone().complete_creating_job_barrier( - epoch, - resps, - tables_to_commit, - is_first_time, - )); - self.completing_command = CompletingCommand::CreatingStreamingJob { - table_id: *table_id, - epoch, - join_handle, - }; - break; + { + scheduled_barriers.force_checkpoint_in_next_barrier(); } + continue; } + let table_ids_to_finish = node + .state + .finished_jobs + .drain() + .map(|(table_id, (info, resps))| { + node.state.resps.extend(resps); + finished_jobs.push(TrackingJob::New(TrackingCommand { + info, + replace_table_info: None, + })); + table_id + }) + .collect(); + let commit_info = collect_commit_epoch_info( + take(&mut node.state.resps), + &node.command_ctx, + self.collect_backfill_pinned_upstream_log_epoch(), + ); + task = Some(CompleteBarrierTask { + commit_info, + finished_jobs, + notifiers: node.notifiers, + command_context: Some((node.command_ctx, node.enqueue_time)), + table_ids_to_finish, + creating_job_epochs: vec![], + }); + break; } } + if !creating_jobs_task.is_empty() { + let task = task.get_or_insert_default(); + for (table_id, epoch, resps, is_first_time) in creating_jobs_task { + GlobalBarrierManagerContext::collect_creating_job_commit_epoch_info( + &mut task.commit_info, + epoch, + resps, + self.creating_streaming_job_controls[&table_id] + .info + .table_fragments + .all_table_ids() + .map(TableId::new), + is_first_time, + ); + task.creating_job_epochs.push((table_id, epoch)); + } + } + task + } + + pub(super) fn next_completed_barrier<'a>( + &'a mut self, + scheduled_barriers: &mut ScheduledBarriers, + ) -> impl Future> + 'a { + // If there is no completing barrier, try to start completing the earliest barrier if + // it has been collected. + if let CompletingTask::None = &self.completing_task { + if let Some(task) = self.next_complete_barrier_task(Some(scheduled_barriers)) { + { + let command_ctx = task + .command_context + .as_ref() + .map(|(command_ctx, _)| command_ctx.clone()); + let table_ids_to_finish = task.table_ids_to_finish.clone(); + let creating_job_epochs = task.creating_job_epochs.clone(); + let join_handle = tokio::spawn(self.context.clone().complete_barrier(task)); + self.completing_task = CompletingTask::Completing { + command_ctx, + join_handle, + table_ids_to_finish, + creating_job_epochs, + }; + } + } + } + + self.next_completed_barrier_inner() + } + + async fn next_completed_barrier_inner(&mut self) -> MetaResult { + let CompletingTask::Completing { join_handle, .. } = &mut self.completing_task else { + return pending().await; + }; - match &mut self.completing_command { - CompletingCommand::GlobalStreamingGraph { join_handle, .. } => { + let (table_ids_to_finish, creating_job_epochs) = { + { let join_result: MetaResult<_> = try { join_handle .await @@ -1467,85 +1389,33 @@ impl CheckpointControl { // It's important to reset the completing_command after await no matter the result is err // or not, and otherwise the join handle will be polled again after ready. let next_completing_command_status = if let Err(e) = &join_result { - CompletingCommand::Err(e.clone()) + CompletingTask::Err(e.clone()) } else { - CompletingCommand::None + CompletingTask::None }; let completed_command = - replace(&mut self.completing_command, next_completing_command_status); - join_result.map(move | version_stats| { - if let Some(new_version_stats) = version_stats { - self.hummock_version_stats = new_version_stats; - } - must_match!( - completed_command, - CompletingCommand::GlobalStreamingGraph { command_ctx, table_ids_to_finish, require_next_checkpoint, .. } => { - Some(BarrierCompleteOutput { - command_ctx, - require_next_checkpoint, - table_ids_to_finish, - }) - } - ) - }) + replace(&mut self.completing_task, next_completing_command_status); + self.hummock_version_stats = join_result?; + + must_match!(completed_command, CompletingTask::Completing { + table_ids_to_finish, + creating_job_epochs, + .. + } => (table_ids_to_finish, creating_job_epochs)) } - CompletingCommand::CreatingStreamingJob { - table_id, - epoch, - join_handle, - } => { - let table_id = *table_id; - let epoch = *epoch; - let join_result: MetaResult<_> = try { - join_handle - .await - .context("failed to join completing command")?? - }; - // It's important to reset the completing_command after await no matter the result is err - // or not, and otherwise the join handle will be polled again after ready. - let next_completing_command_status = if let Err(e) = &join_result { - CompletingCommand::Err(e.clone()) - } else { - CompletingCommand::None - }; - self.completing_command = next_completing_command_status; - if let Some((upstream_epoch, is_finished)) = self - .creating_streaming_job_controls + }; + + { + for (table_id, epoch) in creating_job_epochs { + self.creating_streaming_job_controls .get_mut(&table_id) .expect("should exist") .ack_completed(epoch) - { - let wait_progress_timer = self - .command_ctx_queue - .get_mut(&upstream_epoch) - .expect("should exist") - .state - .creating_jobs_to_wait - .remove(&table_id) - .expect("should exist"); - if let Some(timer) = wait_progress_timer { - timer.observe_duration(); - } - if is_finished { - debug!(epoch, ?table_id, "finish creating job"); - let creating_streaming_job = self - .creating_streaming_job_controls - .remove(&table_id) - .expect("should exist"); - assert!(creating_streaming_job.is_finished()); - assert!(self - .command_ctx_queue - .get_mut(&upstream_epoch) - .expect("should exist") - .state - .finished_table_ids - .insert(table_id, creating_streaming_job.info) - .is_none()); - } - } - join_result.map(|_| None) } - CompletingCommand::None | CompletingCommand::Err(_) => pending().await, + + Ok(BarrierCompleteOutput { + table_ids_to_finish, + }) } } } @@ -1682,28 +1552,26 @@ fn collect_resp_info( fn collect_commit_epoch_info( resps: Vec, command_ctx: &CommandContext, - epochs: &Vec, backfill_pinned_log_epoch: HashMap)>, - tables_to_commit: HashSet, ) -> CommitEpochInfo { let (sst_to_context, synced_ssts, new_table_watermarks, old_value_ssts) = collect_resp_info(resps); - let new_table_fragment_info = if let Command::CreateStreamingJob { info, job_type } = + let new_table_fragment_infos = if let Command::CreateStreamingJob { info, job_type } = &command_ctx.command && !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)) { let table_fragments = &info.table_fragments; - NewTableFragmentInfo::Normal { + vec![NewTableFragmentInfo::Normal { mv_table_id: table_fragments.mv_table_id().map(TableId::new), internal_table_ids: table_fragments .internal_table_ids() .into_iter() .map(TableId::new) .collect(), - } + }] } else { - NewTableFragmentInfo::None + vec![] }; let mut mv_log_store_truncate_epoch = HashMap::new(); @@ -1739,19 +1607,23 @@ fn collect_commit_epoch_info( let table_new_change_log = build_table_change_log_delta( old_value_ssts.into_iter(), synced_ssts.iter().map(|sst| &sst.sst_info), - epochs, + must_match!(&command_ctx.kind, BarrierKind::Checkpoint(epochs) => epochs), mv_log_store_truncate_epoch.into_iter(), ); let epoch = command_ctx.prev_epoch.value().0; + let tables_to_commit = command_ctx + .table_ids_to_commit + .iter() + .map(|table_id| (*table_id, epoch)) + .collect(); CommitEpochInfo { sstables: synced_ssts, new_table_watermarks, sst_to_context, - new_table_fragment_info, + new_table_fragment_infos, change_log_delta: table_new_change_log, - committed_epoch: epoch, tables_to_commit, } } diff --git a/src/meta/src/barrier/state.rs b/src/meta/src/barrier/state.rs index db2ded5629d7a..d9fe6f13c963c 100644 --- a/src/meta/src/barrier/state.rs +++ b/src/meta/src/barrier/state.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; + +use risingwave_common::catalog::TableId; use risingwave_common::util::epoch::Epoch; use risingwave_pb::meta::PausedReason; @@ -83,10 +86,17 @@ impl BarrierManagerState { /// Returns the inflight actor infos that have included the newly added actors in the given command. The dropped actors /// will be removed from the state after the info get resolved. + /// + /// Return (`graph_info`, `subscription_info`, `table_ids_to_commit`, `jobs_to_wait`) pub fn apply_command( &mut self, command: &Command, - ) -> (InflightGraphInfo, InflightSubscriptionInfo) { + ) -> ( + InflightGraphInfo, + InflightSubscriptionInfo, + HashSet, + HashSet, + ) { // update the fragment_infos outside pre_apply let fragment_changes = if let Command::CreateStreamingJob { job_type: CreateStreamingJobType::SnapshotBackfill(_), @@ -108,8 +118,19 @@ impl BarrierManagerState { if let Some(fragment_changes) = fragment_changes { self.inflight_graph_info.post_apply(&fragment_changes); } + + let mut table_ids_to_commit: HashSet<_> = info.existing_table_ids().collect(); + let mut jobs_to_wait = HashSet::new(); + if let Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge) = command { + for (table_id, (_, graph_info)) in jobs_to_merge { + jobs_to_wait.insert(*table_id); + table_ids_to_commit.extend(graph_info.existing_table_ids()); + self.inflight_graph_info.extend(graph_info.clone()); + } + } + self.inflight_subscription_info.post_apply(command); - (info, subscription_info) + (info, subscription_info, table_ids_to_commit, jobs_to_wait) } } diff --git a/src/meta/src/hummock/manager/checkpoint.rs b/src/meta/src/hummock/manager/checkpoint.rs index f678014d440c8..ea747dbf402e5 100644 --- a/src/meta/src/hummock/manager/checkpoint.rs +++ b/src/meta/src/hummock/manager/checkpoint.rs @@ -17,10 +17,8 @@ use std::ops::Bound::{Excluded, Included}; use std::ops::{Deref, DerefMut}; use std::sync::atomic::Ordering; -use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{ - object_size_map, summarize_group_deltas, -}; -use risingwave_hummock_sdk::version::HummockVersion; +use risingwave_hummock_sdk::compaction_group::hummock_version_ext::object_size_map; +use risingwave_hummock_sdk::version::{GroupDeltaCommon, HummockVersion}; use risingwave_hummock_sdk::HummockVersionId; use risingwave_pb::hummock::hummock_version_checkpoint::{PbStaleObjects, StaleObjects}; use risingwave_pb::hummock::{ @@ -156,13 +154,27 @@ impl HummockManager { .hummock_version_deltas .range((Excluded(old_checkpoint_id), Included(new_checkpoint_id))) { - for (group_id, group_deltas) in &version_delta.group_deltas { - let summary = summarize_group_deltas(group_deltas, *group_id); + for group_deltas in version_delta.group_deltas.values() { object_sizes.extend( - summary - .insert_table_infos + group_deltas + .group_deltas .iter() - .map(|t| (t.object_id, t.file_size)) + .flat_map(|delta| { + match delta { + GroupDeltaCommon::IntraLevel(level_delta) => { + Some(level_delta.inserted_table_infos.iter()) + } + GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => { + Some(inserted_table_infos.iter()) + } + GroupDeltaCommon::GroupConstruct(_) + | GroupDeltaCommon::GroupDestroy(_) + | GroupDeltaCommon::GroupMerge(_) => None, + } + .into_iter() + .flatten() + .map(|t| (t.object_id, t.file_size)) + }) .chain( version_delta .change_log_delta diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index cb353d32e0890..da97a29c06f8c 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::{BTreeMap, HashMap, HashSet}; +use std::sync::Arc; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::change_log::ChangeLogDelta; @@ -28,6 +29,7 @@ use risingwave_hummock_sdk::{ CompactionGroupId, HummockContextId, HummockSstableObjectId, LocalSstableInfo, }; use risingwave_pb::hummock::compact_task::{self}; +use risingwave_pb::hummock::CompactionConfig; use sea_orm::TransactionTrait; use crate::hummock::error::{Error, Result}; @@ -47,7 +49,6 @@ use crate::hummock::{ }; pub enum NewTableFragmentInfo { - None, Normal { mv_table_id: Option, internal_table_ids: Vec, @@ -57,14 +58,15 @@ pub enum NewTableFragmentInfo { }, } +#[derive(Default)] pub struct CommitEpochInfo { pub sstables: Vec, pub new_table_watermarks: HashMap, pub sst_to_context: HashMap, - pub new_table_fragment_info: NewTableFragmentInfo, + pub new_table_fragment_infos: Vec, pub change_log_delta: HashMap, - pub committed_epoch: u64, - pub tables_to_commit: HashSet, + /// `table_id` -> `committed_epoch` + pub tables_to_commit: HashMap, } impl HummockManager { @@ -75,9 +77,8 @@ impl HummockManager { mut sstables, new_table_watermarks, sst_to_context, - new_table_fragment_info, + new_table_fragment_infos, change_log_delta, - committed_epoch, tables_to_commit, } = commit_info; let mut versioning_guard = self.versioning.write().await; @@ -91,7 +92,6 @@ impl HummockManager { let versioning: &mut Versioning = &mut versioning_guard; self.commit_epoch_sanity_check( - committed_epoch, &tables_to_commit, &sstables, &sst_to_context, @@ -124,15 +124,18 @@ impl HummockManager { let state_table_info = &version.latest_version().state_table_info; let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id(); + let mut new_table_ids = HashMap::new(); + let mut new_compaction_groups = HashMap::new(); + let mut compaction_group_manager_txn = None; + let mut compaction_group_config: Option> = None; // Add new table - let (new_table_ids, new_compaction_group, compaction_group_manager_txn) = + for new_table_fragment_info in new_table_fragment_infos { match new_table_fragment_info { NewTableFragmentInfo::Normal { mv_table_id, internal_table_ids, } => { - let mut new_table_ids = HashMap::new(); on_handle_add_new_table( state_table_info, &internal_table_ids, @@ -148,24 +151,40 @@ impl HummockManager { &mut table_compaction_group_mapping, &mut new_table_ids, )?; - (new_table_ids, None, None) } NewTableFragmentInfo::NewCompactionGroup { table_ids } => { - let compaction_group_manager_guard = - self.compaction_group_manager.write().await; - let compaction_group_config = - compaction_group_manager_guard.default_compaction_config(); - let mut compaction_group_manager = - CompactionGroupManager::start_owned_compaction_groups_txn( - compaction_group_manager_guard, - ); - let mut new_table_ids = HashMap::new(); + let (compaction_group_manager, compaction_group_config) = + if let Some(compaction_group_manager) = &mut compaction_group_manager_txn { + ( + compaction_group_manager, + (*compaction_group_config + .as_ref() + .expect("must be set with compaction_group_manager_txn")) + .clone(), + ) + } else { + let compaction_group_manager_guard = + self.compaction_group_manager.write().await; + let new_compaction_group_config = + compaction_group_manager_guard.default_compaction_config(); + compaction_group_config = Some(new_compaction_group_config.clone()); + ( + compaction_group_manager_txn.insert( + CompactionGroupManager::start_owned_compaction_groups_txn( + compaction_group_manager_guard, + ), + ), + new_compaction_group_config, + ) + }; let new_compaction_group_id = next_compaction_group_id(&self.env).await?; + new_compaction_groups + .insert(new_compaction_group_id, compaction_group_config.clone()); compaction_group_manager.insert( new_compaction_group_id, CompactionGroup { group_id: new_compaction_group_id, - compaction_config: compaction_group_config.clone(), + compaction_config: compaction_group_config, }, ); @@ -176,14 +195,9 @@ impl HummockManager { &mut table_compaction_group_mapping, &mut new_table_ids, )?; - ( - new_table_ids, - Some((new_compaction_group_id, (*compaction_group_config).clone())), - Some(compaction_group_manager), - ) } - NewTableFragmentInfo::None => (HashMap::new(), None, None), - }; + } + } let commit_sstables = self .correct_commit_ssts(sstables, &table_compaction_group_mapping) @@ -192,9 +206,8 @@ impl HummockManager { let modified_compaction_groups: Vec<_> = commit_sstables.keys().cloned().collect(); let time_travel_delta = version.pre_commit_epoch( - committed_epoch, &tables_to_commit, - new_compaction_group, + new_compaction_groups, commit_sstables, &new_table_ids, new_table_watermarks, @@ -251,9 +264,14 @@ impl HummockManager { .values() .map(|g| (g.group_id, g.parent_group_id)) .collect(); - let time_travel_tables_to_commit = table_compaction_group_mapping - .iter() - .filter(|(table_id, _)| tables_to_commit.contains(table_id)); + let time_travel_tables_to_commit = + table_compaction_group_mapping + .iter() + .filter_map(|(table_id, cg_id)| { + tables_to_commit + .get(table_id) + .map(|committed_epoch| (table_id, cg_id, *committed_epoch)) + }); let mut txn = self.env.meta_store_ref().conn.begin().await?; let version_snapshot_sst_ids = self .write_time_travel_metadata( @@ -263,7 +281,6 @@ impl HummockManager { &group_parents, &versioning.last_time_travel_snapshot_sst_ids, time_travel_tables_to_commit, - committed_epoch, ) .await?; commit_multi_var_with_provided_txn!( diff --git a/src/meta/src/hummock/manager/compaction/mod.rs b/src/meta/src/hummock/manager/compaction/mod.rs index 09d6e6badea52..faabbb80427b3 100644 --- a/src/meta/src/hummock/manager/compaction/mod.rs +++ b/src/meta/src/hummock/manager/compaction/mod.rs @@ -41,7 +41,6 @@ use rand::seq::SliceRandom; use rand::thread_rng; use risingwave_common::util::epoch::Epoch; use risingwave_hummock_sdk::compact_task::{CompactTask, ReportTask}; -use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockLevelsExt; use risingwave_hummock_sdk::key_range::KeyRange; use risingwave_hummock_sdk::level::{InputLevel, Level, Levels}; use risingwave_hummock_sdk::sstable_info::SstableInfo; @@ -153,17 +152,15 @@ impl<'a> HummockVersionTransaction<'a> { .entry(compact_task.compaction_group_id) .or_default() .group_deltas; - let mut removed_table_ids_map: BTreeMap> = BTreeMap::default(); + let mut removed_table_ids_map: BTreeMap> = BTreeMap::default(); for level in &compact_task.input_ssts { let level_idx = level.level_idx; - let mut removed_table_ids = - level.table_infos.iter().map(|sst| sst.sst_id).collect_vec(); removed_table_ids_map .entry(level_idx) .or_default() - .append(&mut removed_table_ids); + .extend(level.table_infos.iter().map(|sst| sst.sst_id)); } for (level_idx, removed_table_ids) in removed_table_ids_map { @@ -181,7 +178,7 @@ impl<'a> HummockVersionTransaction<'a> { let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new( compact_task.target_level, compact_task.target_sub_level_id, - vec![], // default + HashSet::new(), // default compact_task.sorted_output_ssts.clone(), compact_task.split_weight_by_vnode, )); diff --git a/src/meta/src/hummock/manager/context.rs b/src/meta/src/hummock/manager/context.rs index b76bd47c49b74..0b66b1dded8d0 100644 --- a/src/meta/src/hummock/manager/context.rs +++ b/src/meta/src/hummock/manager/context.rs @@ -19,7 +19,7 @@ use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::{ - HummockContextId, HummockEpoch, HummockSstableObjectId, HummockVersionId, LocalSstableInfo, + HummockContextId, HummockSstableObjectId, HummockVersionId, LocalSstableInfo, INVALID_VERSION_ID, }; use risingwave_pb::hummock::{HummockPinnedVersion, ValidationTask}; @@ -189,8 +189,7 @@ impl HummockManager { pub async fn commit_epoch_sanity_check( &self, - committed_epoch: HummockEpoch, - tables_to_commit: &HashSet, + tables_to_commit: &HashMap, sstables: &[LocalSstableInfo], sst_to_context: &HashMap, current_version: &HummockVersion, @@ -216,9 +215,9 @@ impl HummockManager { } // sanity check on monotonically increasing table committed epoch - for table_id in tables_to_commit { + for (table_id, committed_epoch) in tables_to_commit { if let Some(info) = current_version.state_table_info.info().get(table_id) { - if committed_epoch <= info.committed_epoch { + if *committed_epoch <= info.committed_epoch { return Err(anyhow::anyhow!( "table {} Epoch {} <= committed_epoch {}", table_id, @@ -265,7 +264,6 @@ impl HummockManager { .send_event(ResponseEvent::ValidationTask(ValidationTask { sst_infos: sst_infos.into_iter().map(|sst| sst.into()).collect_vec(), sst_id_to_worker_id: sst_to_context.clone(), - epoch: committed_epoch, })) .is_err() { diff --git a/src/meta/src/hummock/manager/time_travel.rs b/src/meta/src/hummock/manager/time_travel.rs index c8df0c93b85ec..ed9a2676e445f 100644 --- a/src/meta/src/hummock/manager/time_travel.rs +++ b/src/meta/src/hummock/manager/time_travel.rs @@ -359,8 +359,7 @@ impl HummockManager { delta: HummockVersionDelta, group_parents: &HashMap, skip_sst_ids: &HashSet, - tables_to_commit: impl Iterator, - committed_epoch: u64, + tables_to_commit: impl Iterator, ) -> Result>> { let select_groups = group_parents .iter() @@ -397,7 +396,7 @@ impl HummockManager { Ok(count) } - for (table_id, cg_id) in tables_to_commit { + for (table_id, cg_id, committed_epoch) in tables_to_commit { if !select_groups.contains(cg_id) { continue; } @@ -449,7 +448,7 @@ impl HummockManager { } let written = write_sstable_infos( delta - .newly_added_sst_infos(&select_groups) + .newly_added_sst_infos(Some(&select_groups)) .filter(|s| !skip_sst_ids.contains(&s.sst_id)), txn, ) diff --git a/src/meta/src/hummock/manager/transaction.rs b/src/meta/src/hummock/manager/transaction.rs index 04d5e237a11df..57a228f35805f 100644 --- a/src/meta/src/hummock/manager/transaction.rs +++ b/src/meta/src/hummock/manager/transaction.rs @@ -12,20 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap}; use std::ops::{Deref, DerefMut}; +use std::sync::Arc; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::change_log::ChangeLogDelta; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::sstable_info::SstableInfo; use risingwave_hummock_sdk::table_watermark::TableWatermarks; -use risingwave_hummock_sdk::version::{ - GroupDelta, HummockVersion, HummockVersionDelta, IntraLevelDelta, -}; -use risingwave_hummock_sdk::{ - CompactionGroupId, FrontendHummockVersionDelta, HummockEpoch, HummockVersionId, -}; +use risingwave_hummock_sdk::version::{GroupDelta, HummockVersion, HummockVersionDelta}; +use risingwave_hummock_sdk::{CompactionGroupId, FrontendHummockVersionDelta, HummockVersionId}; use risingwave_pb::hummock::{ CompactionConfig, CompatibilityVersion, GroupConstruct, HummockVersionDeltas, HummockVersionStats, StateTableInfoDelta, @@ -113,9 +110,8 @@ impl<'a> HummockVersionTransaction<'a> { /// Returns a duplicate delta, used by time travel. pub(super) fn pre_commit_epoch( &mut self, - committed_epoch: HummockEpoch, - tables_to_commit: &HashSet, - new_compaction_group: Option<(CompactionGroupId, CompactionConfig)>, + tables_to_commit: &HashMap, + new_compaction_groups: HashMap>, commit_sstables: BTreeMap>, new_table_ids: &HashMap, new_table_watermarks: HashMap, @@ -125,7 +121,7 @@ impl<'a> HummockVersionTransaction<'a> { new_version_delta.new_table_watermarks = new_table_watermarks; new_version_delta.change_log_delta = change_log_delta; - if let Some((compaction_group_id, compaction_group_config)) = new_compaction_group { + for (compaction_group_id, compaction_group_config) in new_compaction_groups { { let group_deltas = &mut new_version_delta .group_deltas @@ -135,7 +131,7 @@ impl<'a> HummockVersionTransaction<'a> { #[expect(deprecated)] group_deltas.push(GroupDelta::GroupConstruct(GroupConstruct { - group_config: Some(compaction_group_config.clone()), + group_config: Some((*compaction_group_config).clone()), group_id: compaction_group_id, parent_group_id: StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId, @@ -154,13 +150,7 @@ impl<'a> HummockVersionTransaction<'a> { .entry(compaction_group_id) .or_default() .group_deltas; - let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new( - 0, - 0, // l0_sub_level_id will be generated during apply_version_delta - vec![], // default - inserted_table_infos, - 0, // default - )); + let group_delta = GroupDelta::NewL0SubLevel(inserted_table_infos); group_deltas.push(group_delta); } @@ -173,6 +163,7 @@ impl<'a> HummockVersionTransaction<'a> { "newly added table exists previously: {:?}", table_id ); + let committed_epoch = *tables_to_commit.get(table_id).expect("newly added table must exist in tables_to_commit"); delta.state_table_info_delta.insert( *table_id, StateTableInfoDelta { @@ -182,7 +173,7 @@ impl<'a> HummockVersionTransaction<'a> { ); } - for table_id in tables_to_commit { + for (table_id, committed_epoch) in tables_to_commit { if new_table_ids.contains_key(table_id) { continue; } @@ -194,7 +185,7 @@ impl<'a> HummockVersionTransaction<'a> { .insert( *table_id, StateTableInfoDelta { - committed_epoch, + committed_epoch: *committed_epoch, compaction_group_id: info.compaction_group_id, } ) diff --git a/src/meta/src/hummock/mock_hummock_meta_client.rs b/src/meta/src/hummock/mock_hummock_meta_client.rs index 54b26fa20a665..805db163587a0 100644 --- a/src/meta/src/hummock/mock_hummock_meta_client.rs +++ b/src/meta/src/hummock/mock_hummock_meta_client.rs @@ -171,20 +171,20 @@ impl HummockMetaClient for MockHummockMetaClient { .chain(table_ids.iter().cloned()) .collect::>(); - let new_table_fragment_info = if commit_table_ids + let new_table_fragment_infos = if commit_table_ids .iter() .all(|table_id| table_ids.contains(table_id)) { - NewTableFragmentInfo::None + vec![] } else { - NewTableFragmentInfo::Normal { + vec![NewTableFragmentInfo::Normal { mv_table_id: None, internal_table_ids: commit_table_ids .iter() .cloned() .map(TableId::from) .collect_vec(), - } + }] }; let sst_to_context = sync_result @@ -215,13 +215,12 @@ impl HummockMetaClient for MockHummockMetaClient { sstables: sync_result.uncommitted_ssts, new_table_watermarks: new_table_watermark, sst_to_context, - new_table_fragment_info, + new_table_fragment_infos, change_log_delta: table_change_log, - committed_epoch: epoch, tables_to_commit: commit_table_ids .iter() .cloned() - .map(TableId::from) + .map(|table_id| (TableId::new(table_id), epoch)) .collect(), }) .await diff --git a/src/meta/src/lib.rs b/src/meta/src/lib.rs index 88dc47d1f30e8..69d9dc21a075a 100644 --- a/src/meta/src/lib.rs +++ b/src/meta/src/lib.rs @@ -31,6 +31,7 @@ #![feature(const_option)] #![feature(anonymous_lifetime_in_impl_trait)] #![feature(duration_millis_float)] +#![feature(option_get_or_insert_default)] pub mod backup_restore; pub mod barrier; diff --git a/src/meta/src/rpc/metrics.rs b/src/meta/src/rpc/metrics.rs index ccceddfb0799c..e9fa97b1f87a0 100644 --- a/src/meta/src/rpc/metrics.rs +++ b/src/meta/src/rpc/metrics.rs @@ -83,9 +83,6 @@ pub struct MetaMetrics { pub snapshot_backfill_barrier_latency: LabelGuardedHistogramVec<2>, // (table_id, barrier_type) /// The latency of commit epoch of `table_id` pub snapshot_backfill_wait_commit_latency: LabelGuardedHistogramVec<1>, // (table_id, ) - /// The latency that the upstream waits on the snapshot backfill progress after the upstream - /// has collected the barrier. - pub snapshot_backfill_upstream_wait_progress_latency: LabelGuardedHistogramVec<1>, /* (table_id, ) */ /// The lags between the upstream epoch and the downstream epoch. pub snapshot_backfill_lag: LabelGuardedIntGaugeVec<1>, // (table_id, ) /// The number of inflight barriers of `table_id` @@ -282,13 +279,7 @@ impl MetaMetrics { ); let snapshot_backfill_wait_commit_latency = register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap(); - let opts = histogram_opts!( - "meta_snapshot_backfill_upstream_wait_progress_latency", - "snapshot backfill upstream_wait_progress_latency", - exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s - ); - let snapshot_backfill_upstream_wait_progress_latency = - register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap(); + let snapshot_backfill_lag = register_guarded_int_gauge_vec_with_registry!( "meta_snapshot_backfill_upstream_lag", "snapshot backfill upstream_lag", @@ -759,7 +750,6 @@ impl MetaMetrics { last_committed_barrier_time, snapshot_backfill_barrier_latency, snapshot_backfill_wait_commit_latency, - snapshot_backfill_upstream_wait_progress_latency, snapshot_backfill_lag, snapshot_backfill_inflight_barrier_num, recovery_failure_cnt, diff --git a/src/prost/build.rs b/src/prost/build.rs index ee04705ef19e5..0e1b2ea5c1db6 100644 --- a/src/prost/build.rs +++ b/src/prost/build.rs @@ -183,6 +183,7 @@ fn main() -> Result<(), Box> { .type_attribute("hummock.GroupTableChange", "#[derive(Eq)]") .type_attribute("hummock.GroupMerge", "#[derive(Eq)]") .type_attribute("hummock.GroupDelta", "#[derive(Eq)]") + .type_attribute("hummock.NewL0SubLevel", "#[derive(Eq)]") .type_attribute("hummock.LevelHandler.RunningCompactTask", "#[derive(Eq)]") .type_attribute("hummock.LevelHandler", "#[derive(Eq)]") .type_attribute("hummock.TableOption", "#[derive(Eq)]") diff --git a/src/storage/hummock_sdk/src/compact_task.rs b/src/storage/hummock_sdk/src/compact_task.rs index 162895a38ac91..82a9b1904d5af 100644 --- a/src/storage/hummock_sdk/src/compact_task.rs +++ b/src/storage/hummock_sdk/src/compact_task.rs @@ -326,7 +326,6 @@ impl From<&CompactTask> for PbCompactTask { pub struct ValidationTask { pub sst_infos: Vec, pub sst_id_to_worker_id: HashMap, - pub epoch: u64, } impl From for ValidationTask { @@ -338,7 +337,6 @@ impl From for ValidationTask { .map(SstableInfo::from) .collect_vec(), sst_id_to_worker_id: pb_validation_task.sst_id_to_worker_id.clone(), - epoch: pb_validation_task.epoch, } } } @@ -352,7 +350,6 @@ impl From for PbValidationTask { .map(|sst| sst.into()) .collect_vec(), sst_id_to_worker_id: validation_task.sst_id_to_worker_id.clone(), - epoch: validation_task.epoch, } } } diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index 0ffdd15eca498..1af5c42ac7d4a 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -22,8 +22,7 @@ use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_common::hash::VnodeBitmapExt; use risingwave_pb::hummock::{ - CompactionConfig, CompatibilityVersion, GroupConstruct, GroupMerge, PbLevelType, - StateTableInfo, StateTableInfoDelta, + CompactionConfig, CompatibilityVersion, PbLevelType, StateTableInfo, StateTableInfoDelta, }; use tracing::warn; @@ -36,83 +35,11 @@ use crate::level::{Level, Levels, OverlappingLevel}; use crate::sstable_info::SstableInfo; use crate::table_watermark::{ReadTableWatermark, TableWatermarks}; use crate::version::{ - GroupDelta, GroupDeltas, HummockVersion, HummockVersionCommon, HummockVersionDelta, - HummockVersionStateTableInfo, IntraLevelDelta, + GroupDelta, GroupDeltaCommon, HummockVersion, HummockVersionCommon, HummockVersionDelta, + HummockVersionStateTableInfo, IntraLevelDelta, IntraLevelDeltaCommon, }; use crate::{can_concat, CompactionGroupId, HummockSstableId, HummockSstableObjectId}; -pub struct GroupDeltasSummary { - pub delete_sst_levels: Vec, - pub delete_sst_ids_set: HashSet, - pub insert_sst_level_id: u32, - pub insert_sub_level_id: u64, - pub insert_table_infos: Vec, - pub group_construct: Option, - pub group_destroy: Option, - pub new_vnode_partition_count: u32, - pub group_merge: Option, -} - -pub fn summarize_group_deltas( - group_deltas: &GroupDeltas, - compaction_group_id: CompactionGroupId, -) -> GroupDeltasSummary { - let mut delete_sst_levels = Vec::with_capacity(group_deltas.group_deltas.len()); - let mut delete_sst_ids_set = HashSet::new(); - let mut insert_sst_level_id = u32::MAX; - let mut insert_sub_level_id = u64::MAX; - let mut insert_table_infos = vec![]; - let mut group_construct = None; - let mut group_destroy = None; - let mut new_vnode_partition_count = 0; - let mut group_merge = None; - - for group_delta in &group_deltas.group_deltas { - match group_delta { - GroupDelta::IntraLevel(intra_level) => { - if !intra_level.removed_table_ids.is_empty() { - delete_sst_levels.push(intra_level.level_idx); - delete_sst_ids_set.extend(intra_level.removed_table_ids.iter().clone()); - } - if !intra_level.inserted_table_infos.is_empty() { - insert_sst_level_id = intra_level.level_idx; - insert_sub_level_id = intra_level.l0_sub_level_id; - insert_table_infos.extend(intra_level.inserted_table_infos.iter().cloned()); - } - new_vnode_partition_count = intra_level.vnode_partition_count; - } - GroupDelta::GroupConstruct(construct_delta) => { - assert!(group_construct.is_none()); - group_construct = Some(construct_delta.clone()); - } - GroupDelta::GroupDestroy(_) => { - assert!(group_destroy.is_none()); - group_destroy = Some(compaction_group_id); - } - GroupDelta::GroupMerge(merge_delta) => { - assert!(group_merge.is_none()); - group_merge = Some(*merge_delta); - group_destroy = Some(merge_delta.right_group_id); - } - } - } - - delete_sst_levels.sort(); - delete_sst_levels.dedup(); - - GroupDeltasSummary { - delete_sst_levels, - delete_sst_ids_set, - insert_sst_level_id, - insert_sub_level_id, - insert_table_infos, - group_construct, - group_destroy, - new_vnode_partition_count, - group_merge, - } -} - #[derive(Clone, Default)] pub struct TableGroupInfo { pub group_id: CompactionGroupId, @@ -493,11 +420,12 @@ impl HummockVersion { let mut removed_ssts: BTreeMap> = BTreeMap::new(); // Build only if all deltas are intra level deltas. - if !group_deltas - .group_deltas - .iter() - .all(|delta| matches!(delta, GroupDelta::IntraLevel(_))) - { + if !group_deltas.group_deltas.iter().all(|delta| { + matches!( + delta, + GroupDelta::IntraLevel(_) | GroupDelta::NewL0SubLevel(_) + ) + }) { continue; } @@ -505,24 +433,36 @@ impl HummockVersion { // current `hummock::manager::gen_version_delta` implementation. Better refactor the // struct to reduce conventions. for group_delta in &group_deltas.group_deltas { - if let GroupDelta::IntraLevel(intra_level) = group_delta { - if !intra_level.inserted_table_infos.is_empty() { - info.insert_sst_level = intra_level.level_idx; - info.insert_sst_infos - .extend(intra_level.inserted_table_infos.iter().cloned()); + match group_delta { + GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => { + if !inserted_table_infos.is_empty() { + info.insert_sst_level = 0; + info.insert_sst_infos + .extend(inserted_table_infos.iter().cloned()); + } } - if !intra_level.removed_table_ids.is_empty() { - for id in &intra_level.removed_table_ids { - if intra_level.level_idx == 0 { - removed_l0_ssts.insert(*id); - } else { - removed_ssts - .entry(intra_level.level_idx) - .or_default() - .insert(*id); + GroupDeltaCommon::IntraLevel(intra_level) => { + if !intra_level.inserted_table_infos.is_empty() { + info.insert_sst_level = intra_level.level_idx; + info.insert_sst_infos + .extend(intra_level.inserted_table_infos.iter().cloned()); + } + if !intra_level.removed_table_ids.is_empty() { + for id in &intra_level.removed_table_ids { + if intra_level.level_idx == 0 { + removed_l0_ssts.insert(*id); + } else { + removed_ssts + .entry(intra_level.level_idx) + .or_default() + .insert(*id); + } } } } + GroupDeltaCommon::GroupConstruct(_) + | GroupDeltaCommon::GroupDestroy(_) + | GroupDeltaCommon::GroupMerge(_) => {} } } @@ -587,97 +527,129 @@ impl HummockVersion { // apply to `levels`, which is different compaction groups for (compaction_group_id, group_deltas) in &version_delta.group_deltas { - let summary = summarize_group_deltas(group_deltas, *compaction_group_id); - if let Some(group_construct) = &summary.group_construct { - let mut new_levels = build_initial_compaction_group_levels( - *compaction_group_id, - group_construct.get_group_config().unwrap(), - ); - let parent_group_id = group_construct.parent_group_id; - new_levels.parent_group_id = parent_group_id; - #[expect(deprecated)] - // for backward-compatibility of previous hummock version delta - new_levels - .member_table_ids - .clone_from(&group_construct.table_ids); - self.levels.insert(*compaction_group_id, new_levels); - let member_table_ids = - if group_construct.version >= CompatibilityVersion::NoMemberTableIds as _ { - self.state_table_info - .compaction_group_member_table_ids(*compaction_group_id) - .iter() - .map(|table_id| table_id.table_id) - .collect() - } else { + let mut is_applied_l0_compact = false; + for group_delta in &group_deltas.group_deltas { + match group_delta { + GroupDeltaCommon::GroupConstruct(group_construct) => { + let mut new_levels = build_initial_compaction_group_levels( + *compaction_group_id, + group_construct.get_group_config().unwrap(), + ); + let parent_group_id = group_construct.parent_group_id; + new_levels.parent_group_id = parent_group_id; #[expect(deprecated)] // for backward-compatibility of previous hummock version delta - BTreeSet::from_iter(group_construct.table_ids.clone()) - }; - - if group_construct.version >= CompatibilityVersion::SplitGroupByTableId as _ { - let split_key = if group_construct.split_key.is_some() { - Some(Bytes::from(group_construct.split_key.clone().unwrap())) - } else { - None - }; - self.init_with_parent_group_v2( - parent_group_id, - *compaction_group_id, - group_construct.get_new_sst_start_id(), - split_key.clone(), - ); - } else { - // for backward-compatibility of previous hummock version delta - self.init_with_parent_group( - parent_group_id, - *compaction_group_id, - member_table_ids, - group_construct.get_new_sst_start_id(), - ); - } - } else if let Some(group_merge) = &summary.group_merge { - tracing::info!( - "group_merge left {:?} right {:?}", - group_merge.left_group_id, - group_merge.right_group_id - ); - self.merge_compaction_group(group_merge.left_group_id, group_merge.right_group_id) - } - let group_destroy = summary.group_destroy; - let levels = self.levels.get_mut(compaction_group_id).unwrap_or_else(|| { - panic!("compaction group {} does not exist", compaction_group_id) - }); + new_levels + .member_table_ids + .clone_from(&group_construct.table_ids); + self.levels.insert(*compaction_group_id, new_levels); + let member_table_ids = if group_construct.version + >= CompatibilityVersion::NoMemberTableIds as _ + { + self.state_table_info + .compaction_group_member_table_ids(*compaction_group_id) + .iter() + .map(|table_id| table_id.table_id) + .collect() + } else { + #[expect(deprecated)] + // for backward-compatibility of previous hummock version delta + BTreeSet::from_iter(group_construct.table_ids.clone()) + }; + + if group_construct.version >= CompatibilityVersion::SplitGroupByTableId as _ + { + let split_key = if group_construct.split_key.is_some() { + Some(Bytes::from(group_construct.split_key.clone().unwrap())) + } else { + None + }; + self.init_with_parent_group_v2( + parent_group_id, + *compaction_group_id, + group_construct.get_new_sst_start_id(), + split_key.clone(), + ); + } else { + // for backward-compatibility of previous hummock version delta + self.init_with_parent_group( + parent_group_id, + *compaction_group_id, + member_table_ids, + group_construct.get_new_sst_start_id(), + ); + } + } + GroupDeltaCommon::GroupMerge(group_merge) => { + tracing::info!( + "group_merge left {:?} right {:?}", + group_merge.left_group_id, + group_merge.right_group_id + ); + self.merge_compaction_group( + group_merge.left_group_id, + group_merge.right_group_id, + ) + } + GroupDeltaCommon::IntraLevel(level_delta) => { + let levels = + self.levels.get_mut(compaction_group_id).unwrap_or_else(|| { + panic!("compaction group {} does not exist", compaction_group_id) + }); + if is_commit_epoch { + assert!( + level_delta.removed_table_ids.is_empty(), + "no sst should be deleted when committing an epoch" + ); - if is_commit_epoch { - let GroupDeltasSummary { - delete_sst_levels, - delete_sst_ids_set, - .. - } = summary; + let IntraLevelDelta { + level_idx, + l0_sub_level_id, + inserted_table_infos, + .. + } = level_delta; + { + assert_eq!( + *level_idx, 0, + "we should only add to L0 when we commit an epoch." + ); + if !inserted_table_infos.is_empty() { + insert_new_sub_level( + &mut levels.l0, + *l0_sub_level_id, + PbLevelType::Overlapping, + inserted_table_infos.clone(), + None, + ); + } + } + } else { + // The delta is caused by compaction. + levels.apply_compact_ssts( + level_delta, + self.state_table_info + .compaction_group_member_table_ids(*compaction_group_id), + ); + if level_delta.level_idx == 0 { + is_applied_l0_compact = true; + } + } + } + GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => { + let levels = + self.levels.get_mut(compaction_group_id).unwrap_or_else(|| { + panic!("compaction group {} does not exist", compaction_group_id) + }); + assert!(is_commit_epoch); - assert!( - delete_sst_levels.is_empty() && delete_sst_ids_set.is_empty() - || group_destroy.is_some(), - "no sst should be deleted when committing an epoch" - ); - let mut next_l0_sub_level_id = levels - .l0 - .sub_levels - .last() - .map(|level| level.sub_level_id + 1) - .unwrap_or(1); - for group_delta in &group_deltas.group_deltas { - if let GroupDelta::IntraLevel(IntraLevelDelta { - level_idx, - inserted_table_infos, - .. - }) = group_delta - { - assert_eq!( - *level_idx, 0, - "we should only add to L0 when we commit an epoch." - ); if !inserted_table_infos.is_empty() { + let next_l0_sub_level_id = levels + .l0 + .sub_levels + .last() + .map(|level| level.sub_level_id + 1) + .unwrap_or(1); + insert_new_sub_level( &mut levels.l0, next_l0_sub_level_id, @@ -685,20 +657,16 @@ impl HummockVersion { inserted_table_infos.clone(), None, ); - next_l0_sub_level_id += 1; } } + GroupDeltaCommon::GroupDestroy(_) => { + self.levels.remove(compaction_group_id); + } } - } else { - // The delta is caused by compaction. - levels.apply_compact_ssts( - summary, - self.state_table_info - .compaction_group_member_table_ids(*compaction_group_id), - ); } - if let Some(destroy_group_id) = &group_destroy { - self.levels.remove(destroy_group_id); + if is_applied_l0_compact && let Some(levels) = self.levels.get_mut(compaction_group_id) + { + levels.post_apply_l0_compact(); } } self.id = version_delta.id; @@ -1005,54 +973,53 @@ impl HummockVersionCommon { } } -#[easy_ext::ext(HummockLevelsExt)] impl Levels { - pub fn apply_compact_ssts( + pub(crate) fn apply_compact_ssts( &mut self, - summary: GroupDeltasSummary, + level_delta: &IntraLevelDeltaCommon, member_table_ids: &BTreeSet, ) { - let GroupDeltasSummary { - delete_sst_levels, - delete_sst_ids_set, - insert_sst_level_id, - insert_sub_level_id, - insert_table_infos, - new_vnode_partition_count, + let IntraLevelDeltaCommon { + level_idx, + l0_sub_level_id, + inserted_table_infos: insert_table_infos, + vnode_partition_count, + removed_table_ids: delete_sst_ids_set, .. - } = summary; + } = level_delta; + let new_vnode_partition_count = *vnode_partition_count; - if !self.check_deleted_sst_exist(&delete_sst_levels, delete_sst_ids_set.clone()) { + if !self.check_deleted_sst_exist(&[*level_idx], delete_sst_ids_set.clone()) { warn!( "This VersionDelta may be committed by an expired compact task. Please check it. \n - delete_sst_levels: {:?}\n, - delete_sst_ids_set: {:?}\n, insert_sst_level_id: {}\n, insert_sub_level_id: {}\n, - insert_table_infos: {:?}\n", - delete_sst_levels, - delete_sst_ids_set, - insert_sst_level_id, - insert_sub_level_id, + insert_table_infos: {:?}\n, + delete_sst_ids_set: {:?}\n", + level_idx, + l0_sub_level_id, insert_table_infos .iter() .map(|sst| (sst.sst_id, sst.object_id)) - .collect_vec() + .collect_vec(), + delete_sst_ids_set, ); return; } - for level_idx in &delete_sst_levels { + if !delete_sst_ids_set.is_empty() { if *level_idx == 0 { for level in &mut self.l0.sub_levels { - level_delete_ssts(level, &delete_sst_ids_set); + level_delete_ssts(level, delete_sst_ids_set); } } else { let idx = *level_idx as usize - 1; - level_delete_ssts(&mut self.levels[idx], &delete_sst_ids_set); + level_delete_ssts(&mut self.levels[idx], delete_sst_ids_set); } } if !insert_table_infos.is_empty() { + let insert_sst_level_id = *level_idx; + let insert_sub_level_id = *l0_sub_level_id; if insert_sst_level_id == 0 { let l0 = &mut self.l0; let index = l0 @@ -1093,7 +1060,10 @@ impl Levels { level_insert_ssts(&mut self.levels[idx], insert_table_infos); } } - if delete_sst_levels.iter().any(|level_id| *level_id == 0) { + } + + pub(crate) fn post_apply_l0_compact(&mut self) { + { self.l0 .sub_levels .retain(|level| !level.table_infos.is_empty()); @@ -1358,7 +1328,7 @@ fn level_delete_ssts( original_len != operand.table_infos.len() } -fn level_insert_ssts(operand: &mut Level, insert_table_infos: Vec) { +fn level_insert_ssts(operand: &mut Level, insert_table_infos: &Vec) { operand.total_file_size += insert_table_infos .iter() .map(|sst| sst.sst_size) @@ -1367,7 +1337,9 @@ fn level_insert_ssts(operand: &mut Level, insert_table_infos: Vec) .iter() .map(|sst| sst.uncompressed_file_size) .sum::(); - operand.table_infos.extend(insert_table_infos); + operand + .table_infos + .extend(insert_table_infos.iter().cloned()); operand .table_infos .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range)); @@ -1501,7 +1473,7 @@ pub fn validate_version(version: &HummockVersion) -> Vec { #[cfg(test)] mod tests { - use std::collections::HashMap; + use std::collections::{HashMap, HashSet}; use bytes::Bytes; use risingwave_common::catalog::TableId; @@ -1655,7 +1627,7 @@ mod tests { group_deltas: vec![GroupDelta::IntraLevel(IntraLevelDelta::new( 1, 0, - vec![], + HashSet::new(), vec![SstableInfo { object_id: 1, sst_id: 1, diff --git a/src/storage/hummock_sdk/src/table_watermark.rs b/src/storage/hummock_sdk/src/table_watermark.rs index 324e8a91cf4a3..bbc0ae22148c0 100644 --- a/src/storage/hummock_sdk/src/table_watermark.rs +++ b/src/storage/hummock_sdk/src/table_watermark.rs @@ -259,7 +259,7 @@ impl TableWatermarksIndex { } } if self.latest_epoch < committed_epoch { - warn!( + debug!( latest_epoch = self.latest_epoch, committed_epoch, "committed_epoch exceed table watermark latest_epoch" ); diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index 64206d9b45b55..b106563cdc7ac 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -21,12 +21,12 @@ use std::sync::{Arc, LazyLock}; use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_common::util::epoch::INVALID_EPOCH; -use risingwave_pb::hummock::group_delta::PbDeltaType; +use risingwave_pb::hummock::group_delta::{DeltaType, PbDeltaType}; use risingwave_pb::hummock::hummock_version_delta::PbGroupDeltas; use risingwave_pb::hummock::{ CompactionConfig, PbGroupConstruct, PbGroupDelta, PbGroupDestroy, PbGroupMerge, - PbHummockVersion, PbHummockVersionDelta, PbIntraLevelDelta, PbSstableInfo, PbStateTableInfo, - StateTableInfo, StateTableInfoDelta, + PbHummockVersion, PbHummockVersionDelta, PbIntraLevelDelta, PbNewL0SubLevel, PbSstableInfo, + PbStateTableInfo, StateTableInfo, StateTableInfoDelta, }; use tracing::warn; @@ -512,76 +512,45 @@ impl HummockVersionDelta { /// Note: the result can be false positive because we only collect the set of sst object ids in the `inserted_table_infos`, /// but it is possible that the object is moved or split from other compaction groups or levels. pub fn newly_added_object_ids(&self) -> HashSet { - self.group_deltas - .values() - .flat_map(|group_deltas| { - group_deltas.group_deltas.iter().flat_map(|group_delta| { - static EMPTY_VEC: Vec = Vec::new(); - let sst_slice = if let GroupDelta::IntraLevel(level_delta) = &group_delta { - &level_delta.inserted_table_infos - } else { - &EMPTY_VEC - }; - sst_slice.iter().map(|sst| sst.object_id) - }) - }) - .chain(self.change_log_delta.values().flat_map(|delta| { - let new_log = delta.new_log.as_ref().unwrap(); - new_log - .new_value - .iter() - .map(|sst| sst.object_id) - .chain(new_log.old_value.iter().map(|sst| sst.object_id)) - })) + self.newly_added_sst_infos(None) + .map(|sst| sst.object_id) .collect() } pub fn newly_added_sst_ids(&self) -> HashSet { - let ssts_from_group_deltas = self.group_deltas.values().flat_map(|group_deltas| { - group_deltas.group_deltas.iter().flat_map(|group_delta| { - static EMPTY_VEC: Vec = Vec::new(); - let sst_slice = if let GroupDelta::IntraLevel(level_delta) = &group_delta { - &level_delta.inserted_table_infos - } else { - &EMPTY_VEC - }; - sst_slice.iter() - }) - }); - - let ssts_from_change_log = self.change_log_delta.values().flat_map(|delta| { - let new_log = delta.new_log.as_ref().unwrap(); - new_log.new_value.iter().chain(new_log.old_value.iter()) - }); - - ssts_from_group_deltas - .chain(ssts_from_change_log) + self.newly_added_sst_infos(None) .map(|sst| sst.sst_id) .collect() } pub fn newly_added_sst_infos<'a>( &'a self, - select_group: &'a HashSet, + select_group: Option<&'a HashSet>, ) -> impl Iterator + 'a { self.group_deltas .iter() - .filter_map(|(cg_id, group_deltas)| { - if select_group.contains(cg_id) { - Some(group_deltas) - } else { + .filter_map(move |(cg_id, group_deltas)| { + if let Some(select_group) = select_group + && !select_group.contains(cg_id) + { None + } else { + Some(group_deltas) } }) .flat_map(|group_deltas| { group_deltas.group_deltas.iter().flat_map(|group_delta| { - static EMPTY_VEC: Vec = Vec::new(); - let sst_slice = if let GroupDelta::IntraLevel(level_delta) = &group_delta { - &level_delta.inserted_table_infos - } else { - &EMPTY_VEC + let sst_slice = match &group_delta { + GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) + | GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon { + inserted_table_infos, + .. + }) => Some(inserted_table_infos.iter()), + GroupDeltaCommon::GroupConstruct(_) + | GroupDeltaCommon::GroupDestroy(_) + | GroupDeltaCommon::GroupMerge(_) => None, }; - sst_slice.iter() + sst_slice.into_iter().flatten() }) }) .chain(self.change_log_delta.values().flat_map(|delta| { @@ -785,7 +754,7 @@ where pub struct IntraLevelDeltaCommon { pub level_idx: u32, pub l0_sub_level_id: u64, - pub removed_table_ids: Vec, + pub removed_table_ids: HashSet, pub inserted_table_infos: Vec, pub vnode_partition_count: u32, } @@ -814,7 +783,7 @@ where Self { level_idx: pb_intra_level_delta.level_idx, l0_sub_level_id: pb_intra_level_delta.l0_sub_level_id, - removed_table_ids: pb_intra_level_delta.removed_table_ids, + removed_table_ids: HashSet::from_iter(pb_intra_level_delta.removed_table_ids), inserted_table_infos: pb_intra_level_delta .inserted_table_infos .into_iter() @@ -833,7 +802,7 @@ where Self { level_idx: intra_level_delta.level_idx, l0_sub_level_id: intra_level_delta.l0_sub_level_id, - removed_table_ids: intra_level_delta.removed_table_ids, + removed_table_ids: intra_level_delta.removed_table_ids.into_iter().collect(), inserted_table_infos: intra_level_delta .inserted_table_infos .into_iter() @@ -852,7 +821,11 @@ where Self { level_idx: intra_level_delta.level_idx, l0_sub_level_id: intra_level_delta.l0_sub_level_id, - removed_table_ids: intra_level_delta.removed_table_ids.clone(), + removed_table_ids: intra_level_delta + .removed_table_ids + .iter() + .cloned() + .collect(), inserted_table_infos: intra_level_delta .inserted_table_infos .iter() @@ -871,7 +844,9 @@ where Self { level_idx: pb_intra_level_delta.level_idx, l0_sub_level_id: pb_intra_level_delta.l0_sub_level_id, - removed_table_ids: pb_intra_level_delta.removed_table_ids.clone(), + removed_table_ids: HashSet::from_iter( + pb_intra_level_delta.removed_table_ids.iter().cloned(), + ), inserted_table_infos: pb_intra_level_delta .inserted_table_infos .iter() @@ -886,7 +861,7 @@ impl IntraLevelDelta { pub fn new( level_idx: u32, l0_sub_level_id: u64, - removed_table_ids: Vec, + removed_table_ids: HashSet, inserted_table_infos: Vec, vnode_partition_count: u32, ) -> Self { @@ -902,6 +877,7 @@ impl IntraLevelDelta { #[derive(Debug, PartialEq, Clone)] pub enum GroupDeltaCommon { + NewL0SubLevel(Vec), IntraLevel(IntraLevelDeltaCommon), GroupConstruct(PbGroupConstruct), GroupDestroy(PbGroupDestroy), @@ -928,6 +904,13 @@ where Some(PbDeltaType::GroupMerge(pb_group_merge)) => { GroupDeltaCommon::GroupMerge(pb_group_merge) } + Some(DeltaType::NewL0SubLevel(pb_new_sub_level)) => GroupDeltaCommon::NewL0SubLevel( + pb_new_sub_level + .inserted_table_infos + .into_iter() + .map(T::from) + .collect(), + ), None => panic!("delta_type is not set"), } } @@ -951,6 +934,14 @@ where GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta { delta_type: Some(PbDeltaType::GroupMerge(pb_group_merge)), }, + GroupDeltaCommon::NewL0SubLevel(new_sub_level) => PbGroupDelta { + delta_type: Some(PbDeltaType::NewL0SubLevel(PbNewL0SubLevel { + inserted_table_infos: new_sub_level + .into_iter() + .map(PbSstableInfo::from) + .collect(), + })), + }, } } } @@ -973,6 +964,11 @@ where GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta { delta_type: Some(PbDeltaType::GroupMerge(*pb_group_merge)), }, + GroupDeltaCommon::NewL0SubLevel(new_sub_level) => PbGroupDelta { + delta_type: Some(PbDeltaType::NewL0SubLevel(PbNewL0SubLevel { + inserted_table_infos: new_sub_level.iter().map(PbSstableInfo::from).collect(), + })), + }, } } } @@ -995,6 +991,13 @@ where Some(PbDeltaType::GroupMerge(pb_group_merge)) => { GroupDeltaCommon::GroupMerge(*pb_group_merge) } + Some(DeltaType::NewL0SubLevel(pb_new_sub_level)) => GroupDeltaCommon::NewL0SubLevel( + pb_new_sub_level + .inserted_table_infos + .iter() + .map(T::from) + .collect(), + ), None => panic!("delta_type is not set"), } } diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index 7c70721f04d82..4e6ab26a539c6 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -2585,9 +2585,12 @@ async fn test_commit_multi_epoch() { let initial_epoch = INVALID_EPOCH; let commit_epoch = - |epoch, sst: SstableInfo, new_table_fragment_info, tables_to_commit: &[TableId]| { + |epoch, sst: SstableInfo, new_table_fragment_infos, tables_to_commit: &[TableId]| { let manager = &test_env.manager; - let tables_to_commit = tables_to_commit.iter().cloned().collect(); + let tables_to_commit = tables_to_commit + .iter() + .map(|table_id| (*table_id, epoch)) + .collect(); async move { manager .commit_epoch(CommitEpochInfo { @@ -2610,9 +2613,8 @@ async fn test_commit_multi_epoch() { sst_info: sst, created_at: u64::MAX, }], - new_table_fragment_info, + new_table_fragment_infos, change_log_delta: Default::default(), - committed_epoch: epoch, tables_to_commit, }) .await @@ -2633,10 +2635,10 @@ async fn test_commit_multi_epoch() { commit_epoch( epoch1, sst1_epoch1.clone(), - NewTableFragmentInfo::Normal { + vec![NewTableFragmentInfo::Normal { mv_table_id: None, internal_table_ids: vec![existing_table_id], - }, + }], &[existing_table_id], ) .await; @@ -2678,13 +2680,7 @@ async fn test_commit_multi_epoch() { let epoch2 = epoch1.next_epoch(); - commit_epoch( - epoch2, - sst1_epoch2.clone(), - NewTableFragmentInfo::None, - &[existing_table_id], - ) - .await; + commit_epoch(epoch2, sst1_epoch2.clone(), vec![], &[existing_table_id]).await; { let version = test_env.manager.get_current_version().await; @@ -2727,9 +2723,9 @@ async fn test_commit_multi_epoch() { commit_epoch( epoch1, sst2_epoch1.clone(), - NewTableFragmentInfo::NewCompactionGroup { + vec![NewTableFragmentInfo::NewCompactionGroup { table_ids: HashSet::from_iter([new_table_id]), - }, + }], &[new_table_id], ) .await; @@ -2764,13 +2760,7 @@ async fn test_commit_multi_epoch() { ..Default::default() }; - commit_epoch( - epoch2, - sst2_epoch2.clone(), - NewTableFragmentInfo::None, - &[new_table_id], - ) - .await; + commit_epoch(epoch2, sst2_epoch2.clone(), vec![], &[new_table_id]).await; { let version = test_env.manager.get_current_version().await; @@ -2804,7 +2794,7 @@ async fn test_commit_multi_epoch() { commit_epoch( epoch3, sst_epoch3.clone(), - NewTableFragmentInfo::None, + vec![], &[existing_table_id, new_table_id], ) .await; diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index d3e552a76213f..6734235225654 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::ops::Bound; use std::ops::Bound::{Excluded, Included, Unbounded}; use std::sync::Arc; @@ -29,7 +29,7 @@ use risingwave_common::util::epoch::{test_epoch, EpochExt, INVALID_EPOCH, MAX_EP use risingwave_hummock_sdk::key::{prefixed_range_with_vnode, TableKeyRange}; use risingwave_hummock_sdk::{HummockReadEpoch, LocalSstableInfo, SyncResult}; use risingwave_meta::hummock::test_utils::setup_compute_env; -use risingwave_meta::hummock::{CommitEpochInfo, NewTableFragmentInfo}; +use risingwave_meta::hummock::CommitEpochInfo; use risingwave_rpc_client::HummockMetaClient; use risingwave_storage::hummock::iterator::change_log::test_utils::{ apply_test_log_data, gen_test_data, @@ -1384,10 +1384,9 @@ async fn test_replicated_local_hummock_storage() { sstables: vec![], new_table_watermarks: Default::default(), sst_to_context: Default::default(), - new_table_fragment_info: NewTableFragmentInfo::None, + new_table_fragment_infos: vec![], change_log_delta: Default::default(), - committed_epoch: epoch0, - tables_to_commit: HashSet::from_iter([TEST_TABLE_ID]), + tables_to_commit: HashMap::from_iter([(TEST_TABLE_ID, epoch0)]), }) .await .unwrap(); diff --git a/src/storage/src/hummock/validator.rs b/src/storage/src/hummock/validator.rs index cc95b7089b664..2c0efbb3ca934 100644 --- a/src/storage/src/hummock/validator.rs +++ b/src/storage/src/hummock/validator.rs @@ -38,12 +38,7 @@ pub async fn validate_ssts(task: ValidationTask, sstable_store: SstableStoreRef) .sst_id_to_worker_id .get(&sst.object_id) .expect("valid worker_id"); - tracing::debug!( - "Validating SST {} from worker {}, epoch {}", - sst.object_id, - worker_id, - task.epoch - ); + tracing::debug!("Validating SST {} from worker {}", sst.object_id, worker_id,); let holder = match sstable_store.sstable(&sst, unused.borrow_mut()).await { Ok(holder) => holder, Err(_err) => { @@ -100,12 +95,7 @@ pub async fn validate_ssts(task: ValidationTask, sstable_store: SstableStoreRef) break; } } - tracing::debug!( - "Validated {} keys for SST {}, epoch {}", - key_counts, - sst.object_id, - task.epoch - ); + tracing::debug!("Validated {} keys for SST {}", key_counts, sst.object_id,); iter.collect_local_statistic(&mut unused); unused.ignore(); } diff --git a/src/stream/src/executor/backfill/snapshot_backfill.rs b/src/stream/src/executor/backfill/snapshot_backfill.rs index 89801a3cf4133..eb1325141fdfd 100644 --- a/src/stream/src/executor/backfill/snapshot_backfill.rs +++ b/src/stream/src/executor/backfill/snapshot_backfill.rs @@ -12,17 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cmp::min; use std::collections::VecDeque; use std::future::{pending, Future}; -use std::mem::replace; +use std::mem::{replace, take}; use std::sync::Arc; use anyhow::anyhow; use futures::future::Either; use futures::{pin_mut, Stream, TryStreamExt}; -use itertools::Itertools; use risingwave_common::array::{Op, StreamChunk}; -use risingwave_common::catalog::TableId; use risingwave_common::metrics::LabelGuardedIntCounter; use risingwave_common::row::OwnedRow; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; @@ -32,7 +31,6 @@ use risingwave_storage::table::batch_table::storage_table::StorageTable; use risingwave_storage::table::ChangeLogRow; use risingwave_storage::StateStore; use tokio::select; -use tokio::sync::mpsc; use tokio::sync::mpsc::UnboundedReceiver; use crate::executor::backfill::utils::{create_builder, mapping_chunk}; @@ -40,7 +38,7 @@ use crate::executor::monitor::StreamingMetrics; use crate::executor::prelude::{try_stream, StreamExt}; use crate::executor::{ expect_first_barrier, ActorContextRef, BackfillExecutor, Barrier, BoxedMessageStream, - DispatcherBarrier, DispatcherMessage, Execute, MergeExecutorInput, Message, Mutation, + DispatcherBarrier, DispatcherMessage, Execute, MergeExecutorInput, Message, StreamExecutorError, StreamExecutorResult, }; use crate::task::CreateMviewProgressReporter; @@ -101,35 +99,14 @@ impl SnapshotBackfillExecutor { #[try_stream(ok = Message, error = StreamExecutorError)] async fn execute_inner(mut self) { debug!("snapshot backfill executor start"); - let upstream_table_id = self.upstream_table.table_id(); let first_barrier = expect_first_barrier(&mut self.upstream).await?; debug!(epoch = ?first_barrier.epoch, "get first upstream barrier"); let first_recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?; debug!(epoch = ?first_recv_barrier.epoch, "get first inject barrier"); let should_backfill = first_barrier.epoch != first_recv_barrier.epoch; - let mut barrier_epoch = { + let (mut barrier_epoch, mut need_report_finish) = { if should_backfill { - let subscriber_ids = first_recv_barrier - .added_subscriber_on_mv_table(upstream_table_id) - .collect_vec(); - let snapshot_backfill_table_fragment_id = match subscriber_ids.as_slice() { - [] => { - return Err(anyhow!( - "first recv barrier on backfill should add subscriber on upstream" - ) - .into()); - } - [snapshot_backfill_table_fragment_id] => *snapshot_backfill_table_fragment_id, - multiple => { - return Err(anyhow!( - "first recv barrier on backfill have multiple subscribers {:?} on upstream table {}", - multiple, upstream_table_id.table_id - ) - .into()); - } - }; - let table_id_str = format!("{}", self.upstream_table.table_id().table_id); let actor_id_str = format!("{}", self.actor_ctx.id); @@ -138,12 +115,8 @@ impl SnapshotBackfillExecutor { .snapshot_backfill_consume_row_count .with_guarded_label_values(&[&table_id_str, &actor_id_str, "consume_upstream"]); - let mut upstream_buffer = UpstreamBuffer::new( - &mut self.upstream, - upstream_table_id, - snapshot_backfill_table_fragment_id, - consume_upstream_row_count, - ); + let mut upstream_buffer = + UpstreamBuffer::new(&mut self.upstream, consume_upstream_row_count); let first_barrier_epoch = first_barrier.epoch; @@ -165,7 +138,7 @@ impl SnapshotBackfillExecutor { self.rate_limit, &mut self.barrier_rx, &self.output_indices, - self.progress, + &mut self.progress, first_recv_barrier, ); @@ -187,12 +160,11 @@ impl SnapshotBackfillExecutor { yield Message::Barrier(recv_barrier); } - let mut upstream_buffer = - upstream_buffer.start_consuming_log_store(&mut self.barrier_rx); + let mut upstream_buffer = upstream_buffer.start_consuming_log_store(); let mut barrier_epoch = first_barrier_epoch; - let initial_pending_barrier = upstream_buffer.state.barrier_count(); + let initial_pending_barrier = upstream_buffer.barrier_count(); info!( ?barrier_epoch, table_id = self.upstream_table.table_id().table_id, @@ -210,37 +182,50 @@ impl SnapshotBackfillExecutor { ]); // Phase 2: consume upstream log store - while let Some(barrier) = upstream_buffer.take_buffered_barrier().await? { - assert_eq!(barrier_epoch.curr, barrier.epoch.prev); - barrier_epoch = barrier.epoch; - - debug!(?barrier_epoch, kind = ?barrier.kind, "before consume change log"); - // use `upstream_buffer.run_future` to poll upstream concurrently so that we won't have back-pressure - // on the upstream. Otherwise, in `batch_iter_log_with_pk_bounds`, we may wait upstream epoch to be committed, - // and the back-pressure may cause the upstream unable to consume the barrier and then cause deadlock. - let stream = upstream_buffer - .run_future(self.upstream_table.batch_iter_log_with_pk_bounds( - barrier_epoch.prev, - HummockReadEpoch::Committed(barrier_epoch.prev), - )) - .await?; - let data_types = self.upstream_table.schema().data_types(); - let builder = create_builder(None, self.chunk_size, data_types); - let stream = read_change_log(stream, builder); - pin_mut!(stream); - while let Some(chunk) = upstream_buffer.run_future(stream.try_next()).await? { - debug!( - ?barrier_epoch, - size = chunk.cardinality(), - "consume change log yield chunk", - ); - consuming_log_store_row_count.inc_by(chunk.cardinality() as _); - yield Message::Chunk(chunk); - } + while let Some(upstream_barriers) = + upstream_buffer.next_checkpoint_barrier().await? + { + for upstream_barrier in upstream_barriers { + let barrier = receive_next_barrier(&mut self.barrier_rx).await?; + assert_eq!(upstream_barrier.epoch, barrier.epoch); + assert_eq!(barrier_epoch.curr, barrier.epoch.prev); + barrier_epoch = barrier.epoch; + + debug!(?barrier_epoch, kind = ?barrier.kind, "before consume change log"); + // use `upstream_buffer.run_future` to poll upstream concurrently so that we won't have back-pressure + // on the upstream. Otherwise, in `batch_iter_log_with_pk_bounds`, we may wait upstream epoch to be committed, + // and the back-pressure may cause the upstream unable to consume the barrier and then cause deadlock. + let stream = upstream_buffer + .run_future(self.upstream_table.batch_iter_log_with_pk_bounds( + barrier_epoch.prev, + HummockReadEpoch::Committed(barrier_epoch.prev), + )) + .await?; + let data_types = self.upstream_table.schema().data_types(); + let builder = create_builder(None, self.chunk_size, data_types); + let stream = read_change_log(stream, builder); + pin_mut!(stream); + while let Some(chunk) = + upstream_buffer.run_future(stream.try_next()).await? + { + debug!( + ?barrier_epoch, + size = chunk.cardinality(), + "consume change log yield chunk", + ); + consuming_log_store_row_count.inc_by(chunk.cardinality() as _); + yield Message::Chunk(chunk); + } + + debug!(?barrier_epoch, "after consume change log"); - debug!(?barrier_epoch, "after consume change log"); + self.progress.update_create_mview_log_store_progress( + barrier.epoch, + upstream_buffer.barrier_count(), + ); - yield Message::Barrier(barrier); + yield Message::Barrier(barrier); + } } info!( @@ -248,7 +233,7 @@ impl SnapshotBackfillExecutor { table_id = self.upstream_table.table_id().table_id, "finish consuming log store" ); - barrier_epoch + (barrier_epoch, true) } else { info!( table_id = self.upstream_table.table_id().table_id, @@ -257,7 +242,7 @@ impl SnapshotBackfillExecutor { let first_recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?; assert_eq!(first_barrier.epoch, first_recv_barrier.epoch); yield Message::Barrier(first_recv_barrier); - first_barrier.epoch + (first_barrier.epoch, false) } }; let mut upstream = self.upstream.into_executor(self.barrier_rx).execute(); @@ -266,6 +251,10 @@ impl SnapshotBackfillExecutor { if let Message::Barrier(barrier) = &msg { assert_eq!(barrier.epoch.prev, barrier_epoch.curr); barrier_epoch = barrier.epoch; + if need_report_finish { + need_report_finish = false; + self.progress.finish_consuming_log_store(barrier_epoch); + } } yield msg; } @@ -328,146 +317,84 @@ async fn read_change_log( } } -trait UpstreamBufferState { - // The future must be cancellation-safe - async fn is_finished(&mut self) -> StreamExecutorResult; - fn on_upstream_barrier(&mut self, upstream_barrier: DispatcherBarrier); -} - -struct StateOfConsumingSnapshot { - pending_barriers: Vec, -} - -impl UpstreamBufferState for StateOfConsumingSnapshot { - async fn is_finished(&mut self) -> StreamExecutorResult { - // never finish when consuming snapshot - Ok(false) - } - - fn on_upstream_barrier(&mut self, upstream_barrier: DispatcherBarrier) { - self.pending_barriers.push(upstream_barrier) - } -} - -struct StateOfConsumingLogStore<'a> { - barrier_rx: &'a mut mpsc::UnboundedReceiver, - /// Barriers received from upstream but not yet received the barrier from local barrier worker - /// newer barrier at the front - upstream_pending_barriers: VecDeque, - /// Barriers received from both upstream and local barrier worker - /// newer barrier at the front - barriers: VecDeque, - is_finished: bool, - current_subscriber_id: u32, - upstream_table_id: TableId, -} - -impl<'a> StateOfConsumingLogStore<'a> { - fn barrier_count(&self) -> usize { - self.upstream_pending_barriers.len() + self.barriers.len() - } - - async fn handle_one_pending_barrier(&mut self) -> StreamExecutorResult { - assert!(!self.is_finished); - let barrier = receive_next_barrier(self.barrier_rx).await?; - assert_eq!( - self.upstream_pending_barriers - .pop_back() - .expect("non-empty") - .epoch, - barrier.epoch - ); - if is_finish_barrier(&barrier, self.current_subscriber_id, self.upstream_table_id) { - self.is_finished = true; - } - Ok(barrier) - } -} - -impl<'a> UpstreamBufferState for StateOfConsumingLogStore<'a> { - async fn is_finished(&mut self) -> StreamExecutorResult { - while !self.upstream_pending_barriers.is_empty() { - let barrier = self.handle_one_pending_barrier().await?; - self.barriers.push_front(barrier); - } - if self.is_finished { - assert!(self.upstream_pending_barriers.is_empty()); - } - Ok(self.is_finished) - } - - fn on_upstream_barrier(&mut self, upstream_barrier: DispatcherBarrier) { - self.upstream_pending_barriers.push_front(upstream_barrier); - } -} +struct ConsumingSnapshot; +struct ConsumingLogStore; struct UpstreamBuffer<'a, S> { upstream: &'a mut MergeExecutorInput, - state: S, + max_pending_checkpoint_barrier_num: usize, + pending_non_checkpoint_barriers: Vec, + /// Barriers received from upstream but not yet received the barrier from local barrier worker. + /// + /// In the outer `VecDeque`, newer barriers at the front. + /// In the inner `Vec`, newer barrier at the back, with the last barrier as checkpoint barrier, + /// and others as non-checkpoint barrier + upstream_pending_barriers: VecDeque>, + /// Whether we have started polling any upstream data before the next barrier. + /// When `true`, we should continue polling until the next barrier, because + /// some data in this epoch have been discarded and data in this epoch + /// must be read from log store + is_polling_epoch_data: bool, consume_upstream_row_count: LabelGuardedIntCounter<3>, - upstream_table_id: TableId, - current_subscriber_id: u32, + _phase: S, } -impl<'a> UpstreamBuffer<'a, StateOfConsumingSnapshot> { +impl<'a> UpstreamBuffer<'a, ConsumingSnapshot> { fn new( upstream: &'a mut MergeExecutorInput, - upstream_table_id: TableId, - current_subscriber_id: u32, consume_upstream_row_count: LabelGuardedIntCounter<3>, ) -> Self { Self { upstream, - state: StateOfConsumingSnapshot { - pending_barriers: vec![], - }, + is_polling_epoch_data: false, consume_upstream_row_count, - upstream_table_id, - current_subscriber_id, + pending_non_checkpoint_barriers: vec![], + upstream_pending_barriers: Default::default(), + // no limit on the number of pending barrier in the beginning + max_pending_checkpoint_barrier_num: usize::MAX, + _phase: ConsumingSnapshot {}, } } - fn start_consuming_log_store<'s>( - self, - barrier_rx: &'s mut UnboundedReceiver, - ) -> UpstreamBuffer<'a, StateOfConsumingLogStore<'s>> { - let StateOfConsumingSnapshot { pending_barriers } = self.state; - let mut upstream_pending_barriers = VecDeque::with_capacity(pending_barriers.len()); - for pending_barrier in pending_barriers { - upstream_pending_barriers.push_front(pending_barrier); - } + fn start_consuming_log_store(self) -> UpstreamBuffer<'a, ConsumingLogStore> { + let max_pending_barrier_num = self.barrier_count(); UpstreamBuffer { upstream: self.upstream, - state: StateOfConsumingLogStore { - barrier_rx, - upstream_pending_barriers, - barriers: Default::default(), - is_finished: false, - current_subscriber_id: self.current_subscriber_id, - upstream_table_id: self.upstream_table_id, - }, + pending_non_checkpoint_barriers: self.pending_non_checkpoint_barriers, + upstream_pending_barriers: self.upstream_pending_barriers, + max_pending_checkpoint_barrier_num: max_pending_barrier_num, + is_polling_epoch_data: self.is_polling_epoch_data, consume_upstream_row_count: self.consume_upstream_row_count, - upstream_table_id: self.upstream_table_id, - current_subscriber_id: self.current_subscriber_id, + _phase: ConsumingLogStore {}, } } } -impl<'a, S: UpstreamBufferState> UpstreamBuffer<'a, S> { +impl<'a, S> UpstreamBuffer<'a, S> { async fn concurrently_consume_upstream(&mut self) -> StreamExecutorError { - if let Err(e) = try { - while !self.state.is_finished().await? { - self.consume_until_next_barrier().await?; + { + loop { + if let Err(e) = try { + if self.upstream_pending_barriers.len() + >= self.max_pending_checkpoint_barrier_num + { + // pause the future to block consuming upstream + return pending().await; + } + let barrier = self.consume_until_next_checkpoint_barrier().await?; + self.upstream_pending_barriers.push_front(barrier); + } { + break e; + } } - } { - return e; } - pending().await } /// Consume the upstream until seeing the next barrier. /// `pending_barriers` must be non-empty after this method returns. - async fn consume_until_next_barrier(&mut self) -> StreamExecutorResult<()> { + async fn consume_until_next_checkpoint_barrier( + &mut self, + ) -> StreamExecutorResult> { loop { let msg: DispatcherMessage = self .upstream @@ -476,63 +403,54 @@ impl<'a, S: UpstreamBufferState> UpstreamBuffer<'a, S> { .ok_or_else(|| anyhow!("end of upstream"))?; match msg { DispatcherMessage::Chunk(chunk) => { + self.is_polling_epoch_data = true; self.consume_upstream_row_count .inc_by(chunk.cardinality() as _); } DispatcherMessage::Barrier(barrier) => { - self.state.on_upstream_barrier(barrier); - break Ok(()); + let is_checkpoint = barrier.kind.is_checkpoint(); + self.pending_non_checkpoint_barriers.push(barrier); + if is_checkpoint { + self.is_polling_epoch_data = false; + break Ok(take(&mut self.pending_non_checkpoint_barriers)); + } else { + self.is_polling_epoch_data = true; + } + } + DispatcherMessage::Watermark(_) => { + self.is_polling_epoch_data = true; } - DispatcherMessage::Watermark(_) => {} } } } } -impl<'a, 's> UpstreamBuffer<'a, StateOfConsumingLogStore<'s>> { - async fn take_buffered_barrier(&mut self) -> StreamExecutorResult> { - Ok(if let Some(barrier) = self.state.barriers.pop_back() { - Some(barrier) - } else if !self.state.upstream_pending_barriers.is_empty() { - let barrier = self.state.handle_one_pending_barrier().await?; - Some(barrier) - } else if self.state.is_finished { - None - } else { - self.consume_until_next_barrier().await?; - let barrier = self.state.handle_one_pending_barrier().await?; - Some(barrier) - }) - } -} - -fn is_finish_barrier( - barrier: &Barrier, - current_subscriber_id: u32, - upstream_table_id: TableId, -) -> bool { - if let Some(Mutation::DropSubscriptions { - subscriptions_to_drop, - }) = barrier.mutation.as_deref() - { - let is_finished = subscriptions_to_drop - .iter() - .any(|(subscriber_id, _)| *subscriber_id == current_subscriber_id); - if is_finished { - assert!(subscriptions_to_drop.iter().any( - |(subscriber_id, subscribed_upstream_table_id)| { - *subscriber_id == current_subscriber_id - && upstream_table_id == *subscribed_upstream_table_id +impl<'a> UpstreamBuffer<'a, ConsumingLogStore> { + async fn next_checkpoint_barrier( + &mut self, + ) -> StreamExecutorResult>> { + Ok( + if let Some(barriers) = self.upstream_pending_barriers.pop_back() { + // sub(1) to ensure that the lag is monotonically decreasing. + self.max_pending_checkpoint_barrier_num = min( + self.upstream_pending_barriers.len(), + self.max_pending_checkpoint_barrier_num.saturating_sub(1), + ); + Some(barriers) + } else { + self.max_pending_checkpoint_barrier_num = 0; + if self.is_polling_epoch_data { + let barriers = self.consume_until_next_checkpoint_barrier().await?; + Some(barriers) + } else { + None } - )) - } - is_finished - } else { - false + }, + ) } } -impl<'a, S: UpstreamBufferState> UpstreamBuffer<'a, S> { +impl<'a, S> UpstreamBuffer<'a, S> { /// Run a future while concurrently polling the upstream so that the upstream /// won't be back-pressured. async fn run_future>( @@ -550,6 +468,10 @@ impl<'a, S: UpstreamBufferState> UpstreamBuffer<'a, S> { } } } + + fn barrier_count(&self) -> usize { + self.upstream_pending_barriers.len() + } } async fn receive_next_barrier( @@ -589,7 +511,7 @@ async fn make_consume_snapshot_stream<'a, S: StateStore>( rate_limit: Option, barrier_rx: &'a mut UnboundedReceiver, output_indices: &'a [usize], - mut progress: CreateMviewProgressReporter, + progress: &'a mut CreateMviewProgressReporter, first_recv_barrier: Barrier, ) { let mut barrier_epoch = first_recv_barrier.epoch; diff --git a/src/stream/src/task/barrier_manager/progress.rs b/src/stream/src/task/barrier_manager/progress.rs index 9b2820bb3bfed..a91f9b8476111 100644 --- a/src/stream/src/task/barrier_manager/progress.rs +++ b/src/stream/src/task/barrier_manager/progress.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::assert_matches::assert_matches; use std::fmt::{Display, Formatter}; use risingwave_common::util::epoch::EpochPair; @@ -27,23 +28,30 @@ type ConsumedRows = u64; #[derive(Debug, Clone, Copy)] pub(crate) enum BackfillState { - ConsumingUpstream(ConsumedEpoch, ConsumedRows), - Done(ConsumedRows), + ConsumingUpstreamTable(ConsumedEpoch, ConsumedRows), + DoneConsumingUpstreamTable(ConsumedRows), + ConsumingLogStore { pending_barrier_num: usize }, + DoneConsumingLogStore, } impl BackfillState { pub fn to_pb(self, actor_id: ActorId) -> PbCreateMviewProgress { + let (done, consumed_epoch, consumed_rows, pending_barrier_num) = match self { + BackfillState::ConsumingUpstreamTable(consumed_epoch, consumed_rows) => { + (false, consumed_epoch, consumed_rows, 0) + } + BackfillState::DoneConsumingUpstreamTable(consumed_rows) => (true, 0, consumed_rows, 0), /* unused field for done */ + BackfillState::ConsumingLogStore { + pending_barrier_num, + } => (false, 0, 0, pending_barrier_num as _), + BackfillState::DoneConsumingLogStore => (true, 0, 0, 0), + }; PbCreateMviewProgress { backfill_actor_id: actor_id, - done: matches!(self, BackfillState::Done(_)), - consumed_epoch: match self { - BackfillState::ConsumingUpstream(consumed_epoch, _) => consumed_epoch, - BackfillState::Done(_) => 0, // unused field for done - }, - consumed_rows: match self { - BackfillState::ConsumingUpstream(_, consumed_rows) => consumed_rows, - BackfillState::Done(consumed_rows) => consumed_rows, - }, + done, + consumed_epoch, + consumed_rows, + pending_barrier_num, } } } @@ -51,10 +59,27 @@ impl BackfillState { impl Display for BackfillState { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { - BackfillState::ConsumingUpstream(epoch, rows) => { - write!(f, "ConsumingUpstream(epoch: {}, rows: {})", epoch, rows) + BackfillState::ConsumingUpstreamTable(epoch, rows) => { + write!( + f, + "ConsumingUpstreamTable(epoch: {}, rows: {})", + epoch, rows + ) + } + BackfillState::DoneConsumingUpstreamTable(rows) => { + write!(f, "DoneConsumingUpstreamTable(rows: {})", rows) + } + BackfillState::ConsumingLogStore { + pending_barrier_num, + } => { + write!( + f, + "ConsumingLogStore(pending_barrier_num: {pending_barrier_num})" + ) + } + BackfillState::DoneConsumingLogStore => { + write!(f, "DoneConsumingLogStore") } - BackfillState::Done(rows) => write!(f, "Done(rows: {})", rows), } } } @@ -165,7 +190,7 @@ impl CreateMviewProgressReporter { current_consumed_rows: ConsumedRows, ) { match self.state { - Some(BackfillState::ConsumingUpstream(last, last_consumed_rows)) => { + Some(BackfillState::ConsumingUpstreamTable(last, last_consumed_rows)) => { assert!( last < consumed_epoch, "last_epoch: {:#?} must be greater than consumed epoch: {:#?}", @@ -174,22 +199,61 @@ impl CreateMviewProgressReporter { ); assert!(last_consumed_rows <= current_consumed_rows); } - Some(BackfillState::Done(_)) => unreachable!(), + Some(state) => { + panic!( + "should not update consuming progress at invalid state: {:?}", + state + ) + } None => {} }; self.update_inner( epoch, - BackfillState::ConsumingUpstream(consumed_epoch, current_consumed_rows), + BackfillState::ConsumingUpstreamTable(consumed_epoch, current_consumed_rows), ); } /// Finish the progress. If the progress is already finished, then perform no-op. /// `current_epoch` should be provided to locate the barrier under concurrent checkpoint. pub fn finish(&mut self, epoch: EpochPair, current_consumed_rows: ConsumedRows) { - if let Some(BackfillState::Done(_)) = self.state { + if let Some(BackfillState::DoneConsumingUpstreamTable(_)) = self.state { return; } - self.update_inner(epoch, BackfillState::Done(current_consumed_rows)); + self.update_inner( + epoch, + BackfillState::DoneConsumingUpstreamTable(current_consumed_rows), + ); + } + + pub(crate) fn update_create_mview_log_store_progress( + &mut self, + epoch: EpochPair, + pending_barrier_num: usize, + ) { + assert_matches!( + self.state, + Some(BackfillState::DoneConsumingUpstreamTable(_)) + | Some(BackfillState::ConsumingLogStore { .. }), + "cannot update log store progress at state {:?}", + self.state + ); + self.update_inner( + epoch, + BackfillState::ConsumingLogStore { + pending_barrier_num, + }, + ); + } + + pub(crate) fn finish_consuming_log_store(&mut self, epoch: EpochPair) { + assert_matches!( + self.state, + Some(BackfillState::DoneConsumingUpstreamTable(_)) + | Some(BackfillState::ConsumingLogStore { .. }), + "cannot finish log store progress at state {:?}", + self.state + ); + self.update_inner(epoch, BackfillState::DoneConsumingLogStore); } } @@ -201,7 +265,7 @@ impl LocalBarrierManager { /// /// When all backfill executors of the creating mview finish, the creation progress will be done at /// frontend and the mview will be exposed to the user. - pub fn register_create_mview_progress( + pub(crate) fn register_create_mview_progress( &self, backfill_actor_id: ActorId, ) -> CreateMviewProgressReporter {