Skip to content

Commit

Permalink
fix(meta): record delta for new compaction group (#19253) (#19256)
Browse files Browse the repository at this point in the history
Co-authored-by: zwang28 <[email protected]>
  • Loading branch information
github-actions[bot] and zwang28 authored Nov 5, 2024
1 parent 35317f1 commit 4809af2
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 73 deletions.
110 changes: 49 additions & 61 deletions src/meta/src/hummock/manager/commit_epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ use crate::hummock::metrics_utils::{
};
use crate::hummock::model::CompactionGroup;
use crate::hummock::sequence::{next_compaction_group_id, next_sstable_object_id};
use crate::hummock::time_travel::should_mark_next_time_travel_version_snapshot;
use crate::hummock::{
commit_multi_var, commit_multi_var_with_provided_txn, start_measure_real_process_timer,
HummockManager,
commit_multi_var_with_provided_txn, start_measure_real_process_timer, HummockManager,
};

pub enum NewTableFragmentInfo {
Expand Down Expand Up @@ -108,13 +108,6 @@ impl HummockManager {
);
}

let previous_time_travel_toggle_check = versioning.time_travel_toggle_check;
versioning.time_travel_toggle_check = self.time_travel_enabled().await;
if !previous_time_travel_toggle_check && versioning.time_travel_toggle_check {
// Take a snapshot for the first commit epoch after enabling time travel.
versioning.mark_next_time_travel_version_snapshot();
}

let mut version = HummockVersionTransaction::new(
&mut versioning.current_version,
&mut versioning.hummock_version_deltas,
Expand Down Expand Up @@ -213,6 +206,10 @@ impl HummockManager {
new_table_watermarks,
change_log_delta,
);
if should_mark_next_time_travel_version_snapshot(&time_travel_delta) {
// Unable to invoke mark_next_time_travel_version_snapshot because versioning is already mutable borrowed.
versioning.time_travel_snapshot_interval_counter = u64::MAX;
}

// Apply stats changes.
let mut version_stats = HummockVersionStatsTransaction::new(
Expand Down Expand Up @@ -246,59 +243,50 @@ impl HummockManager {
);
table_metrics.inc_write_throughput(stats_value as u64);
}
if versioning.time_travel_toggle_check {
let mut time_travel_version = None;
if versioning.time_travel_snapshot_interval_counter
>= self.env.opts.hummock_time_travel_snapshot_interval
{
versioning.time_travel_snapshot_interval_counter = 0;
time_travel_version = Some(version.latest_version());
} else {
versioning.time_travel_snapshot_interval_counter = versioning
.time_travel_snapshot_interval_counter
.saturating_add(1);
}
let group_parents = version
.latest_version()
.levels
.values()
.map(|g| (g.group_id, g.parent_group_id))
.collect();
let time_travel_tables_to_commit =
table_compaction_group_mapping
.iter()
.filter_map(|(table_id, cg_id)| {
tables_to_commit
.get(table_id)
.map(|committed_epoch| (table_id, cg_id, *committed_epoch))
});
let mut txn = self.env.meta_store_ref().conn.begin().await?;
let version_snapshot_sst_ids = self
.write_time_travel_metadata(
&txn,
time_travel_version,
time_travel_delta,
&group_parents,
&versioning.last_time_travel_snapshot_sst_ids,
time_travel_tables_to_commit,
)
.await?;
commit_multi_var_with_provided_txn!(
txn,
version,
version_stats,
compaction_group_manager_txn
)?;
if let Some(version_snapshot_sst_ids) = version_snapshot_sst_ids {
versioning.last_time_travel_snapshot_sst_ids = version_snapshot_sst_ids;
}
let mut time_travel_version = None;
if versioning.time_travel_snapshot_interval_counter
>= self.env.opts.hummock_time_travel_snapshot_interval
{
versioning.time_travel_snapshot_interval_counter = 0;
time_travel_version = Some(version.latest_version());
} else {
commit_multi_var!(
self.meta_store_ref(),
version,
version_stats,
compaction_group_manager_txn
)?;
versioning.time_travel_snapshot_interval_counter = versioning
.time_travel_snapshot_interval_counter
.saturating_add(1);
}
let group_parents = version
.latest_version()
.levels
.values()
.map(|g| (g.group_id, g.parent_group_id))
.collect();
let time_travel_tables_to_commit =
table_compaction_group_mapping
.iter()
.filter_map(|(table_id, cg_id)| {
tables_to_commit
.get(table_id)
.map(|committed_epoch| (table_id, cg_id, *committed_epoch))
});
let mut txn = self.env.meta_store_ref().conn.begin().await?;
let version_snapshot_sst_ids = self
.write_time_travel_metadata(
&txn,
time_travel_version,
time_travel_delta,
&group_parents,
&versioning.last_time_travel_snapshot_sst_ids,
time_travel_tables_to_commit,
)
.await?;
commit_multi_var_with_provided_txn!(
txn,
version,
version_stats,
compaction_group_manager_txn
)?;
if let Some(version_snapshot_sst_ids) = version_snapshot_sst_ids {
versioning.last_time_travel_snapshot_sst_ids = version_snapshot_sst_ids;
}

for compaction_group_id in &modified_compaction_groups {
Expand Down
26 changes: 16 additions & 10 deletions src/meta/src/hummock/manager/time_travel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@ use std::collections::{HashMap, HashSet, VecDeque};
use anyhow::anyhow;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::epoch::Epoch;
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::time_travel::{
refill_version, IncompleteHummockVersion, IncompleteHummockVersionDelta,
};
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::version::{GroupDeltaCommon, HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{
CompactionGroupId, HummockEpoch, HummockSstableId, HummockSstableObjectId,
};
Expand All @@ -46,14 +45,6 @@ use crate::hummock::HummockManager;

/// Time travel.
impl HummockManager {
pub(crate) async fn time_travel_enabled(&self) -> bool {
self.env
.system_params_reader()
.await
.time_travel_retention_ms()
> 0
}

pub(crate) async fn init_time_travel_state(&self) -> Result<()> {
let sql_store = self.env.meta_store_ref();
let mut guard = self.versioning.write().await;
Expand Down Expand Up @@ -489,6 +480,11 @@ fn replay_archive(
let mut last_version = HummockVersion::from_persisted_protobuf(&version);
for d in deltas {
let d = HummockVersionDelta::from_persisted_protobuf(&d);
debug_assert!(
!should_mark_next_time_travel_version_snapshot(&d),
"unexpected time travel delta {:?}",
d
);
// Need to work around the assertion in `apply_version_delta`.
// Because compaction deltas are not included in time travel archive.
while last_version.id < d.prev_id {
Expand Down Expand Up @@ -522,3 +518,13 @@ fn should_ignore_group(root_group_id: CompactionGroupId) -> bool {
pub fn require_sql_meta_store_err() -> Error {
Error::TimeTravel(anyhow!("require SQL meta store"))
}

/// Time travel delta replay only expect `NewL0SubLevel`. In all other cases, a new version snapshot should be created.
pub fn should_mark_next_time_travel_version_snapshot(delta: &HummockVersionDelta) -> bool {
delta.group_deltas.iter().any(|(_, deltas)| {
deltas
.group_deltas
.iter()
.any(|d| !matches!(d, GroupDeltaCommon::NewL0SubLevel(_)))
})
}
2 changes: 0 additions & 2 deletions src/meta/src/hummock/manager/versioning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ pub struct Versioning {
pub time_travel_snapshot_interval_counter: u64,
/// Used to avoid the attempts to rewrite the same SST to meta store
pub last_time_travel_snapshot_sst_ids: HashSet<HummockSstableId>,
/// Whether time travel is enabled during last commit epoch.
pub time_travel_toggle_check: bool,

// Persistent states below
pub hummock_version_deltas: BTreeMap<HummockVersionId, HummockVersionDelta>,
Expand Down

0 comments on commit 4809af2

Please sign in to comment.