Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(meta): record delta for new compaction group #19253

Merged
merged 3 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 49 additions & 61 deletions src/meta/src/hummock/manager/commit_epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ use crate::hummock::metrics_utils::{
};
use crate::hummock::model::CompactionGroup;
use crate::hummock::sequence::{next_compaction_group_id, next_sstable_object_id};
use crate::hummock::time_travel::should_mark_next_time_travel_version_snapshot;
use crate::hummock::{
commit_multi_var, commit_multi_var_with_provided_txn, start_measure_real_process_timer,
HummockManager,
commit_multi_var_with_provided_txn, start_measure_real_process_timer, HummockManager,
};

pub enum NewTableFragmentInfo {
Expand Down Expand Up @@ -109,13 +109,6 @@ impl HummockManager {
);
}

let previous_time_travel_toggle_check = versioning.time_travel_toggle_check;
versioning.time_travel_toggle_check = self.time_travel_enabled().await;
if !previous_time_travel_toggle_check && versioning.time_travel_toggle_check {
// Take a snapshot for the first commit epoch after enabling time travel.
versioning.mark_next_time_travel_version_snapshot();
}

let mut version = HummockVersionTransaction::new(
&mut versioning.current_version,
&mut versioning.hummock_version_deltas,
Expand Down Expand Up @@ -214,6 +207,10 @@ impl HummockManager {
new_table_watermarks,
change_log_delta,
);
if should_mark_next_time_travel_version_snapshot(&time_travel_delta) {
// Unable to invoke mark_next_time_travel_version_snapshot because versioning is already mutable borrowed.
versioning.time_travel_snapshot_interval_counter = u64::MAX;
}

// Apply stats changes.
let mut version_stats = HummockVersionStatsTransaction::new(
Expand Down Expand Up @@ -247,59 +244,50 @@ impl HummockManager {
);
table_metrics.inc_write_throughput(stats_value as u64);
}
if versioning.time_travel_toggle_check {
let mut time_travel_version = None;
if versioning.time_travel_snapshot_interval_counter
>= self.env.opts.hummock_time_travel_snapshot_interval
{
versioning.time_travel_snapshot_interval_counter = 0;
time_travel_version = Some(version.latest_version());
} else {
versioning.time_travel_snapshot_interval_counter = versioning
.time_travel_snapshot_interval_counter
.saturating_add(1);
}
let group_parents = version
.latest_version()
.levels
.values()
.map(|g| (g.group_id, g.parent_group_id))
.collect();
let time_travel_tables_to_commit =
table_compaction_group_mapping
.iter()
.filter_map(|(table_id, cg_id)| {
tables_to_commit
.get(table_id)
.map(|committed_epoch| (table_id, cg_id, *committed_epoch))
});
let mut txn = self.env.meta_store_ref().conn.begin().await?;
let version_snapshot_sst_ids = self
.write_time_travel_metadata(
&txn,
time_travel_version,
time_travel_delta,
&group_parents,
&versioning.last_time_travel_snapshot_sst_ids,
time_travel_tables_to_commit,
)
.await?;
commit_multi_var_with_provided_txn!(
txn,
version,
version_stats,
compaction_group_manager_txn
)?;
if let Some(version_snapshot_sst_ids) = version_snapshot_sst_ids {
versioning.last_time_travel_snapshot_sst_ids = version_snapshot_sst_ids;
}
let mut time_travel_version = None;
if versioning.time_travel_snapshot_interval_counter
>= self.env.opts.hummock_time_travel_snapshot_interval
{
versioning.time_travel_snapshot_interval_counter = 0;
time_travel_version = Some(version.latest_version());
} else {
commit_multi_var!(
self.meta_store_ref(),
version,
version_stats,
compaction_group_manager_txn
)?;
versioning.time_travel_snapshot_interval_counter = versioning
.time_travel_snapshot_interval_counter
.saturating_add(1);
}
let group_parents = version
.latest_version()
.levels
.values()
.map(|g| (g.group_id, g.parent_group_id))
.collect();
let time_travel_tables_to_commit =
table_compaction_group_mapping
.iter()
.filter_map(|(table_id, cg_id)| {
tables_to_commit
.get(table_id)
.map(|committed_epoch| (table_id, cg_id, *committed_epoch))
});
let mut txn = self.env.meta_store_ref().conn.begin().await?;
let version_snapshot_sst_ids = self
.write_time_travel_metadata(
&txn,
time_travel_version,
time_travel_delta,
&group_parents,
&versioning.last_time_travel_snapshot_sst_ids,
time_travel_tables_to_commit,
)
.await?;
commit_multi_var_with_provided_txn!(
txn,
version,
version_stats,
compaction_group_manager_txn
)?;
if let Some(version_snapshot_sst_ids) = version_snapshot_sst_ids {
versioning.last_time_travel_snapshot_sst_ids = version_snapshot_sst_ids;
}

for compaction_group_id in &modified_compaction_groups {
Expand Down
26 changes: 16 additions & 10 deletions src/meta/src/hummock/manager/time_travel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@ use std::collections::{HashMap, HashSet, VecDeque};
use anyhow::anyhow;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::epoch::Epoch;
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::time_travel::{
refill_version, IncompleteHummockVersion, IncompleteHummockVersionDelta,
};
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::version::{GroupDeltaCommon, HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{
CompactionGroupId, HummockEpoch, HummockSstableId, HummockSstableObjectId,
};
Expand All @@ -45,14 +44,6 @@ use crate::hummock::HummockManager;

/// Time travel.
impl HummockManager {
pub(crate) async fn time_travel_enabled(&self) -> bool {
self.env
.system_params_reader()
.await
.time_travel_retention_ms()
> 0
}

pub(crate) async fn init_time_travel_state(&self) -> Result<()> {
let sql_store = self.env.meta_store_ref();
let mut guard = self.versioning.write().await;
Expand Down Expand Up @@ -473,6 +464,11 @@ fn replay_archive(
let mut last_version = HummockVersion::from_persisted_protobuf(&version);
for d in deltas {
let d = HummockVersionDelta::from_persisted_protobuf(&d);
debug_assert!(
!should_mark_next_time_travel_version_snapshot(&d),
"unexpected time travel delta {:?}",
d
);
// Need to work around the assertion in `apply_version_delta`.
// Because compaction deltas are not included in time travel archive.
while last_version.id < d.prev_id {
Expand Down Expand Up @@ -506,3 +502,13 @@ fn should_ignore_group(root_group_id: CompactionGroupId) -> bool {
pub fn require_sql_meta_store_err() -> Error {
Error::TimeTravel(anyhow!("require SQL meta store"))
}

/// Time travel delta replay only expect `NewL0SubLevel`. In all other cases, a new version snapshot should be created.
pub fn should_mark_next_time_travel_version_snapshot(delta: &HummockVersionDelta) -> bool {
delta.group_deltas.iter().any(|(_, deltas)| {
deltas
.group_deltas
.iter()
.any(|d| !matches!(d, GroupDeltaCommon::NewL0SubLevel(_)))
})
}
2 changes: 0 additions & 2 deletions src/meta/src/hummock/manager/versioning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ pub struct Versioning {
pub time_travel_snapshot_interval_counter: u64,
/// Used to avoid the attempts to rewrite the same SST to meta store
pub last_time_travel_snapshot_sst_ids: HashSet<HummockSstableId>,
/// Whether time travel is enabled during last commit epoch.
pub time_travel_toggle_check: bool,

// Persistent states below
pub hummock_version_deltas: BTreeMap<HummockVersionId, HummockVersionDelta>,
Expand Down
Loading