Skip to content

Commit

Permalink
refactor(meta): simplify hummock manager val transaction (risingwavel…
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored May 21, 2024
1 parent abb9a56 commit 57d4da2
Show file tree
Hide file tree
Showing 13 changed files with 124 additions and 518 deletions.
93 changes: 30 additions & 63 deletions src/meta/src/hummock/manager/compaction_group_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,13 @@ use crate::hummock::compaction::compaction_config::{
validate_compaction_config, CompactionConfigBuilder,
};
use crate::hummock::error::{Error, Result};
use crate::hummock::manager::{commit_multi_var, create_trx_wrapper, HummockManager};
use crate::hummock::manager::{commit_multi_var, HummockManager};
use crate::hummock::metrics_utils::remove_compaction_group_in_sst_stat;
use crate::hummock::model::CompactionGroup;
use crate::hummock::sequence::{next_compaction_group_id, next_sstable_object_id};
use crate::manager::{MetaSrvEnv, MetaStoreImpl};
use crate::model::{
BTreeMapEntryTransaction, BTreeMapEntryTransactionWrapper, BTreeMapTransaction,
BTreeMapTransactionWrapper, MetadataModel, MetadataModelError, ValTransaction,
BTreeMapEntryTransaction, BTreeMapTransaction, MetadataModel, MetadataModelError,
};
use crate::storage::MetaStore;

Expand Down Expand Up @@ -176,14 +175,10 @@ impl HummockManager {
}
// All NewCompactionGroup pairs are mapped to one new compaction group.
let new_compaction_group_id: OnceCell<CompactionGroupId> = OnceCell::new();
let mut new_version_delta = create_trx_wrapper!(
self.meta_store_ref(),
BTreeMapEntryTransactionWrapper,
BTreeMapEntryTransaction::new_insert(
&mut versioning.hummock_version_deltas,
current_version.id + 1,
build_version_delta_after_version(current_version),
)
let mut new_version_delta = BTreeMapEntryTransaction::new_insert(
&mut versioning.hummock_version_deltas,
current_version.id + 1,
build_version_delta_after_version(current_version),
);

for (table_id, raw_group_id) in pairs {
Expand Down Expand Up @@ -254,14 +249,10 @@ impl HummockManager {
let mut versioning_guard = self.versioning.write().await;
let versioning = versioning_guard.deref_mut();
let current_version = &versioning.current_version;
let mut new_version_delta = create_trx_wrapper!(
self.meta_store_ref(),
BTreeMapEntryTransactionWrapper,
BTreeMapEntryTransaction::new_insert(
&mut versioning.hummock_version_deltas,
current_version.id + 1,
build_version_delta_after_version(current_version),
)
let mut new_version_delta = BTreeMapEntryTransaction::new_insert(
&mut versioning.hummock_version_deltas,
current_version.id + 1,
build_version_delta_after_version(current_version),
);
let mut modified_groups: HashMap<CompactionGroupId, /* #member table */ u64> =
HashMap::new();
Expand Down Expand Up @@ -462,14 +453,10 @@ impl HummockManager {
parent_group_id
)));
}
let mut new_version_delta = create_trx_wrapper!(
self.meta_store_ref(),
BTreeMapEntryTransactionWrapper,
BTreeMapEntryTransaction::new_insert(
&mut versioning.hummock_version_deltas,
current_version.id + 1,
build_version_delta_after_version(current_version),
)
let mut new_version_delta = BTreeMapEntryTransaction::new_insert(
&mut versioning.hummock_version_deltas,
current_version.id + 1,
build_version_delta_after_version(current_version),
);
let new_sst_start_id = next_sstable_object_id(
&self.env,
Expand Down Expand Up @@ -528,17 +515,13 @@ impl HummockManager {
let (new_compaction_group_id, config) = new_group;
{
let mut compaction_group_manager = self.compaction_group_manager.write().await;
let insert = create_trx_wrapper!(
self.meta_store_ref(),
BTreeMapEntryTransactionWrapper,
BTreeMapEntryTransaction::new_insert(
&mut compaction_group_manager.compaction_groups,
new_compaction_group_id,
CompactionGroup {
group_id: new_compaction_group_id,
compaction_config: Arc::new(config),
},
)
let insert = BTreeMapEntryTransaction::new_insert(
&mut compaction_group_manager.compaction_groups,
new_compaction_group_id,
CompactionGroup {
group_id: new_compaction_group_id,
compaction_config: Arc::new(config),
},
);
commit_multi_var!(self.meta_store_ref(), new_version_delta, insert)?;
// Currently, only splitting out a single table_id is supported.
Expand Down Expand Up @@ -689,11 +672,7 @@ impl CompactionGroupManager {
&mut self,
compaction_group_ids: &[CompactionGroupId],
) -> Result<HashMap<CompactionGroupId, CompactionGroup>> {
let mut compaction_groups = create_trx_wrapper!(
self.meta_store_impl,
BTreeMapTransactionWrapper,
BTreeMapTransaction::new(&mut self.compaction_groups,)
);
let mut compaction_groups = BTreeMapTransaction::new(&mut self.compaction_groups);
for id in compaction_group_ids {
if compaction_groups.contains_key(id) {
continue;
Expand Down Expand Up @@ -727,11 +706,7 @@ impl CompactionGroupManager {
compaction_group_ids: &[CompactionGroupId],
config_to_update: &[MutableConfig],
) -> Result<Vec<CompactionGroup>> {
let mut compaction_groups = create_trx_wrapper!(
self.meta_store_impl,
BTreeMapTransactionWrapper,
BTreeMapTransaction::new(&mut self.compaction_groups,)
);
let mut compaction_groups = BTreeMapTransaction::new(&mut self.compaction_groups);
let mut result = Vec::with_capacity(compaction_group_ids.len());
for compaction_group_id in compaction_group_ids.iter().unique() {
let group = compaction_groups.get(compaction_group_id).ok_or_else(|| {
Expand All @@ -758,29 +733,21 @@ impl CompactionGroupManager {
group_id: CompactionGroupId,
config: CompactionConfig,
) -> Result<()> {
let insert = create_trx_wrapper!(
self.meta_store_impl,
BTreeMapEntryTransactionWrapper,
BTreeMapEntryTransaction::new_insert(
&mut self.compaction_groups,
let insert = BTreeMapEntryTransaction::new_insert(
&mut self.compaction_groups,
group_id,
CompactionGroup {
group_id,
CompactionGroup {
group_id,
compaction_config: Arc::new(config),
},
)
compaction_config: Arc::new(config),
},
);
commit_multi_var!(self.meta_store_impl, insert)?;
Ok(())
}

/// Removes stale group configs.
async fn purge(&mut self, existing_groups: HashSet<CompactionGroupId>) -> Result<()> {
let mut compaction_groups = create_trx_wrapper!(
self.meta_store_impl,
BTreeMapTransactionWrapper,
BTreeMapTransaction::new(&mut self.compaction_groups,)
);
let mut compaction_groups = BTreeMapTransaction::new(&mut self.compaction_groups);
let stale_group = compaction_groups
.tree_ref()
.keys()
Expand Down
18 changes: 4 additions & 14 deletions src/meta/src/hummock/manager/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,10 @@ use risingwave_hummock_sdk::{
use risingwave_pb::hummock::{HummockPinnedSnapshot, HummockPinnedVersion, ValidationTask};

use crate::hummock::error::{Error, Result};
use crate::hummock::manager::{
commit_multi_var, create_trx_wrapper, start_measure_real_process_timer,
};
use crate::hummock::manager::{commit_multi_var, start_measure_real_process_timer};
use crate::hummock::HummockManager;
use crate::manager::{MetaStoreImpl, MetadataManager, META_NODE_ID};
use crate::model::{BTreeMapTransaction, BTreeMapTransactionWrapper, ValTransaction};
use crate::model::BTreeMapTransaction;
use crate::storage::MetaStore;

#[derive(Default)]
Expand All @@ -55,16 +53,8 @@ impl ContextInfo {
anyhow::anyhow!("failpoint internal error")
)));

let mut pinned_versions = create_trx_wrapper!(
meta_store_ref,
BTreeMapTransactionWrapper,
BTreeMapTransaction::new(&mut self.pinned_versions,)
);
let mut pinned_snapshots = create_trx_wrapper!(
meta_store_ref,
BTreeMapTransactionWrapper,
BTreeMapTransaction::new(&mut self.pinned_snapshots,)
);
let mut pinned_versions = BTreeMapTransaction::new(&mut self.pinned_versions);
let mut pinned_snapshots = BTreeMapTransaction::new(&mut self.pinned_snapshots);
for context_id in context_ids.as_ref() {
pinned_versions.remove(*context_id);
pinned_snapshots.remove(*context_id);
Expand Down
11 changes: 4 additions & 7 deletions src/meta/src/hummock/manager/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ use risingwave_pb::hummock::subscribe_compaction_event_response::Event as Respon
use risingwave_pb::hummock::FullScanTask;

use crate::hummock::error::{Error, Result};
use crate::hummock::manager::{commit_multi_var, create_trx_wrapper};
use crate::hummock::manager::commit_multi_var;
use crate::hummock::HummockManager;
use crate::manager::MetadataManager;
use crate::model::{BTreeMapTransaction, BTreeMapTransactionWrapper, ValTransaction};
use crate::model::BTreeMapTransaction;
use crate::storage::MetaStore;

#[derive(Default)]
Expand Down Expand Up @@ -105,11 +105,8 @@ impl HummockManager {
if !context_info.version_safe_points.is_empty() {
return Ok((0, deltas_to_delete.len()));
}
let mut hummock_version_deltas = create_trx_wrapper!(
self.meta_store_ref(),
BTreeMapTransactionWrapper,
BTreeMapTransaction::new(&mut versioning.hummock_version_deltas,)
);
let mut hummock_version_deltas =
BTreeMapTransaction::new(&mut versioning.hummock_version_deltas);
let batch = deltas_to_delete
.iter()
.take(batch_size)
Expand Down
Loading

0 comments on commit 57d4da2

Please sign in to comment.