diff --git a/proto/hummock.proto b/proto/hummock.proto index 6d3d7b0d32d7d..c59a181197f9e 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -528,7 +528,6 @@ message ReportCompactionTaskResponse { message ValidationTask { repeated SstableInfo sst_infos = 1; map sst_id_to_worker_id = 2; - uint64 epoch = 3; } // Delete SSTs in object store diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 120c6d8518bbb..dfe51da9a64ac 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -12,10 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::assert_matches::assert_matches; use std::collections::hash_map::Entry; use std::collections::{BTreeMap, HashMap, HashSet}; -use std::future::pending; +use std::future::{pending, Future}; use std::mem::{replace, take}; use std::sync::Arc; use std::time::Duration; @@ -55,6 +54,7 @@ use crate::barrier::creating_job::CreatingStreamingJobControl; use crate::barrier::info::InflightGraphInfo; use crate::barrier::progress::{CreateMviewProgressTracker, TrackingCommand, TrackingJob}; use crate::barrier::rpc::{merge_node_rpc_errors, ControlStreamManager}; +use crate::barrier::schedule::ScheduledBarriers; use crate::barrier::state::BarrierManagerState; use crate::error::MetaErrorInner; use crate::hummock::{CommitEpochInfo, HummockManagerRef, NewTableFragmentInfo}; @@ -218,7 +218,7 @@ struct CheckpointControl { /// Command that has been collected but is still completing. /// The join handle of the completing future is stored. - completing_command: CompletingCommand, + completing_task: CompletingTask, hummock_version_stats: HummockVersionStats, @@ -235,7 +235,7 @@ impl CheckpointControl { Self { command_ctx_queue: Default::default(), creating_streaming_job_controls: Default::default(), - completing_command: CompletingCommand::None, + completing_task: CompletingTask::None, hummock_version_stats: context.hummock_manager.get_version_stats().await, create_mview_tracker, context, @@ -244,8 +244,11 @@ impl CheckpointControl { fn total_command_num(&self) -> usize { self.command_ctx_queue.len() - + match &self.completing_command { - CompletingCommand::GlobalStreamingGraph { .. } => 1, + + match &self.completing_task { + CompletingTask::Completing { + command_ctx: Some(_), + .. + } => 1, _ => 0, } } @@ -292,7 +295,6 @@ impl CheckpointControl { notifiers: Vec, node_to_collect: HashSet, jobs_to_wait: HashSet, - table_ids_to_commit: HashSet, ) { let timer = self.context.metrics.barrier_latency.start_timer(); @@ -334,8 +336,7 @@ impl CheckpointControl { node_to_collect, resps: vec![], creating_jobs_to_wait, - finished_table_ids: HashMap::new(), - table_ids_to_commit, + finished_jobs: HashMap::new(), }, command_ctx, notifiers, @@ -400,11 +401,9 @@ impl CheckpointControl { .command_ctx_queue .last_key_value() .map(|(_, x)| &x.command_ctx) - .or(match &self.completing_command { - CompletingCommand::None - | CompletingCommand::Err(_) - | CompletingCommand::CreatingStreamingJob { .. } => None, - CompletingCommand::GlobalStreamingGraph { command_ctx, .. } => Some(command_ctx), + .or(match &self.completing_task { + CompletingTask::None | CompletingTask::Err(_) => None, + CompletingTask::Completing { command_ctx, .. 
} => command_ctx.as_ref(), }) .map(|command_ctx| command_ctx.command.should_pause_inject_barrier()) .unwrap_or(false); @@ -413,12 +412,9 @@ impl CheckpointControl { .values() .map(|node| &node.command_ctx) .chain( - match &self.completing_command { - CompletingCommand::None - | CompletingCommand::Err(_) - | CompletingCommand::CreatingStreamingJob { .. } => None, - CompletingCommand::GlobalStreamingGraph { command_ctx, .. } => - Some(command_ctx), + match &self.completing_task { + CompletingTask::None | CompletingTask::Err(_) => None, + CompletingTask::Completing { command_ctx, .. } => command_ctx.as_ref(), } .into_iter() ) @@ -432,32 +428,10 @@ impl CheckpointControl { /// We need to make sure there are no changes when doing recovery pub async fn clear_on_err(&mut self, err: &MetaError) { // join spawned completing command to finish no matter it succeeds or not. - let is_err = match replace(&mut self.completing_command, CompletingCommand::None) { - CompletingCommand::None => false, - CompletingCommand::GlobalStreamingGraph { - command_ctx, - join_handle, - .. - } => { - info!( - prev_epoch = ?command_ctx.prev_epoch, - curr_epoch = ?command_ctx.curr_epoch, - "waiting for completing command to finish in recovery" - ); - match join_handle.await { - Err(e) => { - warn!(err = ?e.as_report(), "failed to join completing task"); - true - } - Ok(Err(e)) => { - warn!(err = ?e.as_report(), "failed to complete barrier during clear"); - true - } - Ok(Ok(_)) => false, - } - } - CompletingCommand::Err(_) => true, - CompletingCommand::CreatingStreamingJob { join_handle, .. } => { + let is_err = match replace(&mut self.completing_task, CompletingTask::None) { + CompletingTask::None => false, + CompletingTask::Completing { join_handle, .. } => { + info!("waiting for completing command to finish in recovery"); match join_handle.await { Err(e) => { warn!(err = ?e.as_report(), "failed to join completing task"); @@ -470,38 +444,19 @@ impl CheckpointControl { Ok(Ok(_)) => false, } } + CompletingTask::Err(_) => true, }; if !is_err { // continue to finish the pending collected barrier. - while let Some((_, EpochNode { state, .. 
})) = self.command_ctx_queue.first_key_value() - && !state.is_inflight() - { - let (_, node) = self.command_ctx_queue.pop_first().expect("non-empty"); - let (prev_epoch, curr_epoch) = ( - node.command_ctx.prev_epoch.value().0, - node.command_ctx.curr_epoch.value().0, - ); - let finished_jobs = self - .create_mview_tracker - .apply_collected_command(&node, &self.hummock_version_stats); - if let Err(e) = self - .context - .clone() - .complete_barrier(node, finished_jobs, HashMap::new()) - .await - { + while let Some(task) = self.next_complete_barrier_task(None) { + if let Err(e) = self.context.clone().complete_barrier(task).await { error!( - prev_epoch, - curr_epoch, err = ?e.as_report(), "failed to complete barrier during recovery" ); break; } else { - info!( - prev_epoch, - curr_epoch, "succeed to complete barrier during recovery" - ) + info!("succeed to complete barrier during recovery") } } } @@ -547,9 +502,7 @@ struct BarrierEpochState { creating_jobs_to_wait: HashMap>, - finished_table_ids: HashMap, - - table_ids_to_commit: HashSet, + finished_jobs: HashMap, } impl BarrierEpochState { @@ -558,22 +511,17 @@ impl BarrierEpochState { } } -enum CompletingCommand { +enum CompletingTask { None, - GlobalStreamingGraph { - command_ctx: Arc, + Completing { + command_ctx: Option>, table_ids_to_finish: HashSet, - require_next_checkpoint: bool, + creating_job_epochs: Vec<(TableId, u64)>, // The join handle of a spawned task that completes the barrier. // The return value indicate whether there is some create streaming job command // that has finished but not checkpointed. If there is any, we will force checkpoint on the next barrier - join_handle: JoinHandle>>, - }, - CreatingStreamingJob { - table_id: TableId, - epoch: u64, - join_handle: JoinHandle>, + join_handle: JoinHandle>, }, #[expect(dead_code)] Err(MetaError), @@ -863,22 +811,15 @@ impl GlobalBarrierManager { } } } - complete_result = self.checkpoint_control.next_completed_barrier() => { + complete_result = self.checkpoint_control.next_completed_barrier(&mut self.scheduled_barriers) => { match complete_result { - Ok(Some(output)) => { - // If there are remaining commands (that requires checkpoint to finish), we force - // the next barrier to be a checkpoint. 
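The refactor above folds the old `CompletingCommand::GlobalStreamingGraph` / `CreatingStreamingJob` variants into a single `CompletingTask::Completing` that owns one spawned join handle, while an idle `CheckpointControl` simply parks on `pending()`. Below is a minimal, self-contained sketch of that pattern, assuming `tokio` (features: macros, rt-multi-thread) and `anyhow` as dependencies; the names `CompletingTask` and `BarrierCompleteOutput` mirror the diff, but the field types and error handling are simplified stand-ins rather than the actual RisingWave implementation.

```rust
// Sketch of the "single in-flight completing task" pattern from the diff.
use std::future::pending;
use tokio::task::JoinHandle;

struct BarrierCompleteOutput {
    table_ids_to_finish: Vec<u32>,
}

enum CompletingTask {
    None,
    Completing {
        table_ids_to_finish: Vec<u32>,
        join_handle: JoinHandle<anyhow::Result<()>>,
    },
    // Kept for parity with the diff's `#[expect(dead_code)] Err(MetaError)`.
    #[allow(dead_code)]
    Err(anyhow::Error),
}

impl CompletingTask {
    async fn next_completed(&mut self) -> anyhow::Result<BarrierCompleteOutput> {
        let CompletingTask::Completing { join_handle, .. } = self else {
            // Nothing in flight: park forever so a surrounding `select!`
            // branch on this future never fires spuriously.
            return pending().await;
        };
        let result = join_handle.await.expect("failed to join completing task");
        // Reset the state only *after* the await, so the join handle is never
        // polled again once it has resolved.
        let next = match &result {
            Ok(()) => CompletingTask::None,
            Err(_) => CompletingTask::Err(anyhow::anyhow!("previous completion failed")),
        };
        let prev = std::mem::replace(self, next);
        result?;
        match prev {
            CompletingTask::Completing {
                table_ids_to_finish, ..
            } => Ok(BarrierCompleteOutput { table_ids_to_finish }),
            _ => unreachable!(),
        }
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut state = CompletingTask::Completing {
        table_ids_to_finish: vec![1, 2],
        join_handle: tokio::spawn(async { anyhow::Ok(()) }),
    };
    let output = state.next_completed().await?;
    println!("finished tables: {:?}", output.table_ids_to_finish);
    Ok(())
}
```

Resetting `self` only after the join handle has resolved is what keeps the handle from being polled again once it is ready, which is the same invariant the diff's `next_completed_barrier_inner` comments call out.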
- if output.require_next_checkpoint { - assert_matches!(output.command_ctx.kind, BarrierKind::Barrier); - self.scheduled_barriers.force_checkpoint_in_next_barrier(); - } + Ok(output) => { if !output.table_ids_to_finish.is_empty() { self.control_stream_manager.remove_partial_graph( output.table_ids_to_finish.iter().map(|table_id| table_id.table_id).collect() ); } } - Ok(None) => {} Err(e) => { self.failure_recovery(e).await; } @@ -1033,7 +974,7 @@ impl GlobalBarrierManager { pre_applied_subscription_info, prev_epoch.clone(), curr_epoch.clone(), - table_ids_to_commit.clone(), + table_ids_to_commit, self.state.paused_reason(), command, kind, @@ -1085,7 +1026,6 @@ impl GlobalBarrierManager { notifiers, node_to_collect, jobs_to_wait, - table_ids_to_commit, ); Ok(()) @@ -1150,140 +1090,86 @@ impl GlobalBarrierManager { } } +#[derive(Default)] +struct CompleteBarrierTask { + commit_info: CommitEpochInfo, + finished_jobs: Vec, + notifiers: Vec, + /// Some((`command_ctx`, `enqueue_time`)) + command_context: Option<(Arc, HistogramTimer)>, + table_ids_to_finish: HashSet, + creating_job_epochs: Vec<(TableId, u64)>, +} + impl GlobalBarrierManagerContext { - async fn complete_creating_job_barrier( - self, + fn collect_creating_job_commit_epoch_info( + commit_info: &mut CommitEpochInfo, epoch: u64, resps: Vec, - tables_to_commit: HashSet, + tables_to_commit: impl Iterator, is_first_time: bool, - ) -> MetaResult<()> { + ) { let (sst_to_context, sstables, new_table_watermarks, old_value_sst) = collect_resp_info(resps); assert!(old_value_sst.is_empty()); - let new_table_fragment_info = if is_first_time { - NewTableFragmentInfo::NewCompactionGroup { - table_ids: tables_to_commit.clone(), - } - } else { - NewTableFragmentInfo::None - }; - let info = CommitEpochInfo { - sstables, - new_table_watermarks, - sst_to_context, - new_table_fragment_info, - change_log_delta: Default::default(), - committed_epoch: epoch, - tables_to_commit, + commit_info.sst_to_context.extend(sst_to_context); + commit_info.sstables.extend(sstables); + commit_info + .new_table_watermarks + .extend(new_table_watermarks); + let tables_to_commit: HashSet<_> = tables_to_commit.collect(); + tables_to_commit.iter().for_each(|table_id| { + commit_info + .tables_to_commit + .try_insert(*table_id, epoch) + .expect("non duplicate"); + }); + if is_first_time { + commit_info + .new_table_fragment_infos + .push(NewTableFragmentInfo::NewCompactionGroup { + table_ids: tables_to_commit, + }); }; - self.hummock_manager.commit_epoch(info).await?; - Ok(()) } - async fn complete_barrier( - self, - node: EpochNode, - mut finished_jobs: Vec, - backfill_pinned_log_epoch: HashMap)>, - ) -> MetaResult> { - tracing::trace!( - prev_epoch = node.command_ctx.prev_epoch.value().0, - kind = ?node.command_ctx.kind, - "complete barrier" - ); - let EpochNode { - command_ctx, - notifiers, - enqueue_time, - state, - .. 
- } = node; - assert!(state.node_to_collect.is_empty()); - assert!(state.creating_jobs_to_wait.is_empty()); - let wait_commit_timer = self.metrics.barrier_wait_commit_latency.start_timer(); - if !state.finished_table_ids.is_empty() { - assert!(command_ctx.kind.is_checkpoint()); - } - finished_jobs.extend(state.finished_table_ids.into_values().map(|info| { - TrackingJob::New(TrackingCommand { - info, - replace_table_info: None, - }) - })); - - let result = self - .update_snapshot( - &command_ctx, - state.table_ids_to_commit, - state.resps, - backfill_pinned_log_epoch, - ) - .await; - - let version_stats = match result { - Ok(version_stats) => version_stats, - Err(e) => { - for notifier in notifiers { - notifier.notify_collection_failed(e.clone()); - } - return Err(e); + async fn complete_barrier(self, task: CompleteBarrierTask) -> MetaResult { + let result: MetaResult<()> = try { + let wait_commit_timer = self.metrics.barrier_wait_commit_latency.start_timer(); + self.hummock_manager.commit_epoch(task.commit_info).await?; + if let Some((command_ctx, _)) = &task.command_context { + command_ctx.post_collect().await?; } + + wait_commit_timer.observe_duration(); }; - notifiers.into_iter().for_each(|notifier| { - notifier.notify_collected(); - }); - try_join_all(finished_jobs.into_iter().map(|finished_job| { - let metadata_manager = &self.metadata_manager; - async move { finished_job.finish(metadata_manager).await } - })) - .await?; - let duration_sec = enqueue_time.stop_and_record(); - self.report_complete_event(duration_sec, &command_ctx); - wait_commit_timer.observe_duration(); - self.metrics - .last_committed_barrier_time - .set(command_ctx.curr_epoch.value().as_unix_secs() as i64); - Ok(version_stats) - } - async fn update_snapshot( - &self, - command_ctx: &CommandContext, - tables_to_commit: HashSet, - resps: Vec, - backfill_pinned_log_epoch: HashMap)>, - ) -> MetaResult> { { - { - match &command_ctx.kind { - BarrierKind::Initial => {} - BarrierKind::Checkpoint(epochs) => { - let commit_info = collect_commit_epoch_info( - resps, - command_ctx, - epochs, - backfill_pinned_log_epoch, - tables_to_commit, - ); - self.hummock_manager.commit_epoch(commit_info).await?; - } - BarrierKind::Barrier => { - // if we collect a barrier(checkpoint = false), - // we need to ensure that command is Plain and the notifier's checkpoint is - // false - assert!(!command_ctx.command.need_checkpoint()); - } + if let Err(e) = result { + for notifier in task.notifiers { + notifier.notify_collection_failed(e.clone()); } - - command_ctx.post_collect().await?; - Ok(if command_ctx.kind.is_checkpoint() { - Some(self.hummock_manager.get_version_stats().await) - } else { - None - }) + return Err(e); + } + task.notifiers.into_iter().for_each(|notifier| { + notifier.notify_collected(); + }); + try_join_all( + task.finished_jobs + .into_iter() + .map(|finished_job| finished_job.finish(&self.metadata_manager)), + ) + .await?; + if let Some((command_ctx, enqueue_time)) = task.command_context { + let duration_sec = enqueue_time.stop_and_record(); + self.report_complete_event(duration_sec, &command_ctx); + self.metrics + .last_committed_barrier_time + .set(command_ctx.curr_epoch.value().as_unix_secs() as i64); } } + + Ok(self.hummock_manager.get_version_stats().await) } pub fn hummock_manager(&self) -> &HummockManagerRef { @@ -1350,8 +1236,6 @@ impl GlobalBarrierManagerContext { } struct BarrierCompleteOutput { - command_ctx: Arc, - require_next_checkpoint: bool, table_ids_to_finish: HashSet, } @@ -1381,42 +1265,67 @@ impl 
CheckpointControl { .collect() } - pub(super) async fn next_completed_barrier( + fn next_complete_barrier_task( &mut self, - ) -> MetaResult> { - if matches!(&self.completing_command, CompletingCommand::None) { - // If there is no completing barrier, try to start completing the earliest barrier if - // it has been collected. - if let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value() - && !state.is_inflight() + mut scheduled_barriers: Option<&mut ScheduledBarriers>, + ) -> Option { + let mut task = None; + while let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value() + && !state.is_inflight() + { { - let (_, node) = self.command_ctx_queue.pop_first().expect("non-empty"); + let (_, mut node) = self.command_ctx_queue.pop_first().expect("non-empty"); assert!(node.state.creating_jobs_to_wait.is_empty()); - let table_ids_to_finish = node.state.finished_table_ids.keys().cloned().collect(); - let finished_jobs = self + assert!(node.state.node_to_collect.is_empty()); + let mut finished_jobs = self .create_mview_tracker .apply_collected_command(&node, &self.hummock_version_stats); - let command_ctx = node.command_ctx.clone(); - let join_handle = tokio::spawn(self.context.clone().complete_barrier( - node, - finished_jobs, - self.collect_backfill_pinned_upstream_log_epoch(), - )); - let require_next_checkpoint = - if self.create_mview_tracker.has_pending_finished_jobs() { - self.command_ctx_queue + if !node.command_ctx.kind.is_checkpoint() { + assert!(finished_jobs.is_empty()); + node.notifiers.into_iter().for_each(|notifier| { + notifier.notify_collected(); + }); + if let Some(scheduled_barriers) = &mut scheduled_barriers + && self.create_mview_tracker.has_pending_finished_jobs() + && self + .command_ctx_queue .values() .all(|node| !node.command_ctx.kind.is_checkpoint()) - } else { - false - }; - self.completing_command = CompletingCommand::GlobalStreamingGraph { - command_ctx, - require_next_checkpoint, - join_handle, + { + scheduled_barriers.force_checkpoint_in_next_barrier(); + } + continue; + } + let commit_info = collect_commit_epoch_info( + take(&mut node.state.resps), + &node.command_ctx, + self.collect_backfill_pinned_upstream_log_epoch(), + ); + let table_ids_to_finish = node + .state + .finished_jobs + .drain() + .map(|(table_id, info)| { + finished_jobs.push(TrackingJob::New(TrackingCommand { + info, + replace_table_info: None, + })); + table_id + }) + .collect(); + task = Some(CompleteBarrierTask { + commit_info, + finished_jobs, + notifiers: node.notifiers, + command_context: Some((node.command_ctx, node.enqueue_time)), table_ids_to_finish, - }; - } else { + creating_job_epochs: vec![], + }); + break; + } + } + { + { for (table_id, job) in &mut self.creating_streaming_job_controls { let (upstream_epochs_to_notify, commit_info) = job.start_completing(); for upstream_epoch in upstream_epochs_to_notify { @@ -1433,32 +1342,58 @@ impl CheckpointControl { } } if let Some((epoch, resps, is_first_time)) = commit_info { - let tables_to_commit = job - .info - .table_fragments - .all_table_ids() - .map(TableId::new) - .collect(); - let join_handle = - tokio::spawn(self.context.clone().complete_creating_job_barrier( - epoch, - resps, - tables_to_commit, - is_first_time, - )); - self.completing_command = CompletingCommand::CreatingStreamingJob { - table_id: *table_id, + let task = task.get_or_insert_default(); + GlobalBarrierManagerContext::collect_creating_job_commit_epoch_info( + &mut task.commit_info, epoch, - join_handle, - }; - break; + 
resps, + job.info.table_fragments.all_table_ids().map(TableId::new), + is_first_time, + ); + task.creating_job_epochs.push((*table_id, epoch)); } } } } + task + } - match &mut self.completing_command { - CompletingCommand::GlobalStreamingGraph { join_handle, .. } => { + pub(super) fn next_completed_barrier<'a>( + &'a mut self, + scheduled_barriers: &mut ScheduledBarriers, + ) -> impl Future> + 'a { + // If there is no completing barrier, try to start completing the earliest barrier if + // it has been collected. + if let CompletingTask::None = &self.completing_task { + if let Some(task) = self.next_complete_barrier_task(Some(scheduled_barriers)) { + { + let command_ctx = task + .command_context + .as_ref() + .map(|(command_ctx, _)| command_ctx.clone()); + let table_ids_to_finish = task.table_ids_to_finish.clone(); + let creating_job_epochs = task.creating_job_epochs.clone(); + let join_handle = tokio::spawn(self.context.clone().complete_barrier(task)); + self.completing_task = CompletingTask::Completing { + command_ctx, + join_handle, + table_ids_to_finish, + creating_job_epochs, + }; + } + } + } + + self.next_completed_barrier_inner() + } + + async fn next_completed_barrier_inner(&mut self) -> MetaResult { + let CompletingTask::Completing { join_handle, .. } = &mut self.completing_task else { + return pending().await; + }; + + let (table_ids_to_finish, creating_job_epochs) = { + { let join_result: MetaResult<_> = try { join_handle .await @@ -1467,48 +1402,24 @@ impl CheckpointControl { // It's important to reset the completing_command after await no matter the result is err // or not, and otherwise the join handle will be polled again after ready. let next_completing_command_status = if let Err(e) = &join_result { - CompletingCommand::Err(e.clone()) + CompletingTask::Err(e.clone()) } else { - CompletingCommand::None + CompletingTask::None }; let completed_command = - replace(&mut self.completing_command, next_completing_command_status); - join_result.map(move | version_stats| { - if let Some(new_version_stats) = version_stats { - self.hummock_version_stats = new_version_stats; - } - must_match!( - completed_command, - CompletingCommand::GlobalStreamingGraph { command_ctx, table_ids_to_finish, require_next_checkpoint, .. } => { - Some(BarrierCompleteOutput { - command_ctx, - require_next_checkpoint, - table_ids_to_finish, - }) - } - ) - }) + replace(&mut self.completing_task, next_completing_command_status); + self.hummock_version_stats = join_result?; + + must_match!(completed_command, CompletingTask::Completing { + table_ids_to_finish, + creating_job_epochs, + .. + } => (table_ids_to_finish, creating_job_epochs)) } - CompletingCommand::CreatingStreamingJob { - table_id, - epoch, - join_handle, - } => { - let table_id = *table_id; - let epoch = *epoch; - let join_result: MetaResult<_> = try { - join_handle - .await - .context("failed to join completing command")?? - }; - // It's important to reset the completing_command after await no matter the result is err - // or not, and otherwise the join handle will be polled again after ready. 
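With the completion path unified, `next_complete_barrier_task` batches the global checkpoint and any finished snapshot-backfill jobs into one `CompleteBarrierTask`, and `collect_creating_job_commit_epoch_info` merges each job's metadata into the shared `CommitEpochInfo`, keyed per table by that job's own committed epoch. The sketch below illustrates that merge with a pared-down stand-in struct (only a per-table epoch map and a list of new groups); it uses a stable-Rust insert-plus-assert in place of the nightly `HashMap::try_insert` the diff relies on, and `collect_job_commit` is a hypothetical helper, not the real API.

```rust
// Simplified stand-in for merging several jobs' commits into one payload.
use std::collections::HashMap;

type TableId = u32;

#[derive(Default, Debug)]
struct CommitEpochInfo {
    /// `table_id` -> `committed_epoch`: each table may commit a different epoch.
    tables_to_commit: HashMap<TableId, u64>,
    /// One entry per newly created group of tables in this commit.
    new_compaction_groups: Vec<Vec<TableId>>,
}

/// Merge one creating job's commit into the shared payload. `is_first_time`
/// means this is the job's first commit, so its tables get a fresh group.
fn collect_job_commit(
    info: &mut CommitEpochInfo,
    epoch: u64,
    tables: impl Iterator<Item = TableId>,
    is_first_time: bool,
) {
    let tables: Vec<TableId> = tables.collect();
    for table_id in &tables {
        // Stable stand-in for the diff's `try_insert(..).expect("non duplicate")`:
        // a table must not be committed twice within one payload.
        let prev = info.tables_to_commit.insert(*table_id, epoch);
        assert!(prev.is_none(), "table {table_id} committed twice");
    }
    if is_first_time {
        info.new_compaction_groups.push(tables);
    }
}

fn main() {
    let mut commit_info = CommitEpochInfo::default();
    // Two snapshot-backfill jobs at different epochs end up in one commit call.
    collect_job_commit(&mut commit_info, 100, [1, 2].into_iter(), true);
    collect_job_commit(&mut commit_info, 98, [3].into_iter(), false);
    println!("{commit_info:?}");
}
```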
- let next_completing_command_status = if let Err(e) = &join_result { - CompletingCommand::Err(e.clone()) - } else { - CompletingCommand::None - }; - self.completing_command = next_completing_command_status; + }; + + { + for (table_id, epoch) in creating_job_epochs { if let Some((upstream_epoch, is_finished)) = self .creating_streaming_job_controls .get_mut(&table_id) @@ -1538,14 +1449,16 @@ impl CheckpointControl { .get_mut(&upstream_epoch) .expect("should exist") .state - .finished_table_ids + .finished_jobs .insert(table_id, creating_streaming_job.info) .is_none()); } } - join_result.map(|_| None) } - CompletingCommand::None | CompletingCommand::Err(_) => pending().await, + + Ok(BarrierCompleteOutput { + table_ids_to_finish, + }) } } } @@ -1682,28 +1595,26 @@ fn collect_resp_info( fn collect_commit_epoch_info( resps: Vec, command_ctx: &CommandContext, - epochs: &Vec, backfill_pinned_log_epoch: HashMap)>, - tables_to_commit: HashSet, ) -> CommitEpochInfo { let (sst_to_context, synced_ssts, new_table_watermarks, old_value_ssts) = collect_resp_info(resps); - let new_table_fragment_info = if let Command::CreateStreamingJob { info, job_type } = + let new_table_fragment_infos = if let Command::CreateStreamingJob { info, job_type } = &command_ctx.command && !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)) { let table_fragments = &info.table_fragments; - NewTableFragmentInfo::Normal { + vec![NewTableFragmentInfo::Normal { mv_table_id: table_fragments.mv_table_id().map(TableId::new), internal_table_ids: table_fragments .internal_table_ids() .into_iter() .map(TableId::new) .collect(), - } + }] } else { - NewTableFragmentInfo::None + vec![] }; let mut mv_log_store_truncate_epoch = HashMap::new(); @@ -1739,19 +1650,23 @@ fn collect_commit_epoch_info( let table_new_change_log = build_table_change_log_delta( old_value_ssts.into_iter(), synced_ssts.iter().map(|sst| &sst.sst_info), - epochs, + must_match!(&command_ctx.kind, BarrierKind::Checkpoint(epochs) => epochs), mv_log_store_truncate_epoch.into_iter(), ); let epoch = command_ctx.prev_epoch.value().0; + let tables_to_commit = command_ctx + .table_ids_to_commit + .iter() + .map(|table_id| (*table_id, epoch)) + .collect(); CommitEpochInfo { sstables: synced_ssts, new_table_watermarks, sst_to_context, - new_table_fragment_info, + new_table_fragment_infos, change_log_delta: table_new_change_log, - committed_epoch: epoch, tables_to_commit, } } diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index cb353d32e0890..da97a29c06f8c 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -13,6 +13,7 @@ // limitations under the License. 
use std::collections::{BTreeMap, HashMap, HashSet}; +use std::sync::Arc; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::change_log::ChangeLogDelta; @@ -28,6 +29,7 @@ use risingwave_hummock_sdk::{ CompactionGroupId, HummockContextId, HummockSstableObjectId, LocalSstableInfo, }; use risingwave_pb::hummock::compact_task::{self}; +use risingwave_pb::hummock::CompactionConfig; use sea_orm::TransactionTrait; use crate::hummock::error::{Error, Result}; @@ -47,7 +49,6 @@ use crate::hummock::{ }; pub enum NewTableFragmentInfo { - None, Normal { mv_table_id: Option, internal_table_ids: Vec, @@ -57,14 +58,15 @@ pub enum NewTableFragmentInfo { }, } +#[derive(Default)] pub struct CommitEpochInfo { pub sstables: Vec, pub new_table_watermarks: HashMap, pub sst_to_context: HashMap, - pub new_table_fragment_info: NewTableFragmentInfo, + pub new_table_fragment_infos: Vec, pub change_log_delta: HashMap, - pub committed_epoch: u64, - pub tables_to_commit: HashSet, + /// `table_id` -> `committed_epoch` + pub tables_to_commit: HashMap, } impl HummockManager { @@ -75,9 +77,8 @@ impl HummockManager { mut sstables, new_table_watermarks, sst_to_context, - new_table_fragment_info, + new_table_fragment_infos, change_log_delta, - committed_epoch, tables_to_commit, } = commit_info; let mut versioning_guard = self.versioning.write().await; @@ -91,7 +92,6 @@ impl HummockManager { let versioning: &mut Versioning = &mut versioning_guard; self.commit_epoch_sanity_check( - committed_epoch, &tables_to_commit, &sstables, &sst_to_context, @@ -124,15 +124,18 @@ impl HummockManager { let state_table_info = &version.latest_version().state_table_info; let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id(); + let mut new_table_ids = HashMap::new(); + let mut new_compaction_groups = HashMap::new(); + let mut compaction_group_manager_txn = None; + let mut compaction_group_config: Option> = None; // Add new table - let (new_table_ids, new_compaction_group, compaction_group_manager_txn) = + for new_table_fragment_info in new_table_fragment_infos { match new_table_fragment_info { NewTableFragmentInfo::Normal { mv_table_id, internal_table_ids, } => { - let mut new_table_ids = HashMap::new(); on_handle_add_new_table( state_table_info, &internal_table_ids, @@ -148,24 +151,40 @@ impl HummockManager { &mut table_compaction_group_mapping, &mut new_table_ids, )?; - (new_table_ids, None, None) } NewTableFragmentInfo::NewCompactionGroup { table_ids } => { - let compaction_group_manager_guard = - self.compaction_group_manager.write().await; - let compaction_group_config = - compaction_group_manager_guard.default_compaction_config(); - let mut compaction_group_manager = - CompactionGroupManager::start_owned_compaction_groups_txn( - compaction_group_manager_guard, - ); - let mut new_table_ids = HashMap::new(); + let (compaction_group_manager, compaction_group_config) = + if let Some(compaction_group_manager) = &mut compaction_group_manager_txn { + ( + compaction_group_manager, + (*compaction_group_config + .as_ref() + .expect("must be set with compaction_group_manager_txn")) + .clone(), + ) + } else { + let compaction_group_manager_guard = + self.compaction_group_manager.write().await; + let new_compaction_group_config = + compaction_group_manager_guard.default_compaction_config(); + compaction_group_config = Some(new_compaction_group_config.clone()); + ( + compaction_group_manager_txn.insert( + CompactionGroupManager::start_owned_compaction_groups_txn( + compaction_group_manager_guard, + ), 
+ ), + new_compaction_group_config, + ) + }; let new_compaction_group_id = next_compaction_group_id(&self.env).await?; + new_compaction_groups + .insert(new_compaction_group_id, compaction_group_config.clone()); compaction_group_manager.insert( new_compaction_group_id, CompactionGroup { group_id: new_compaction_group_id, - compaction_config: compaction_group_config.clone(), + compaction_config: compaction_group_config, }, ); @@ -176,14 +195,9 @@ impl HummockManager { &mut table_compaction_group_mapping, &mut new_table_ids, )?; - ( - new_table_ids, - Some((new_compaction_group_id, (*compaction_group_config).clone())), - Some(compaction_group_manager), - ) } - NewTableFragmentInfo::None => (HashMap::new(), None, None), - }; + } + } let commit_sstables = self .correct_commit_ssts(sstables, &table_compaction_group_mapping) @@ -192,9 +206,8 @@ impl HummockManager { let modified_compaction_groups: Vec<_> = commit_sstables.keys().cloned().collect(); let time_travel_delta = version.pre_commit_epoch( - committed_epoch, &tables_to_commit, - new_compaction_group, + new_compaction_groups, commit_sstables, &new_table_ids, new_table_watermarks, @@ -251,9 +264,14 @@ impl HummockManager { .values() .map(|g| (g.group_id, g.parent_group_id)) .collect(); - let time_travel_tables_to_commit = table_compaction_group_mapping - .iter() - .filter(|(table_id, _)| tables_to_commit.contains(table_id)); + let time_travel_tables_to_commit = + table_compaction_group_mapping + .iter() + .filter_map(|(table_id, cg_id)| { + tables_to_commit + .get(table_id) + .map(|committed_epoch| (table_id, cg_id, *committed_epoch)) + }); let mut txn = self.env.meta_store_ref().conn.begin().await?; let version_snapshot_sst_ids = self .write_time_travel_metadata( @@ -263,7 +281,6 @@ impl HummockManager { &group_parents, &versioning.last_time_travel_snapshot_sst_ids, time_travel_tables_to_commit, - committed_epoch, ) .await?; commit_multi_var_with_provided_txn!( diff --git a/src/meta/src/hummock/manager/context.rs b/src/meta/src/hummock/manager/context.rs index b76bd47c49b74..0b66b1dded8d0 100644 --- a/src/meta/src/hummock/manager/context.rs +++ b/src/meta/src/hummock/manager/context.rs @@ -19,7 +19,7 @@ use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::{ - HummockContextId, HummockEpoch, HummockSstableObjectId, HummockVersionId, LocalSstableInfo, + HummockContextId, HummockSstableObjectId, HummockVersionId, LocalSstableInfo, INVALID_VERSION_ID, }; use risingwave_pb::hummock::{HummockPinnedVersion, ValidationTask}; @@ -189,8 +189,7 @@ impl HummockManager { pub async fn commit_epoch_sanity_check( &self, - committed_epoch: HummockEpoch, - tables_to_commit: &HashSet, + tables_to_commit: &HashMap, sstables: &[LocalSstableInfo], sst_to_context: &HashMap, current_version: &HummockVersion, @@ -216,9 +215,9 @@ impl HummockManager { } // sanity check on monotonically increasing table committed epoch - for table_id in tables_to_commit { + for (table_id, committed_epoch) in tables_to_commit { if let Some(info) = current_version.state_table_info.info().get(table_id) { - if committed_epoch <= info.committed_epoch { + if *committed_epoch <= info.committed_epoch { return Err(anyhow::anyhow!( "table {} Epoch {} <= committed_epoch {}", table_id, @@ -265,7 +264,6 @@ impl HummockManager { .send_event(ResponseEvent::ValidationTask(ValidationTask { sst_infos: sst_infos.into_iter().map(|sst| sst.into()).collect_vec(), sst_id_to_worker_id: sst_to_context.clone(), - 
epoch: committed_epoch, })) .is_err() { diff --git a/src/meta/src/hummock/manager/time_travel.rs b/src/meta/src/hummock/manager/time_travel.rs index c8df0c93b85ec..4b15309abfb98 100644 --- a/src/meta/src/hummock/manager/time_travel.rs +++ b/src/meta/src/hummock/manager/time_travel.rs @@ -359,8 +359,7 @@ impl HummockManager { delta: HummockVersionDelta, group_parents: &HashMap, skip_sst_ids: &HashSet, - tables_to_commit: impl Iterator, - committed_epoch: u64, + tables_to_commit: impl Iterator, ) -> Result>> { let select_groups = group_parents .iter() @@ -397,7 +396,7 @@ impl HummockManager { Ok(count) } - for (table_id, cg_id) in tables_to_commit { + for (table_id, cg_id, committed_epoch) in tables_to_commit { if !select_groups.contains(cg_id) { continue; } diff --git a/src/meta/src/hummock/manager/transaction.rs b/src/meta/src/hummock/manager/transaction.rs index 04d5e237a11df..a8d3645d29037 100644 --- a/src/meta/src/hummock/manager/transaction.rs +++ b/src/meta/src/hummock/manager/transaction.rs @@ -12,8 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap}; use std::ops::{Deref, DerefMut}; +use std::sync::Arc; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::change_log::ChangeLogDelta; @@ -23,9 +24,7 @@ use risingwave_hummock_sdk::table_watermark::TableWatermarks; use risingwave_hummock_sdk::version::{ GroupDelta, HummockVersion, HummockVersionDelta, IntraLevelDelta, }; -use risingwave_hummock_sdk::{ - CompactionGroupId, FrontendHummockVersionDelta, HummockEpoch, HummockVersionId, -}; +use risingwave_hummock_sdk::{CompactionGroupId, FrontendHummockVersionDelta, HummockVersionId}; use risingwave_pb::hummock::{ CompactionConfig, CompatibilityVersion, GroupConstruct, HummockVersionDeltas, HummockVersionStats, StateTableInfoDelta, @@ -113,9 +112,8 @@ impl<'a> HummockVersionTransaction<'a> { /// Returns a duplicate delta, used by time travel. 
pub(super) fn pre_commit_epoch( &mut self, - committed_epoch: HummockEpoch, - tables_to_commit: &HashSet, - new_compaction_group: Option<(CompactionGroupId, CompactionConfig)>, + tables_to_commit: &HashMap, + new_compaction_groups: HashMap>, commit_sstables: BTreeMap>, new_table_ids: &HashMap, new_table_watermarks: HashMap, @@ -125,7 +123,7 @@ impl<'a> HummockVersionTransaction<'a> { new_version_delta.new_table_watermarks = new_table_watermarks; new_version_delta.change_log_delta = change_log_delta; - if let Some((compaction_group_id, compaction_group_config)) = new_compaction_group { + for (compaction_group_id, compaction_group_config) in new_compaction_groups { { let group_deltas = &mut new_version_delta .group_deltas @@ -135,7 +133,7 @@ impl<'a> HummockVersionTransaction<'a> { #[expect(deprecated)] group_deltas.push(GroupDelta::GroupConstruct(GroupConstruct { - group_config: Some(compaction_group_config.clone()), + group_config: Some((*compaction_group_config).clone()), group_id: compaction_group_id, parent_group_id: StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId, @@ -173,6 +171,7 @@ impl<'a> HummockVersionTransaction<'a> { "newly added table exists previously: {:?}", table_id ); + let committed_epoch = *tables_to_commit.get(table_id).expect("newly added table must exist in tables_to_commit"); delta.state_table_info_delta.insert( *table_id, StateTableInfoDelta { @@ -182,7 +181,7 @@ impl<'a> HummockVersionTransaction<'a> { ); } - for table_id in tables_to_commit { + for (table_id, committed_epoch) in tables_to_commit { if new_table_ids.contains_key(table_id) { continue; } @@ -194,7 +193,7 @@ impl<'a> HummockVersionTransaction<'a> { .insert( *table_id, StateTableInfoDelta { - committed_epoch, + committed_epoch: *committed_epoch, compaction_group_id: info.compaction_group_id, } ) diff --git a/src/meta/src/hummock/mock_hummock_meta_client.rs b/src/meta/src/hummock/mock_hummock_meta_client.rs index 54b26fa20a665..805db163587a0 100644 --- a/src/meta/src/hummock/mock_hummock_meta_client.rs +++ b/src/meta/src/hummock/mock_hummock_meta_client.rs @@ -171,20 +171,20 @@ impl HummockMetaClient for MockHummockMetaClient { .chain(table_ids.iter().cloned()) .collect::>(); - let new_table_fragment_info = if commit_table_ids + let new_table_fragment_infos = if commit_table_ids .iter() .all(|table_id| table_ids.contains(table_id)) { - NewTableFragmentInfo::None + vec![] } else { - NewTableFragmentInfo::Normal { + vec![NewTableFragmentInfo::Normal { mv_table_id: None, internal_table_ids: commit_table_ids .iter() .cloned() .map(TableId::from) .collect_vec(), - } + }] }; let sst_to_context = sync_result @@ -215,13 +215,12 @@ impl HummockMetaClient for MockHummockMetaClient { sstables: sync_result.uncommitted_ssts, new_table_watermarks: new_table_watermark, sst_to_context, - new_table_fragment_info, + new_table_fragment_infos, change_log_delta: table_change_log, - committed_epoch: epoch, tables_to_commit: commit_table_ids .iter() .cloned() - .map(TableId::from) + .map(|table_id| (TableId::new(table_id), epoch)) .collect(), }) .await diff --git a/src/meta/src/lib.rs b/src/meta/src/lib.rs index 88dc47d1f30e8..69d9dc21a075a 100644 --- a/src/meta/src/lib.rs +++ b/src/meta/src/lib.rs @@ -31,6 +31,7 @@ #![feature(const_option)] #![feature(anonymous_lifetime_in_impl_trait)] #![feature(duration_millis_float)] +#![feature(option_get_or_insert_default)] pub mod backup_restore; pub mod barrier; diff --git a/src/storage/hummock_sdk/src/compact_task.rs b/src/storage/hummock_sdk/src/compact_task.rs index 
162895a38ac91..82a9b1904d5af 100644 --- a/src/storage/hummock_sdk/src/compact_task.rs +++ b/src/storage/hummock_sdk/src/compact_task.rs @@ -326,7 +326,6 @@ impl From<&CompactTask> for PbCompactTask { pub struct ValidationTask { pub sst_infos: Vec, pub sst_id_to_worker_id: HashMap, - pub epoch: u64, } impl From for ValidationTask { @@ -338,7 +337,6 @@ impl From for ValidationTask { .map(SstableInfo::from) .collect_vec(), sst_id_to_worker_id: pb_validation_task.sst_id_to_worker_id.clone(), - epoch: pb_validation_task.epoch, } } } @@ -352,7 +350,6 @@ impl From for PbValidationTask { .map(|sst| sst.into()) .collect_vec(), sst_id_to_worker_id: validation_task.sst_id_to_worker_id.clone(), - epoch: validation_task.epoch, } } } diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index 7c70721f04d82..4e6ab26a539c6 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -2585,9 +2585,12 @@ async fn test_commit_multi_epoch() { let initial_epoch = INVALID_EPOCH; let commit_epoch = - |epoch, sst: SstableInfo, new_table_fragment_info, tables_to_commit: &[TableId]| { + |epoch, sst: SstableInfo, new_table_fragment_infos, tables_to_commit: &[TableId]| { let manager = &test_env.manager; - let tables_to_commit = tables_to_commit.iter().cloned().collect(); + let tables_to_commit = tables_to_commit + .iter() + .map(|table_id| (*table_id, epoch)) + .collect(); async move { manager .commit_epoch(CommitEpochInfo { @@ -2610,9 +2613,8 @@ async fn test_commit_multi_epoch() { sst_info: sst, created_at: u64::MAX, }], - new_table_fragment_info, + new_table_fragment_infos, change_log_delta: Default::default(), - committed_epoch: epoch, tables_to_commit, }) .await @@ -2633,10 +2635,10 @@ async fn test_commit_multi_epoch() { commit_epoch( epoch1, sst1_epoch1.clone(), - NewTableFragmentInfo::Normal { + vec![NewTableFragmentInfo::Normal { mv_table_id: None, internal_table_ids: vec![existing_table_id], - }, + }], &[existing_table_id], ) .await; @@ -2678,13 +2680,7 @@ async fn test_commit_multi_epoch() { let epoch2 = epoch1.next_epoch(); - commit_epoch( - epoch2, - sst1_epoch2.clone(), - NewTableFragmentInfo::None, - &[existing_table_id], - ) - .await; + commit_epoch(epoch2, sst1_epoch2.clone(), vec![], &[existing_table_id]).await; { let version = test_env.manager.get_current_version().await; @@ -2727,9 +2723,9 @@ async fn test_commit_multi_epoch() { commit_epoch( epoch1, sst2_epoch1.clone(), - NewTableFragmentInfo::NewCompactionGroup { + vec![NewTableFragmentInfo::NewCompactionGroup { table_ids: HashSet::from_iter([new_table_id]), - }, + }], &[new_table_id], ) .await; @@ -2764,13 +2760,7 @@ async fn test_commit_multi_epoch() { ..Default::default() }; - commit_epoch( - epoch2, - sst2_epoch2.clone(), - NewTableFragmentInfo::None, - &[new_table_id], - ) - .await; + commit_epoch(epoch2, sst2_epoch2.clone(), vec![], &[new_table_id]).await; { let version = test_env.manager.get_current_version().await; @@ -2804,7 +2794,7 @@ async fn test_commit_multi_epoch() { commit_epoch( epoch3, sst_epoch3.clone(), - NewTableFragmentInfo::None, + vec![], &[existing_table_id, new_table_id], ) .await; diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index d3e552a76213f..6734235225654 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -12,7 +12,7 @@ // See the License for 
the specific language governing permissions and // limitations under the License. -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::ops::Bound; use std::ops::Bound::{Excluded, Included, Unbounded}; use std::sync::Arc; @@ -29,7 +29,7 @@ use risingwave_common::util::epoch::{test_epoch, EpochExt, INVALID_EPOCH, MAX_EP use risingwave_hummock_sdk::key::{prefixed_range_with_vnode, TableKeyRange}; use risingwave_hummock_sdk::{HummockReadEpoch, LocalSstableInfo, SyncResult}; use risingwave_meta::hummock::test_utils::setup_compute_env; -use risingwave_meta::hummock::{CommitEpochInfo, NewTableFragmentInfo}; +use risingwave_meta::hummock::CommitEpochInfo; use risingwave_rpc_client::HummockMetaClient; use risingwave_storage::hummock::iterator::change_log::test_utils::{ apply_test_log_data, gen_test_data, @@ -1384,10 +1384,9 @@ async fn test_replicated_local_hummock_storage() { sstables: vec![], new_table_watermarks: Default::default(), sst_to_context: Default::default(), - new_table_fragment_info: NewTableFragmentInfo::None, + new_table_fragment_infos: vec![], change_log_delta: Default::default(), - committed_epoch: epoch0, - tables_to_commit: HashSet::from_iter([TEST_TABLE_ID]), + tables_to_commit: HashMap::from_iter([(TEST_TABLE_ID, epoch0)]), }) .await .unwrap(); diff --git a/src/storage/src/hummock/validator.rs b/src/storage/src/hummock/validator.rs index cc95b7089b664..2c0efbb3ca934 100644 --- a/src/storage/src/hummock/validator.rs +++ b/src/storage/src/hummock/validator.rs @@ -38,12 +38,7 @@ pub async fn validate_ssts(task: ValidationTask, sstable_store: SstableStoreRef) .sst_id_to_worker_id .get(&sst.object_id) .expect("valid worker_id"); - tracing::debug!( - "Validating SST {} from worker {}, epoch {}", - sst.object_id, - worker_id, - task.epoch - ); + tracing::debug!("Validating SST {} from worker {}", sst.object_id, worker_id,); let holder = match sstable_store.sstable(&sst, unused.borrow_mut()).await { Ok(holder) => holder, Err(_err) => { @@ -100,12 +95,7 @@ pub async fn validate_ssts(task: ValidationTask, sstable_store: SstableStoreRef) break; } } - tracing::debug!( - "Validated {} keys for SST {}, epoch {}", - key_counts, - sst.object_id, - task.epoch - ); + tracing::debug!("Validated {} keys for SST {}", key_counts, sst.object_id,); iter.collect_local_statistic(&mut unused); unused.ignore(); }
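Across the storage side, the single `committed_epoch` argument is replaced everywhere by the per-table map `tables_to_commit: HashMap<TableId, u64>`, which is also why `ValidationTask` and the validator logging above drop their `epoch` field. The sketch below approximates the per-table monotonicity check that `commit_epoch_sanity_check` now performs; it uses plain `HashMap`s and `String` errors as stand-ins, so the signature is illustrative rather than the actual meta-service API.

```rust
// Per-table monotonicity check: every table carries its own target epoch, and
// each must be strictly greater than the epoch already recorded for it.
use std::collections::HashMap;

type TableId = u32;

fn commit_epoch_sanity_check(
    tables_to_commit: &HashMap<TableId, u64>,
    state_table_committed: &HashMap<TableId, u64>,
) -> Result<(), String> {
    for (table_id, committed_epoch) in tables_to_commit {
        if let Some(prev) = state_table_committed.get(table_id) {
            if *committed_epoch <= *prev {
                return Err(format!(
                    "table {table_id}: epoch {committed_epoch} <= committed_epoch {prev}"
                ));
            }
        }
    }
    Ok(())
}

fn main() {
    let current = HashMap::from([(1, 100u64), (2, 90)]);
    // Table 1 advances to 110 while table 2 (e.g. a backfilling job) stays at 95.
    let ok = HashMap::from([(1u32, 110u64), (2, 95)]);
    assert!(commit_epoch_sanity_check(&ok, &current).is_ok());
    // Committing table 2 at 90 again must be rejected.
    let bad = HashMap::from([(2u32, 90u64)]);
    assert!(commit_epoch_sanity_check(&bad, &current).is_err());
    println!("sanity checks behaved as expected");
}
```

Letting each table carry its own epoch is what allows a snapshot-backfill job to commit at an older epoch than the tables of the main streaming graph within the same `commit_epoch` call.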