From 892d86fa1bc164e9431abea7b23e3fd99b1929a0 Mon Sep 17 00:00:00 2001 From: zwang28 <70626450+zwang28@users.noreply.github.com> Date: Wed, 31 Jul 2024 10:13:23 +0800 Subject: [PATCH] refactor(meta): add system parameter for time travel (#17866) --- e2e_test/batch/catalog/pg_settings.slt.part | 1 + proto/meta.proto | 1 + src/common/src/config.rs | 16 ------------ src/common/src/system_param/mod.rs | 2 ++ src/common/src/system_param/reader.rs | 6 +++++ src/config/ci-time-travel.toml | 7 +++--- src/config/docs.md | 3 +-- src/config/example.toml | 3 +-- src/meta/node/src/lib.rs | 2 -- src/meta/src/hummock/manager/checkpoint.rs | 4 +-- src/meta/src/hummock/manager/commit_epoch.rs | 13 +++++++--- src/meta/src/hummock/manager/mod.rs | 4 +-- src/meta/src/hummock/manager/time_travel.rs | 17 +++++++++---- src/meta/src/hummock/manager/versioning.rs | 2 ++ src/meta/src/hummock/vacuum.rs | 26 ++++++++++++-------- src/meta/src/manager/env.rs | 4 --- src/meta/src/manager/system_param/mod.rs | 17 ++++++++++++- 17 files changed, 76 insertions(+), 52 deletions(-) diff --git a/e2e_test/batch/catalog/pg_settings.slt.part b/e2e_test/batch/catalog/pg_settings.slt.part index 5744cd0362cbf..081a30cd90cb1 100644 --- a/e2e_test/batch/catalog/pg_settings.slt.part +++ b/e2e_test/batch/catalog/pg_settings.slt.part @@ -16,6 +16,7 @@ postmaster enable_tracing postmaster license_key postmaster max_concurrent_creating_streaming_jobs postmaster pause_on_next_bootstrap +postmaster time_travel_retention_ms user application_name user backfill_rate_limit user background_ddl diff --git a/proto/meta.proto b/proto/meta.proto index 2023ad432a438..2c8449421994b 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -630,6 +630,7 @@ message SystemParams { optional bool enable_tracing = 15; optional bool use_new_object_prefix_strategy = 16; optional string license_key = 17; + optional uint64 time_travel_retention_ms = 18; } message GetSystemParamsRequest {} diff --git a/src/common/src/config.rs b/src/common/src/config.rs index e69add1218cb6..eeb393edb4ab9 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -223,14 +223,6 @@ pub struct MetaConfig { #[serde(default = "default::meta::enable_hummock_data_archive")] pub enable_hummock_data_archive: bool, - /// If enabled, time travel query is available. - #[serde(default = "default::meta::enable_hummock_time_travel")] - pub enable_hummock_time_travel: bool, - - /// The data retention period for time travel. - #[serde(default = "default::meta::hummock_time_travel_retention_ms")] - pub hummock_time_travel_retention_ms: u64, - /// The interval at which a Hummock version snapshot is taken for time travel. /// /// Larger value indicates less storage overhead but worse query performance. @@ -1346,14 +1338,6 @@ pub mod default { false } - pub fn enable_hummock_time_travel() -> bool { - false - } - - pub fn hummock_time_travel_retention_ms() -> u64 { - 24 * 3600 * 1000 - } - pub fn hummock_time_travel_snapshot_interval() -> u64 { 100 } diff --git a/src/common/src/system_param/mod.rs b/src/common/src/system_param/mod.rs index aa6f207ead1a9..f4f9e931fa1a0 100644 --- a/src/common/src/system_param/mod.rs +++ b/src/common/src/system_param/mod.rs @@ -99,6 +99,7 @@ macro_rules! for_all_params { { enable_tracing, bool, Some(false), true, "Whether to enable distributed tracing.", }, { use_new_object_prefix_strategy, bool, None, false, "Whether to split object prefix.", }, { license_key, String, Some(default_license_key()), true, "The license key to activate enterprise features.", }, + { time_travel_retention_ms, u64, Some(0_u64), true, "The data retention period for time travel, where 0 indicates that it's disabled.", }, } }; } @@ -458,6 +459,7 @@ mod tests { (ENABLE_TRACING_KEY, "true"), (USE_NEW_OBJECT_PREFIX_STRATEGY_KEY, "false"), (LICENSE_KEY_KEY, "foo"), + (TIME_TRAVEL_RETENTION_MS_KEY, "0"), ("a_deprecated_param", "foo"), ]; diff --git a/src/common/src/system_param/reader.rs b/src/common/src/system_param/reader.rs index 2ef13ff3f5509..505e36693f1d5 100644 --- a/src/common/src/system_param/reader.rs +++ b/src/common/src/system_param/reader.rs @@ -172,4 +172,10 @@ where fn license_key(&self) -> &str { self.inner().license_key.as_deref().unwrap_or_default() } + + fn time_travel_retention_ms(&self) -> u64 { + self.inner() + .time_travel_retention_ms + .unwrap_or_else(default::time_travel_retention_ms) + } } diff --git a/src/config/ci-time-travel.toml b/src/config/ci-time-travel.toml index 3f17e2aaf371f..a5aadc196f9b5 100644 --- a/src/config/ci-time-travel.toml +++ b/src/config/ci-time-travel.toml @@ -1,5 +1,6 @@ [meta] -enable_hummock_time_travel = true -hummock_time_travel_retention_ms = 300000 hummock_time_travel_snapshot_interval = 30 -min_sst_retention_time_sec = 1 \ No newline at end of file +min_sst_retention_time_sec = 1 + +[system] +time_travel_retention_ms = 300000 diff --git a/src/config/docs.md b/src/config/docs.md index 230d43648e3eb..47905d71e5e0c 100644 --- a/src/config/docs.md +++ b/src/config/docs.md @@ -36,11 +36,9 @@ This page is automatically generated by `./risedev generate-example-config` | enable_compaction_deterministic | Whether to enable deterministic compaction scheduling, which will disable all auto scheduling of compaction tasks. Should only be used in e2e tests. | false | | enable_dropped_column_reclaim | Whether compactor should rewrite row to remove dropped column. | false | | enable_hummock_data_archive | If enabled, `SSTable` object file and version delta will be retained. `SSTable` object file need to be deleted via full GC. version delta need to be manually deleted. | false | -| enable_hummock_time_travel | If enabled, time travel query is available. | false | | event_log_channel_max_size | Keeps the latest N events per channel. | 10 | | event_log_enabled | | true | | full_gc_interval_sec | Interval of automatic hummock full GC. | 86400 | -| hummock_time_travel_retention_ms | The data retention period for time travel. | 86400000 | | hummock_time_travel_snapshot_interval | The interval at which a Hummock version snapshot is taken for time travel. Larger value indicates less storage overhead but worse query performance. | 100 | | hummock_version_checkpoint_interval_sec | Interval of hummock version checkpoint. | 30 | | hybrid_partition_vnode_count | Count of partitions of tables in default group and materialized view group. The meta node will decide according to some strategy whether to cut the boundaries of the file according to the vnode alignment. Each partition contains aligned data of `VirtualNode::COUNT / hybrid_partition_vnode_count` consecutive virtual-nodes of one state table. Set it zero to disable this feature. | 4 | @@ -171,4 +169,5 @@ This page is automatically generated by `./risedev generate-example-config` | pause_on_next_bootstrap | Whether to pause all data sources on next bootstrap. | false | | sstable_size_mb | Target size of the Sstable. | 256 | | state_store | URL for the state store | | +| time_travel_retention_ms | The data retention period for time travel, where 0 indicates that it's disabled. | 0 | | use_new_object_prefix_strategy | Whether to split object prefix. | | diff --git a/src/config/example.toml b/src/config/example.toml index d67eabe568b2c..866a56017982e 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -22,8 +22,6 @@ vacuum_interval_sec = 30 vacuum_spin_interval_ms = 200 hummock_version_checkpoint_interval_sec = 30 enable_hummock_data_archive = false -enable_hummock_time_travel = false -hummock_time_travel_retention_ms = 86400000 hummock_time_travel_snapshot_interval = 100 min_delta_log_num_for_hummock_version_checkpoint = 10 max_heartbeat_interval_secs = 60 @@ -244,3 +242,4 @@ max_concurrent_creating_streaming_jobs = 1 pause_on_next_bootstrap = false enable_tracing = false license_key = "" +time_travel_retention_ms = 0 diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs index 0fd7ee94d9fc7..65e730166aed8 100644 --- a/src/meta/node/src/lib.rs +++ b/src/meta/node/src/lib.rs @@ -374,8 +374,6 @@ pub fn start( .meta .hummock_version_checkpoint_interval_sec, enable_hummock_data_archive: config.meta.enable_hummock_data_archive, - enable_hummock_time_travel: config.meta.enable_hummock_time_travel, - hummock_time_travel_retention_ms: config.meta.hummock_time_travel_retention_ms, hummock_time_travel_snapshot_interval: config .meta .hummock_time_travel_snapshot_interval, diff --git a/src/meta/src/hummock/manager/checkpoint.rs b/src/meta/src/hummock/manager/checkpoint.rs index e901c09f7831f..bf63d3eed9690 100644 --- a/src/meta/src/hummock/manager/checkpoint.rs +++ b/src/meta/src/hummock/manager/checkpoint.rs @@ -206,7 +206,7 @@ impl HummockManager { }); } // Whenever data archive or time travel is enabled, we can directly discard reference to stale objects that will no longer be used. - if self.env.opts.enable_hummock_data_archive || self.env.opts.enable_hummock_time_travel { + if self.env.opts.enable_hummock_data_archive || self.time_travel_enabled().await { let context_info = self.context_info.read().await; let min_pinned_version_id = context_info.min_pinned_version_id(); stale_objects.retain(|version_id, _| *version_id >= min_pinned_version_id); @@ -234,7 +234,7 @@ impl HummockManager { assert!(new_checkpoint.version.id > versioning.checkpoint.version.id); versioning.checkpoint = new_checkpoint; // Not delete stale objects when archive or time travel is enabled - if !self.env.opts.enable_hummock_data_archive && !self.env.opts.enable_hummock_time_travel { + if !self.env.opts.enable_hummock_data_archive && !self.time_travel_enabled().await { versioning.mark_objects_for_deletion(&context_info, &self.delete_object_tracker); } diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index 2e8535e97de7c..025c7f503e89a 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -32,7 +32,6 @@ use risingwave_pb::hummock::HummockSnapshot; use sea_orm::TransactionTrait; use crate::hummock::error::{Error, Result}; -use crate::hummock::manager::time_travel::require_sql_meta_store_err; use crate::hummock::manager::transaction::{ HummockVersionStatsTransaction, HummockVersionTransaction, }; @@ -155,6 +154,13 @@ impl HummockManager { ); } + let previous_time_travel_toggle_check = versioning.time_travel_toggle_check; + versioning.time_travel_toggle_check = self.time_travel_enabled().await; + if !previous_time_travel_toggle_check && versioning.time_travel_toggle_check { + // Take a snapshot for the first commit epoch after enabling time travel. + versioning.mark_next_time_travel_version_snapshot(); + } + let mut version = HummockVersionTransaction::new( &mut versioning.current_version, &mut versioning.hummock_version_deltas, @@ -251,7 +257,9 @@ impl HummockManager { ); table_metrics.inc_write_throughput(stats_value as u64); } - if self.env.opts.enable_hummock_time_travel { + if versioning.time_travel_toggle_check + && let Some(sql_store) = self.sql_store() + { let mut time_travel_version = None; if versioning.time_travel_snapshot_interval_counter >= self.env.opts.hummock_time_travel_snapshot_interval @@ -269,7 +277,6 @@ impl HummockManager { .values() .map(|g| (g.group_id, g.parent_group_id)) .collect(); - let sql_store = self.sql_store().ok_or_else(require_sql_meta_store_err)?; let mut txn = sql_store.conn.begin().await?; let version_snapshot_sst_ids = self .write_time_travel_metadata( diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 3bb4d5152adb9..82ce16cf10346 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -61,7 +61,7 @@ pub(crate) mod checkpoint; mod commit_epoch; mod compaction; pub mod sequence; -mod time_travel; +pub mod time_travel; mod timer_task; mod transaction; mod utils; @@ -471,7 +471,7 @@ impl HummockManager { self.delete_object_tracker.clear(); // Not delete stale objects when archive or time travel is enabled - if !self.env.opts.enable_hummock_data_archive && !self.env.opts.enable_hummock_time_travel { + if !self.env.opts.enable_hummock_data_archive && !self.time_travel_enabled().await { versioning_guard.mark_objects_for_deletion(context_info, &self.delete_object_tracker); } diff --git a/src/meta/src/hummock/manager/time_travel.rs b/src/meta/src/hummock/manager/time_travel.rs index 6ac687ad39c02..e946969324b1a 100644 --- a/src/meta/src/hummock/manager/time_travel.rs +++ b/src/meta/src/hummock/manager/time_travel.rs @@ -16,6 +16,7 @@ use std::collections::{HashMap, HashSet, VecDeque}; use anyhow::anyhow; use itertools::Itertools; +use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_common::util::epoch::Epoch; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::sstable_info::SstableInfo; @@ -53,10 +54,16 @@ impl HummockManager { } } + pub(crate) async fn time_travel_enabled(&self) -> bool { + self.env + .system_params_reader() + .await + .time_travel_retention_ms() + > 0 + && self.sql_store().is_some() + } + pub(crate) async fn init_time_travel_state(&self) -> Result<()> { - if self.env.opts.enable_hummock_time_travel && self.sql_store().is_none() { - return Err(require_sql_meta_store_err()); - } let Some(sql_store) = self.sql_store() else { return Ok(()); }; @@ -503,6 +510,6 @@ fn should_ignore_group(root_group_id: CompactionGroupId) -> bool { root_group_id == StaticCompactionGroupId::StateDefault as CompactionGroupId } -pub(crate) fn require_sql_meta_store_err() -> Error { - Error::TimeTravel(anyhow!("time travel requires SQL meta store")) +pub fn require_sql_meta_store_err() -> Error { + Error::TimeTravel(anyhow!("require SQL meta store")) } diff --git a/src/meta/src/hummock/manager/versioning.rs b/src/meta/src/hummock/manager/versioning.rs index 4777a203bb3cd..781c928ca4b05 100644 --- a/src/meta/src/hummock/manager/versioning.rs +++ b/src/meta/src/hummock/manager/versioning.rs @@ -60,6 +60,8 @@ pub struct Versioning { pub time_travel_snapshot_interval_counter: u64, /// Used to avoid the attempts to rewrite the same SST to meta store pub last_time_travel_snapshot_sst_ids: HashSet, + /// Whether time travel is enabled during last commit epoch. + pub time_travel_toggle_check: bool, // Persistent states below pub hummock_version_deltas: BTreeMap, diff --git a/src/meta/src/hummock/vacuum.rs b/src/meta/src/hummock/vacuum.rs index 08f227dd31843..d747651b86d43 100644 --- a/src/meta/src/hummock/vacuum.rs +++ b/src/meta/src/hummock/vacuum.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use std::time::Duration; use itertools::Itertools; +use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_common::util::epoch::Epoch; use risingwave_hummock_sdk::HummockSstableObjectId; use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent; @@ -77,16 +78,21 @@ impl VacuumManager { break; } } - if self.env.opts.enable_hummock_time_travel { - let current_epoch_time = Epoch::now().physical_time(); - let epoch_watermark = Epoch::from_physical_time( - current_epoch_time.saturating_sub(self.env.opts.hummock_time_travel_retention_ms), - ) - .0; - self.hummock_manager - .truncate_time_travel_metadata(epoch_watermark) - .await?; - } + + let current_epoch_time = Epoch::now().physical_time(); + let epoch_watermark = Epoch::from_physical_time( + current_epoch_time.saturating_sub( + self.env + .system_params_reader() + .await + .time_travel_retention_ms(), + ), + ) + .0; + self.hummock_manager + .truncate_time_travel_metadata(epoch_watermark) + .await?; + Ok(total_deleted) } diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index 5bb12eec9c7bf..22c7df751ec78 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -170,8 +170,6 @@ pub struct MetaOpts { /// Interval of hummock version checkpoint. pub hummock_version_checkpoint_interval_sec: u64, pub enable_hummock_data_archive: bool, - pub enable_hummock_time_travel: bool, - pub hummock_time_travel_retention_ms: u64, pub hummock_time_travel_snapshot_interval: u64, /// The minimum delta log number a new checkpoint should compact, otherwise the checkpoint /// attempt is rejected. Greater value reduces object store IO, meanwhile it results in @@ -310,8 +308,6 @@ impl MetaOpts { vacuum_spin_interval_ms: 0, hummock_version_checkpoint_interval_sec: 30, enable_hummock_data_archive: false, - enable_hummock_time_travel: false, - hummock_time_travel_retention_ms: 0, hummock_time_travel_snapshot_interval: 0, min_delta_log_num_for_hummock_version_checkpoint: 1, min_sst_retention_time_sec: 3600 * 24 * 7, diff --git a/src/meta/src/manager/system_param/mod.rs b/src/meta/src/manager/system_param/mod.rs index 0030d3c2d8f88..dbc4a743fa72c 100644 --- a/src/meta/src/manager/system_param/mod.rs +++ b/src/meta/src/manager/system_param/mod.rs @@ -21,7 +21,9 @@ use std::time::Duration; use anyhow::anyhow; use risingwave_common::system_param::common::CommonHandler; use risingwave_common::system_param::reader::SystemParamsReader; -use risingwave_common::system_param::{check_missing_params, set_system_param}; +use risingwave_common::system_param::{ + check_missing_params, set_system_param, TIME_TRAVEL_RETENTION_MS_KEY, +}; use risingwave_common::{for_all_params, key_of}; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::SystemParams; @@ -32,6 +34,7 @@ use tracing::info; use self::model::SystemParamsModel; use super::NotificationManagerRef; +use crate::hummock::time_travel::require_sql_meta_store_err; use crate::model::{InMemValTransaction, ValTransaction, VarTransaction}; use crate::storage::{MetaStore, MetaStoreRef, Transaction}; use crate::{MetaError, MetaResult}; @@ -66,6 +69,14 @@ impl SystemParamsManager { )); }; + if params + .time_travel_retention_ms + .map(|r| r > 0) + .unwrap_or(false) + { + return Err(require_sql_meta_store_err().into()); + } + info!("system parameters: {:?}", params); check_missing_params(¶ms).map_err(|e| anyhow!(e))?; @@ -86,6 +97,10 @@ impl SystemParamsManager { } pub async fn set_param(&self, name: &str, value: Option) -> MetaResult { + if name == TIME_TRAVEL_RETENTION_MS_KEY { + return Err(require_sql_meta_store_err().into()); + } + let mut params_guard = self.params.write().await; let params = params_guard.deref_mut(); let mut mem_txn = VarTransaction::new(params);