Skip to content

Commit

Permalink
refactor(meta): add system parameter for time travel (#17866)
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 authored Jul 31, 2024
1 parent 8984ae2 commit 892d86f
Show file tree
Hide file tree
Showing 17 changed files with 76 additions and 52 deletions.
1 change: 1 addition & 0 deletions e2e_test/batch/catalog/pg_settings.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ postmaster enable_tracing
postmaster license_key
postmaster max_concurrent_creating_streaming_jobs
postmaster pause_on_next_bootstrap
postmaster time_travel_retention_ms
user application_name
user backfill_rate_limit
user background_ddl
Expand Down
1 change: 1 addition & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,7 @@ message SystemParams {
optional bool enable_tracing = 15;
optional bool use_new_object_prefix_strategy = 16;
optional string license_key = 17;
optional uint64 time_travel_retention_ms = 18;
}

message GetSystemParamsRequest {}
Expand Down
16 changes: 0 additions & 16 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,14 +223,6 @@ pub struct MetaConfig {
#[serde(default = "default::meta::enable_hummock_data_archive")]
pub enable_hummock_data_archive: bool,

/// If enabled, time travel query is available.
#[serde(default = "default::meta::enable_hummock_time_travel")]
pub enable_hummock_time_travel: bool,

/// The data retention period for time travel.
#[serde(default = "default::meta::hummock_time_travel_retention_ms")]
pub hummock_time_travel_retention_ms: u64,

/// The interval at which a Hummock version snapshot is taken for time travel.
///
/// Larger value indicates less storage overhead but worse query performance.
Expand Down Expand Up @@ -1346,14 +1338,6 @@ pub mod default {
false
}

pub fn enable_hummock_time_travel() -> bool {
false
}

pub fn hummock_time_travel_retention_ms() -> u64 {
24 * 3600 * 1000
}

pub fn hummock_time_travel_snapshot_interval() -> u64 {
100
}
Expand Down
2 changes: 2 additions & 0 deletions src/common/src/system_param/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ macro_rules! for_all_params {
{ enable_tracing, bool, Some(false), true, "Whether to enable distributed tracing.", },
{ use_new_object_prefix_strategy, bool, None, false, "Whether to split object prefix.", },
{ license_key, String, Some(default_license_key()), true, "The license key to activate enterprise features.", },
{ time_travel_retention_ms, u64, Some(0_u64), true, "The data retention period for time travel, where 0 indicates that it's disabled.", },
}
};
}
Expand Down Expand Up @@ -458,6 +459,7 @@ mod tests {
(ENABLE_TRACING_KEY, "true"),
(USE_NEW_OBJECT_PREFIX_STRATEGY_KEY, "false"),
(LICENSE_KEY_KEY, "foo"),
(TIME_TRAVEL_RETENTION_MS_KEY, "0"),
("a_deprecated_param", "foo"),
];

Expand Down
6 changes: 6 additions & 0 deletions src/common/src/system_param/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,4 +172,10 @@ where
fn license_key(&self) -> &str {
self.inner().license_key.as_deref().unwrap_or_default()
}

fn time_travel_retention_ms(&self) -> u64 {
self.inner()
.time_travel_retention_ms
.unwrap_or_else(default::time_travel_retention_ms)
}
}
7 changes: 4 additions & 3 deletions src/config/ci-time-travel.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
[meta]
enable_hummock_time_travel = true
hummock_time_travel_retention_ms = 300000
hummock_time_travel_snapshot_interval = 30
min_sst_retention_time_sec = 1
min_sst_retention_time_sec = 1

