Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(storage): limit use of hummock version safe epoch #17161

Merged
merged 6 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ fn version_to_compaction_group_rows(version: &HummockVersion) -> Vec<RwHummockVe
.map(|cg| RwHummockVersion {
version_id: version.id as _,
max_committed_epoch: version.max_committed_epoch as _,
safe_epoch: version.safe_epoch as _,
safe_epoch: version.visible_table_safe_epoch() as _,
compaction_group: json!(cg).into(),
})
.collect()
Expand Down
6 changes: 2 additions & 4 deletions src/meta/src/backup_restore/meta_snapshot_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,8 @@ mod tests {
let meta_store = MemStore::new();

let mut builder = MetaSnapshotBuilder::new(meta_store.clone());
let hummock_version = HummockVersion {
id: 1,
..Default::default()
};
let mut hummock_version = HummockVersion::default();
hummock_version.id = 1;
let get_ckpt_builder = |v: &HummockVersion| {
let v_ = v.clone();
async move { v_ }
Expand Down
14 changes: 8 additions & 6 deletions src/meta/src/backup_restore/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,9 +266,10 @@ mod tests {
let snapshot = MetaSnapshot {
id: opts.meta_snapshot_id,
metadata: ClusterMetadata {
hummock_version: HummockVersion {
id: 123,
..Default::default()
hummock_version: {
let mut version = HummockVersion::default();
version.id = 123;
version
},
system_param: system_param.clone(),
..Default::default()
Expand Down Expand Up @@ -448,9 +449,10 @@ mod tests {
memcomparable::to_vec(&"some_value_2".to_string()).unwrap(),
),
]),
hummock_version: HummockVersion {
id: 123,
..Default::default()
hummock_version: {
let mut version = HummockVersion::default();
version.id = 123;
version
},
system_param: system_param.clone(),
..Default::default()
Expand Down
26 changes: 14 additions & 12 deletions src/meta/src/hummock/manager/commit_epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,21 +338,23 @@ impl HummockManager {

// update state table info
new_version_delta.with_latest_version(|version, delta| {
for table_id in new_table_ids
.into_iter()
.flat_map(|ids| ids.into_iter().map(|table_id| table_id.table_id))
.chain(
version
.levels
.values()
.flat_map(|group| group.member_table_ids.iter().cloned()),
)
{
if let Some(new_table_ids) = new_table_ids {
for table_id in new_table_ids {
delta.state_table_info_delta.insert(
table_id,
StateTableInfoDelta {
committed_epoch: epoch,
safe_epoch: epoch,
},
);
}
}
for (table_id, info) in version.state_table_info.info() {
delta.state_table_info_delta.insert(
TableId::new(table_id),
*table_id,
StateTableInfoDelta {
committed_epoch: epoch,
safe_epoch: version.safe_epoch,
safe_epoch: info.safe_epoch,
},
);
}
Expand Down
43 changes: 26 additions & 17 deletions src/meta/src/hummock/manager/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::min;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::{Arc, LazyLock};
use std::time::{Instant, SystemTime};
Expand Down Expand Up @@ -182,25 +183,33 @@ impl<'a> HummockVersionTransaction<'a> {
};
group_deltas.push(group_delta);
version_delta.safe_epoch = std::cmp::max(
version_delta.latest_version().safe_epoch,
version_delta.latest_version().visible_table_safe_epoch(),
compact_task.watermark,
);
if version_delta.latest_version().safe_epoch < version_delta.safe_epoch {
version_delta.state_table_info_delta = version_delta
.latest_version()
.state_table_info
.info()
.iter()
.map(|(table_id, info)| {
(
*table_id,
StateTableInfoDelta {
committed_epoch: info.committed_epoch,
safe_epoch: version_delta.safe_epoch,
},
)
})
.collect();
if version_delta.latest_version().visible_table_safe_epoch() < version_delta.safe_epoch {
version_delta.with_latest_version(|version, version_delta| {
for (table_id, info) in version.state_table_info.info() {
let new_safe_epoch = min(version_delta.safe_epoch, info.committed_epoch);
if new_safe_epoch > info.safe_epoch {
if new_safe_epoch != version_delta.safe_epoch {
warn!(
new_safe_epoch,
committed_epoch = info.committed_epoch,
global_safe_epoch = version_delta.safe_epoch,
table_id = table_id.table_id,
"table has different safe epoch to global"
);
}
version_delta.state_table_info_delta.insert(
*table_id,
StateTableInfoDelta {
committed_epoch: info.committed_epoch,
safe_epoch: new_safe_epoch,
},
);
}
}
});
}
version_delta.pre_apply();
}
Expand Down
9 changes: 3 additions & 6 deletions src/meta/src/hummock/manager/compaction_group_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,7 @@ impl HummockManager {
&self.metrics,
);
let mut new_version_delta = version.new_delta();
let (committed_epoch, safe_epoch) = {
let version = new_version_delta.latest_version();
(version.max_committed_epoch, version.safe_epoch)
};
let epoch = new_version_delta.latest_version().max_committed_epoch;

for (table_id, raw_group_id) in pairs {
let mut group_id = *raw_group_id;
Expand Down Expand Up @@ -247,8 +244,8 @@ impl HummockManager {
.insert(
TableId::new(*table_id),
StateTableInfoDelta {
committed_epoch,
safe_epoch,
committed_epoch: epoch,
safe_epoch: epoch,
}
)
.is_none());
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ impl HummockManager {
.read()
.await
.default_compaction_config();
let checkpoint_version = create_init_version(default_compaction_config);
let checkpoint_version = HummockVersion::create_init_version(default_compaction_config);
tracing::info!("init hummock version checkpoint");
versioning_guard.checkpoint = HummockVersionCheckpoint {
version: checkpoint_version.clone(),
Expand Down
7 changes: 4 additions & 3 deletions src/meta/src/hummock/manager/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use std::collections::BTreeMap;
use std::ops::{Deref, DerefMut};

use risingwave_hummock_sdk::compaction_group::hummock_version_ext::build_version_delta_after_version;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::HummockVersionId;
use risingwave_pb::hummock::HummockVersionStats;
Expand All @@ -38,7 +37,9 @@ fn trigger_version_stat(metrics: &MetaMetrics, current_version: &HummockVersion)
metrics
.version_size
.set(current_version.estimated_encode_len() as i64);
metrics.safe_epoch.set(current_version.safe_epoch as i64);
metrics
.safe_epoch
.set(current_version.visible_table_safe_epoch() as i64);
metrics.current_version_id.set(current_version.id as i64);
}

Expand Down Expand Up @@ -86,7 +87,7 @@ impl<'a> HummockVersionTransaction<'a> {
}

pub(super) fn new_delta<'b>(&'b mut self) -> SingleDeltaTransaction<'a, 'b> {
let delta = build_version_delta_after_version(self.latest_version());
let delta = self.latest_version().version_delta_after();
SingleDeltaTransaction {
version_txn: self,
delta: Some(delta),
Expand Down
44 changes: 8 additions & 36 deletions src/meta/src/hummock/manager/versioning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,20 @@ use std::collections::{BTreeMap, HashMap, HashSet};

use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::util::epoch::INVALID_EPOCH;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
build_initial_compaction_group_levels, get_compaction_group_ids,
get_table_compaction_group_id_mapping, BranchedSstInfo,
get_compaction_group_ids, get_table_compaction_group_id_mapping, BranchedSstInfo,
};
use risingwave_hummock_sdk::compaction_group::{StateTableId, StaticCompactionGroupId};
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::table_stats::add_prost_table_stats_map;
use risingwave_hummock_sdk::version::{
HummockVersion, HummockVersionDelta, HummockVersionStateTableInfo,
};
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{
CompactionGroupId, HummockContextId, HummockEpoch, HummockSstableObjectId, HummockVersionId,
FIRST_VERSION_ID,
};
use risingwave_pb::common::WorkerNode;
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::{
CompactionConfig, HummockPinnedSnapshot, HummockPinnedVersion, HummockSnapshot,
HummockVersionStats, SstableInfo, TableStats,
HummockPinnedSnapshot, HummockPinnedVersion, HummockSnapshot, HummockVersionStats, SstableInfo,
TableStats,
};
use risingwave_pb::meta::subscribe_response::{Info, Operation};

Expand Down Expand Up @@ -349,28 +344,6 @@ pub(super) fn calc_new_write_limits(
new_write_limits
}

pub(super) fn create_init_version(default_compaction_config: CompactionConfig) -> HummockVersion {
let mut init_version = HummockVersion {
id: FIRST_VERSION_ID,
levels: Default::default(),
max_committed_epoch: INVALID_EPOCH,
safe_epoch: INVALID_EPOCH,
table_watermarks: HashMap::new(),
table_change_log: HashMap::new(),
state_table_info: HummockVersionStateTableInfo::empty(),
};
for group_id in [
StaticCompactionGroupId::StateDefault as CompactionGroupId,
StaticCompactionGroupId::MaterializedView as CompactionGroupId,
] {
init_version.levels.insert(
group_id,
build_initial_compaction_group_levels(group_id, &default_compaction_config),
);
}
init_version
}

/// Rebuilds table stats from the given version.
/// Note that the result is approximate value. See `estimate_table_stats`.
fn rebuild_table_stats(version: &HummockVersion) -> HummockVersionStats {
Expand Down Expand Up @@ -575,10 +548,9 @@ mod tests {
);
}

let mut version = HummockVersion {
id: 123,
..Default::default()
};
let mut version = HummockVersion::default();
version.id = 123;

for cg in 1..3 {
version.levels.insert(
cg,
Expand Down
2 changes: 1 addition & 1 deletion src/storage/backup/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl MetaSnapshotMetadata {
hummock_version_id: v.id,
ssts: v.get_object_ids(),
max_committed_epoch: v.max_committed_epoch,
safe_epoch: v.safe_epoch,
safe_epoch: v.visible_table_safe_epoch(),
format_version,
remarks,
}
Expand Down
38 changes: 26 additions & 12 deletions src/storage/benches/bench_table_watermarks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#![feature(lazy_cell)]

use std::collections::hash_map::Entry;
use std::collections::{HashMap, VecDeque};
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::{Arc, LazyLock};

use bytes::Bytes;
Expand All @@ -28,8 +28,9 @@ use risingwave_common::util::epoch::test_epoch;
use risingwave_hummock_sdk::table_watermark::{
TableWatermarks, TableWatermarksIndex, VnodeWatermark, WatermarkDirection,
};
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionStateTableInfo};
use risingwave_hummock_sdk::HummockEpoch;
use risingwave_pb::hummock::StateTableInfoDelta;
use risingwave_storage::hummock::local_version::pinned_version::PinnedVersion;
use spin::Mutex;
use tokio::sync::mpsc::unbounded_channel;
Expand Down Expand Up @@ -115,17 +116,30 @@ fn gen_version(
new_epoch_idx,
vnode_part_count,
));
// let table_watermarks =
// gen_committed_table_watermarks(old_epoch_idx, new_epoch_idx, vnode_part_count);
HummockVersion {
id: new_epoch_idx as _,
max_committed_epoch: test_epoch(new_epoch_idx as _),
safe_epoch: test_epoch(old_epoch_idx as _),
table_watermarks: (0..table_count)
.map(|table_id| (TableId::new(table_id as _), table_watermarks.clone()))
let mut version = HummockVersion::default();
let committed_epoch = test_epoch(new_epoch_idx as _);
version.id = new_epoch_idx as _;
version.max_committed_epoch = committed_epoch;
version.table_watermarks = (0..table_count)
.map(|table_id| (TableId::new(table_id as _), table_watermarks.clone()))
.collect();
let mut state_table_info = HummockVersionStateTableInfo::empty();
state_table_info.apply_delta(
&(0..table_count)
.map(|table_id| {
(
TableId::new(table_id as _),
StateTableInfoDelta {
committed_epoch,
safe_epoch: test_epoch(old_epoch_idx as _),
},
)
})
.collect(),
..Default::default()
}
&HashSet::new(),
);
version.state_table_info = state_table_info;
version
}

fn bench_table_watermarks(c: &mut Criterion) {
Expand Down
Loading
Loading