diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index 93e5d5be1dfb8..5875feabf6db0 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -44,9 +44,9 @@ use crate::hummock::metrics_utils::{ }; use crate::hummock::model::CompactionGroup; use crate::hummock::sequence::{next_compaction_group_id, next_sstable_object_id}; +use crate::hummock::time_travel::should_mark_next_time_travel_version_snapshot; use crate::hummock::{ - commit_multi_var, commit_multi_var_with_provided_txn, start_measure_real_process_timer, - HummockManager, + commit_multi_var_with_provided_txn, start_measure_real_process_timer, HummockManager, }; pub enum NewTableFragmentInfo { @@ -109,13 +109,6 @@ impl HummockManager { ); } - let previous_time_travel_toggle_check = versioning.time_travel_toggle_check; - versioning.time_travel_toggle_check = self.time_travel_enabled().await; - if !previous_time_travel_toggle_check && versioning.time_travel_toggle_check { - // Take a snapshot for the first commit epoch after enabling time travel. - versioning.mark_next_time_travel_version_snapshot(); - } - let mut version = HummockVersionTransaction::new( &mut versioning.current_version, &mut versioning.hummock_version_deltas, @@ -214,6 +207,10 @@ impl HummockManager { new_table_watermarks, change_log_delta, ); + if should_mark_next_time_travel_version_snapshot(&time_travel_delta) { + // Unable to invoke mark_next_time_travel_version_snapshot because versioning is already mutable borrowed. + versioning.time_travel_snapshot_interval_counter = u64::MAX; + } // Apply stats changes. let mut version_stats = HummockVersionStatsTransaction::new( @@ -247,59 +244,50 @@ impl HummockManager { ); table_metrics.inc_write_throughput(stats_value as u64); } - if versioning.time_travel_toggle_check { - let mut time_travel_version = None; - if versioning.time_travel_snapshot_interval_counter - >= self.env.opts.hummock_time_travel_snapshot_interval - { - versioning.time_travel_snapshot_interval_counter = 0; - time_travel_version = Some(version.latest_version()); - } else { - versioning.time_travel_snapshot_interval_counter = versioning - .time_travel_snapshot_interval_counter - .saturating_add(1); - } - let group_parents = version - .latest_version() - .levels - .values() - .map(|g| (g.group_id, g.parent_group_id)) - .collect(); - let time_travel_tables_to_commit = - table_compaction_group_mapping - .iter() - .filter_map(|(table_id, cg_id)| { - tables_to_commit - .get(table_id) - .map(|committed_epoch| (table_id, cg_id, *committed_epoch)) - }); - let mut txn = self.env.meta_store_ref().conn.begin().await?; - let version_snapshot_sst_ids = self - .write_time_travel_metadata( - &txn, - time_travel_version, - time_travel_delta, - &group_parents, - &versioning.last_time_travel_snapshot_sst_ids, - time_travel_tables_to_commit, - ) - .await?; - commit_multi_var_with_provided_txn!( - txn, - version, - version_stats, - compaction_group_manager_txn - )?; - if let Some(version_snapshot_sst_ids) = version_snapshot_sst_ids { - versioning.last_time_travel_snapshot_sst_ids = version_snapshot_sst_ids; - } + let mut time_travel_version = None; + if versioning.time_travel_snapshot_interval_counter + >= self.env.opts.hummock_time_travel_snapshot_interval + { + versioning.time_travel_snapshot_interval_counter = 0; + time_travel_version = Some(version.latest_version()); } else { - commit_multi_var!( - self.meta_store_ref(), - version, - version_stats, - compaction_group_manager_txn - )?; + versioning.time_travel_snapshot_interval_counter = versioning + .time_travel_snapshot_interval_counter + .saturating_add(1); + } + let group_parents = version + .latest_version() + .levels + .values() + .map(|g| (g.group_id, g.parent_group_id)) + .collect(); + let time_travel_tables_to_commit = + table_compaction_group_mapping + .iter() + .filter_map(|(table_id, cg_id)| { + tables_to_commit + .get(table_id) + .map(|committed_epoch| (table_id, cg_id, *committed_epoch)) + }); + let mut txn = self.env.meta_store_ref().conn.begin().await?; + let version_snapshot_sst_ids = self + .write_time_travel_metadata( + &txn, + time_travel_version, + time_travel_delta, + &group_parents, + &versioning.last_time_travel_snapshot_sst_ids, + time_travel_tables_to_commit, + ) + .await?; + commit_multi_var_with_provided_txn!( + txn, + version, + version_stats, + compaction_group_manager_txn + )?; + if let Some(version_snapshot_sst_ids) = version_snapshot_sst_ids { + versioning.last_time_travel_snapshot_sst_ids = version_snapshot_sst_ids; } for compaction_group_id in &modified_compaction_groups { diff --git a/src/meta/src/hummock/manager/time_travel.rs b/src/meta/src/hummock/manager/time_travel.rs index fab79fe01f9a0..bd26196e619d4 100644 --- a/src/meta/src/hummock/manager/time_travel.rs +++ b/src/meta/src/hummock/manager/time_travel.rs @@ -17,14 +17,13 @@ use std::collections::{HashMap, HashSet, VecDeque}; use anyhow::anyhow; use itertools::Itertools; use risingwave_common::catalog::TableId; -use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_common::util::epoch::Epoch; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::sstable_info::SstableInfo; use risingwave_hummock_sdk::time_travel::{ refill_version, IncompleteHummockVersion, IncompleteHummockVersionDelta, }; -use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta}; +use risingwave_hummock_sdk::version::{GroupDeltaCommon, HummockVersion, HummockVersionDelta}; use risingwave_hummock_sdk::{ CompactionGroupId, HummockEpoch, HummockSstableId, HummockSstableObjectId, }; @@ -45,14 +44,6 @@ use crate::hummock::HummockManager; /// Time travel. impl HummockManager { - pub(crate) async fn time_travel_enabled(&self) -> bool { - self.env - .system_params_reader() - .await - .time_travel_retention_ms() - > 0 - } - pub(crate) async fn init_time_travel_state(&self) -> Result<()> { let sql_store = self.env.meta_store_ref(); let mut guard = self.versioning.write().await; @@ -473,6 +464,11 @@ fn replay_archive( let mut last_version = HummockVersion::from_persisted_protobuf(&version); for d in deltas { let d = HummockVersionDelta::from_persisted_protobuf(&d); + debug_assert!( + !should_mark_next_time_travel_version_snapshot(&d), + "unexpect time travel delta {:?}", + d + ); // Need to work around the assertion in `apply_version_delta`. // Because compaction deltas are not included in time travel archive. while last_version.id < d.prev_id { @@ -506,3 +502,13 @@ fn should_ignore_group(root_group_id: CompactionGroupId) -> bool { pub fn require_sql_meta_store_err() -> Error { Error::TimeTravel(anyhow!("require SQL meta store")) } + +/// Time travel delta replay only expect `NewL0SubLevel`. In all other cases, a new version snapshot should be created. +pub fn should_mark_next_time_travel_version_snapshot(delta: &HummockVersionDelta) -> bool { + delta.group_deltas.iter().any(|(_, deltas)| { + deltas.group_deltas.iter().any(|d| match d { + GroupDeltaCommon::NewL0SubLevel(_) => false, + _ => true, + }) + }) +} diff --git a/src/meta/src/hummock/manager/versioning.rs b/src/meta/src/hummock/manager/versioning.rs index 7fa93f72d02b8..fd516a2ba3e1e 100644 --- a/src/meta/src/hummock/manager/versioning.rs +++ b/src/meta/src/hummock/manager/versioning.rs @@ -55,8 +55,6 @@ pub struct Versioning { pub time_travel_snapshot_interval_counter: u64, /// Used to avoid the attempts to rewrite the same SST to meta store pub last_time_travel_snapshot_sst_ids: HashSet, - /// Whether time travel is enabled during last commit epoch. - pub time_travel_toggle_check: bool, // Persistent states below pub hummock_version_deltas: BTreeMap,