[system]
time_travel_retention_ms = 300000
3 changes: 1 addition & 2 deletions src/config/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,9 @@ This page is automatically generated by `./risedev generate-example-config`
| enable_compaction_deterministic | Whether to enable deterministic compaction scheduling, which will disable all auto scheduling of compaction tasks. Should only be used in e2e tests. | false |
| enable_dropped_column_reclaim | Whether compactor should rewrite row to remove dropped column. | false |
| enable_hummock_data_archive | If enabled, `SSTable` object file and version delta will be retained. `SSTable` object file need to be deleted via full GC. version delta need to be manually deleted. | false |
| enable_hummock_time_travel | If enabled, time travel query is available. | false |
| event_log_channel_max_size | Keeps the latest N events per channel. | 10 |
| event_log_enabled | | true |
| full_gc_interval_sec | Interval of automatic hummock full GC. | 86400 |
| hummock_time_travel_retention_ms | The data retention period for time travel. | 86400000 |
| hummock_time_travel_snapshot_interval | The interval at which a Hummock version snapshot is taken for time travel. Larger value indicates less storage overhead but worse query performance. | 100 |
| hummock_version_checkpoint_interval_sec | Interval of hummock version checkpoint. | 30 |
| hybrid_partition_vnode_count | Count of partitions of tables in default group and materialized view group. The meta node will decide according to some strategy whether to cut the boundaries of the file according to the vnode alignment. Each partition contains aligned data of `VirtualNode::COUNT / hybrid_partition_vnode_count` consecutive virtual-nodes of one state table. Set it zero to disable this feature. | 4 |
Expand Down Expand Up @@ -171,4 +169,5 @@ This page is automatically generated by `./risedev generate-example-config`
| pause_on_next_bootstrap | Whether to pause all data sources on next bootstrap. | false |
| sstable_size_mb | Target size of the Sstable. | 256 |
| state_store | URL for the state store | |
| time_travel_retention_ms | The data retention period for time travel, where 0 indicates that it's disabled. | 0 |
| use_new_object_prefix_strategy | Whether to split object prefix. | |
3 changes: 1 addition & 2 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ vacuum_interval_sec = 30
vacuum_spin_interval_ms = 200
hummock_version_checkpoint_interval_sec = 30
enable_hummock_data_archive = false
enable_hummock_time_travel = false
hummock_time_travel_retention_ms = 86400000
hummock_time_travel_snapshot_interval = 100
min_delta_log_num_for_hummock_version_checkpoint = 10
max_heartbeat_interval_secs = 60
Expand Down Expand Up @@ -244,3 +242,4 @@ max_concurrent_creating_streaming_jobs = 1
pause_on_next_bootstrap = false
enable_tracing = false
license_key = ""
time_travel_retention_ms = 0
2 changes: 0 additions & 2 deletions src/meta/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,8 +374,6 @@ pub fn start(
.meta
.hummock_version_checkpoint_interval_sec,
enable_hummock_data_archive: config.meta.enable_hummock_data_archive,
enable_hummock_time_travel: config.meta.enable_hummock_time_travel,
hummock_time_travel_retention_ms: config.meta.hummock_time_travel_retention_ms,
hummock_time_travel_snapshot_interval: config
.meta
.hummock_time_travel_snapshot_interval,
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/hummock/manager/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ impl HummockManager {
});
}
// Whenever data archive or time travel is enabled, we can directly discard reference to stale objects that will no longer be used.
if self.env.opts.enable_hummock_data_archive || self.env.opts.enable_hummock_time_travel {
if self.env.opts.enable_hummock_data_archive || self.time_travel_enabled().await {
let context_info = self.context_info.read().await;
let min_pinned_version_id = context_info.min_pinned_version_id();
stale_objects.retain(|version_id, _| *version_id >= min_pinned_version_id);
Expand Down Expand Up @@ -234,7 +234,7 @@ impl HummockManager {
assert!(new_checkpoint.version.id > versioning.checkpoint.version.id);
versioning.checkpoint = new_checkpoint;
// Not delete stale objects when archive or time travel is enabled
if !self.env.opts.enable_hummock_data_archive && !self.env.opts.enable_hummock_time_travel {
if !self.env.opts.enable_hummock_data_archive && !self.time_travel_enabled().await {
versioning.mark_objects_for_deletion(&context_info, &self.delete_object_tracker);
}

Expand Down
13 changes: 10 additions & 3 deletions src/meta/src/hummock/manager/commit_epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use risingwave_pb::hummock::HummockSnapshot;
use sea_orm::TransactionTrait;

use crate::hummock::error::{Error, Result};
use crate::hummock::manager::time_travel::require_sql_meta_store_err;
use crate::hummock::manager::transaction::{
HummockVersionStatsTransaction, HummockVersionTransaction,
};
Expand Down Expand Up @@ -155,6 +154,13 @@ impl HummockManager {
);
}

let previous_time_travel_toggle_check = versioning.time_travel_toggle_check;
versioning.time_travel_toggle_check = self.time_travel_enabled().await;
if !previous_time_travel_toggle_check && versioning.time_travel_toggle_check {
// Take a snapshot for the first commit epoch after enabling time travel.
versioning.mark_next_time_travel_version_snapshot();
}

let mut version = HummockVersionTransaction::new(
&mut versioning.current_version,
&mut versioning.hummock_version_deltas,
Expand Down Expand Up @@ -251,7 +257,9 @@ impl HummockManager {
);
table_metrics.inc_write_throughput(stats_value as u64);
}
if self.env.opts.enable_hummock_time_travel {
if versioning.time_travel_toggle_check
&& let Some(sql_store) = self.sql_store()
{
let mut time_travel_version = None;
if versioning.time_travel_snapshot_interval_counter
>= self.env.opts.hummock_time_travel_snapshot_interval
Expand All @@ -269,7 +277,6 @@ impl HummockManager {
.values()
.map(|g| (g.group_id, g.parent_group_id))
.collect();
let sql_store = self.sql_store().ok_or_else(require_sql_meta_store_err)?;
let mut txn = sql_store.conn.begin().await?;
let version_snapshot_sst_ids = self
.write_time_travel_metadata(
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub(crate) mod checkpoint;
mod commit_epoch;
mod compaction;
pub mod sequence;
mod time_travel;
pub mod time_travel;
mod timer_task;
mod transaction;
mod utils;
Expand Down Expand Up @@ -471,7 +471,7 @@ impl HummockManager {

self.delete_object_tracker.clear();
// Not delete stale objects when archive or time travel is enabled
if !self.env.opts.enable_hummock_data_archive && !self.env.opts.enable_hummock_time_travel {
if !self.env.opts.enable_hummock_data_archive && !self.time_travel_enabled().await {
versioning_guard.mark_objects_for_deletion(context_info, &self.delete_object_tracker);
}

Expand Down
17 changes: 12 additions & 5 deletions src/meta/src/hummock/manager/time_travel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::collections::{HashMap, HashSet, VecDeque};

use anyhow::anyhow;
use itertools::Itertools;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::epoch::Epoch;
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
Expand Down Expand Up @@ -53,10 +54,16 @@ impl HummockManager {
}
}

pub(crate) async fn time_travel_enabled(&self) -> bool {
self.env
.system_params_reader()
.await
.time_travel_retention_ms()
> 0
&& self.sql_store().is_some()
}

pub(crate) async fn init_time_travel_state(&self) -> Result<()> {
if self.env.opts.enable_hummock_time_travel && self.sql_store().is_none() {
return Err(require_sql_meta_store_err());
}
let Some(sql_store) = self.sql_store() else {
return Ok(());
};
Expand Down Expand Up @@ -503,6 +510,6 @@ fn should_ignore_group(root_group_id: CompactionGroupId) -> bool {
root_group_id == StaticCompactionGroupId::StateDefault as CompactionGroupId
}

pub(crate) fn require_sql_meta_store_err() -> Error {
Error::TimeTravel(anyhow!("time travel requires SQL meta store"))
pub fn require_sql_meta_store_err() -> Error {
Error::TimeTravel(anyhow!("require SQL meta store"))
}
2 changes: 2 additions & 0 deletions src/meta/src/hummock/manager/versioning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ pub struct Versioning {
pub time_travel_snapshot_interval_counter: u64,
/// Used to avoid the attempts to rewrite the same SST to meta store
pub last_time_travel_snapshot_sst_ids: HashSet<HummockSstableId>,
/// Whether time travel is enabled during last commit epoch.
pub time_travel_toggle_check: bool,

// Persistent states below
pub hummock_version_deltas: BTreeMap<HummockVersionId, HummockVersionDelta>,
Expand Down
26 changes: 16 additions & 10 deletions src/meta/src/hummock/vacuum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::sync::Arc;
use std::time::Duration;

use itertools::Itertools;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::epoch::Epoch;
use risingwave_hummock_sdk::HummockSstableObjectId;
use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
Expand Down Expand Up @@ -77,16 +78,21 @@ impl VacuumManager {
break;
}
}
if self.env.opts.enable_hummock_time_travel {
let current_epoch_time = Epoch::now().physical_time();
let epoch_watermark = Epoch::from_physical_time(
current_epoch_time.saturating_sub(self.env.opts.hummock_time_travel_retention_ms),
)
.0;
self.hummock_manager
.truncate_time_travel_metadata(epoch_watermark)
.await?;
}

let current_epoch_time = Epoch::now().physical_time();
let epoch_watermark = Epoch::from_physical_time(
current_epoch_time.saturating_sub(
self.env
.system_params_reader()
.await
.time_travel_retention_ms(),
),
)
.0;
self.hummock_manager
.truncate_time_travel_metadata(epoch_watermark)
.await?;

Ok(total_deleted)
}

Expand Down
4 changes: 0 additions & 4 deletions src/meta/src/manager/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,6 @@ pub struct MetaOpts {
/// Interval of hummock version checkpoint.
pub hummock_version_checkpoint_interval_sec: u64,
pub enable_hummock_data_archive: bool,
pub enable_hummock_time_travel: bool,
pub hummock_time_travel_retention_ms: u64,
pub hummock_time_travel_snapshot_interval: u64,
/// The minimum delta log number a new checkpoint should compact, otherwise the checkpoint
/// attempt is rejected. Greater value reduces object store IO, meanwhile it results in
Expand Down Expand Up @@ -310,8 +308,6 @@ impl MetaOpts {
vacuum_spin_interval_ms: 0,
hummock_version_checkpoint_interval_sec: 30,
enable_hummock_data_archive: false,
enable_hummock_time_travel: false,
hummock_time_travel_retention_ms: 0,
hummock_time_travel_snapshot_interval: 0,
min_delta_log_num_for_hummock_version_checkpoint: 1,
min_sst_retention_time_sec: 3600 * 24 * 7,
Expand Down
17 changes: 16 additions & 1 deletion src/meta/src/manager/system_param/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ use std::time::Duration;
use anyhow::anyhow;
use risingwave_common::system_param::common::CommonHandler;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::system_param::{check_missing_params, set_system_param};
use risingwave_common::system_param::{
check_missing_params, set_system_param, TIME_TRAVEL_RETENTION_MS_KEY,
};
use risingwave_common::{for_all_params, key_of};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::SystemParams;
Expand All @@ -32,6 +34,7 @@ use tracing::info;

use self::model::SystemParamsModel;
use super::NotificationManagerRef;
use crate::hummock::time_travel::require_sql_meta_store_err;
use crate::model::{InMemValTransaction, ValTransaction, VarTransaction};
use crate::storage::{MetaStore, MetaStoreRef, Transaction};
use crate::{MetaError, MetaResult};
Expand Down Expand Up @@ -66,6 +69,14 @@ impl SystemParamsManager {
));
};

if params
.time_travel_retention_ms
.map(|r| r > 0)
.unwrap_or(false)
{
return Err(require_sql_meta_store_err().into());
}

info!("system parameters: {:?}", params);
check_missing_params(&params).map_err(|e| anyhow!(e))?;

Expand All @@ -86,6 +97,10 @@ impl SystemParamsManager {
}

pub async fn set_param(&self, name: &str, value: Option<String>) -> MetaResult<SystemParams> {
if name == TIME_TRAVEL_RETENTION_MS_KEY {
return Err(require_sql_meta_store_err().into());
}

let mut params_guard = self.params.write().await;
let params = params_guard.deref_mut();
let mut mem_txn = VarTransaction::new(params);
Expand Down

0 comments on commit 892d86f

Please sign in to comment.