Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(storage): hide usage on global max committed epoch #17973

Merged
merged 21 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
afdd10a
temp save
wenym1 Aug 8, 2024
b73777b
refactor(storage): use strong type for HummockVersionId
wenym1 Aug 8, 2024
2c90013
feat: call clear shared buffer with latest version id
wenym1 Aug 8, 2024
585b86d
Merge branch 'yiming/strong-type-version-id' into yiming/hide-max-com…
wenym1 Aug 8, 2024
eec36bb
Merge branch 'yiming/clear-with-version-id' into yiming/hide-max-comm…
wenym1 Aug 8, 2024
2a22eff
refactor(storage): hide usage on global max committed epoch
wenym1 Aug 8, 2024
b6b4c97
Merge branch 'main' into yiming/hide-max-committed-epoch
wenym1 Aug 9, 2024
f7c087c
Merge branch 'main' into yiming/strong-type-version-id
wenym1 Aug 9, 2024
5af2b7b
Merge branch 'yiming/strong-type-version-id' into yiming/clear-with-v…
wenym1 Aug 9, 2024
0877926
avoid clone hummock version
wenym1 Aug 9, 2024
0f0af0a
Merge branch 'yiming/clear-with-version-id' into yiming/hide-max-comm…
wenym1 Aug 9, 2024
54778ab
get version id in each retry
wenym1 Aug 10, 2024
719c15e
Merge branch 'yiming/clear-with-version-id' into yiming/hide-max-comm…
wenym1 Aug 10, 2024
41f25be
suppress switch op log
wenym1 Aug 12, 2024
36d4d8d
Merge branch 'main' into yiming/clear-with-version-id
wenym1 Aug 12, 2024
08415b8
allow more err
wenym1 Aug 13, 2024
b2c8cde
add more error
wenym1 Aug 13, 2024
249ffb9
more retry
wenym1 Aug 13, 2024
47330c0
add log and reduce retry timeout
wenym1 Aug 13, 2024
1d1d88c
Merge branch 'main' into yiming/clear-with-version-id
wenym1 Aug 13, 2024
10bdf3d
Merge branch 'yiming/clear-with-version-id' into yiming/hide-max-comm…
wenym1 Aug 13, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/ctl/src/cmd_impl/hummock/list_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ pub async fn list_version(
} else {
println!(
"Version {} max_committed_epoch {}",
version.id, version.max_committed_epoch
version.id,
version.visible_table_committed_epoch()
);

for (cg, levels) in &version.levels {
Expand Down
3 changes: 2 additions & 1 deletion src/ctl/src/cmd_impl/hummock/pause_resume.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ pub async fn disable_commit_epoch(context: &CtlContext) -> anyhow::Result<()> {
println!(
"Disabled.\
Current version: id {}, max_committed_epoch {}",
version.id, version.max_committed_epoch
version.id,
version.visible_table_committed_epoch()
);
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion src/ctl/src/cmd_impl/meta/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
.map(|vd| hummock_version_delta::ActiveModel {
id: Set(vd.id.to_u64() as _),
prev_id: Set(vd.prev_id.to_u64() as _),
max_committed_epoch: Set(vd.max_committed_epoch as _),
max_committed_epoch: Set(vd.visible_table_committed_epoch() as _),
safe_epoch: Set(vd.visible_table_safe_epoch() as _),
trivial_move: Set(vd.trivial_move),
full_version_delta: Set((&vd.to_protobuf()).into()),
Expand Down
5 changes: 4 additions & 1 deletion src/ctl/src/cmd_impl/table/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,10 @@ async fn do_scan(table: TableCatalog, hummock: MonitoredStateStore<HummockStorag
print_table_catalog(&table);

println!("Rows:");
let read_epoch = hummock.inner().get_pinned_version().max_committed_epoch();
let read_epoch = hummock
.inner()
.get_pinned_version()
.visible_table_committed_epoch();
let storage_table = make_storage_table(hummock, &table)?;
let stream = storage_table
.batch_iter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ fn version_to_compaction_group_rows(version: &HummockVersion) -> Vec<RwHummockVe
.values()
.map(|cg| RwHummockVersion {
version_id: version.id.to_u64() as _,
max_committed_epoch: version.max_committed_epoch as _,
max_committed_epoch: version.visible_table_committed_epoch() as _,
safe_epoch: version.visible_table_safe_epoch() as _,
compaction_group: json!(cg.to_protobuf()).into(),
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async fn read(reader: &SysCatalogReaderImpl) -> Result<Vec<RwHummockVersionDelta
.map(|d| RwHummockVersionDelta {
id: d.id.to_u64() as _,
prev_id: d.prev_id.to_u64() as _,
max_committed_epoch: d.max_committed_epoch as _,
max_committed_epoch: d.visible_table_committed_epoch() as _,
safe_epoch: d.visible_table_safe_epoch() as _,
trivial_move: d.trivial_move,
group_deltas: json!(d
Expand Down
10 changes: 9 additions & 1 deletion src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,16 @@ impl GlobalBarrierManager {
.context
.hummock_manager
.on_current_version(|version| {
let max_committed_epoch = version.visible_table_committed_epoch();
for (table_id, info) in version.state_table_info.info() {
assert_eq!(
info.committed_epoch, max_committed_epoch,
"table {} with invisible epoch is not purged",
table_id
);
}
(
TracedEpoch::new(Epoch::from(version.max_committed_epoch)),
TracedEpoch::new(Epoch::from(max_committed_epoch)),
version.id,
)
})
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/hummock/manager/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,7 @@ impl HummockManager {
let _timer = start_measure_real_process_timer!(self, "get_compact_tasks_impl");

let start_time = Instant::now();
let max_committed_epoch = versioning.current_version.max_committed_epoch;
let max_committed_epoch = versioning.current_version.visible_table_committed_epoch();
let watermark = self
.context_info
.read()
Expand Down Expand Up @@ -1379,7 +1379,7 @@ impl HummockManager {
tracing::info!(
"Trigger compaction for version {}, epoch {}, groups {:?}",
old_version.id,
old_version.max_committed_epoch,
old_version.visible_table_committed_epoch(),
compaction_groups
);
})
Expand Down
4 changes: 3 additions & 1 deletion src/meta/src/hummock/manager/compaction_group_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,9 @@ impl HummockManager {
&self.metrics,
);
let mut new_version_delta = version.new_delta();
let epoch = new_version_delta.latest_version().max_committed_epoch;
let epoch = new_version_delta
.latest_version()
.visible_table_committed_epoch();

for (table_id, raw_group_id) in pairs {
let mut group_id = *raw_group_id;
Expand Down
13 changes: 7 additions & 6 deletions src/meta/src/hummock/manager/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ impl HummockManager {

pub async fn commit_epoch_sanity_check(
&self,
epoch: HummockEpoch,
max_committed_epoch: HummockEpoch,
sstables: &[LocalSstableInfo],
sst_to_context: &HashMap<HummockSstableObjectId, HummockContextId>,
current_version: &HummockVersion,
Expand All @@ -221,11 +221,12 @@ impl HummockManager {
}
}

if epoch <= current_version.max_committed_epoch {
// TODO: allow equal when supporting partial checkpoint
if max_committed_epoch <= current_version.visible_table_committed_epoch() {
return Err(anyhow::anyhow!(
"Epoch {} <= max_committed_epoch {}",
epoch,
current_version.max_committed_epoch
max_committed_epoch,
current_version.visible_table_committed_epoch()
)
.into());
}
Expand All @@ -252,7 +253,7 @@ impl HummockManager {
.send_event(ResponseEvent::ValidationTask(ValidationTask {
sst_infos: sst_infos.into_iter().map(|sst| sst.into()).collect_vec(),
sst_id_to_worker_id: sst_to_context.clone(),
epoch,
epoch: max_committed_epoch,
}))
.is_err()
{
Expand Down Expand Up @@ -427,7 +428,7 @@ impl HummockManager {
let _timer = start_measure_real_process_timer!(self, "unpin_snapshot_before");
// Use the max_committed_epoch in storage as the snapshot ts so only committed changes are
// visible in the snapshot.
let max_committed_epoch = versioning.current_version.max_committed_epoch;
let max_committed_epoch = versioning.current_version.visible_table_committed_epoch();
// Ensure the unpin will not clean the latest one.
let snapshot_committed_epoch = hummock_snapshot.committed_epoch;
#[cfg(not(test))]
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,8 +433,8 @@ impl HummockManager {

self.latest_snapshot.store(
HummockSnapshot {
committed_epoch: redo_state.max_committed_epoch,
current_epoch: redo_state.max_committed_epoch,
committed_epoch: redo_state.visible_table_committed_epoch(),
current_epoch: redo_state.visible_table_committed_epoch(),
}
.into(),
);
Expand Down
15 changes: 9 additions & 6 deletions src/meta/src/hummock/manager/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,10 @@ async fn test_hummock_transaction() {
.await;
// Get tables before committing epoch1. No tables should be returned.
let current_version = hummock_manager.get_current_version().await;
assert_eq!(current_version.max_committed_epoch, INVALID_EPOCH);
assert_eq!(
current_version.visible_table_committed_epoch(),
INVALID_EPOCH
);
assert!(get_sorted_committed_object_ids(&current_version).is_empty());

// Commit epoch1
Expand All @@ -333,7 +336,7 @@ async fn test_hummock_transaction() {

// Get tables after committing epoch1. All tables committed in epoch1 should be returned
let current_version = hummock_manager.get_current_version().await;
assert_eq!(current_version.max_committed_epoch, epoch1);
assert_eq!(current_version.visible_table_committed_epoch(), epoch1);
assert_eq!(
get_sorted_object_ids(&committed_tables),
get_sorted_committed_object_ids(&current_version)
Expand All @@ -356,7 +359,7 @@ async fn test_hummock_transaction() {
// Get tables before committing epoch2. tables_in_epoch1 should be returned and
// tables_in_epoch2 should be invisible.
let current_version = hummock_manager.get_current_version().await;
assert_eq!(current_version.max_committed_epoch, epoch1);
assert_eq!(current_version.visible_table_committed_epoch(), epoch1);
assert_eq!(
get_sorted_object_ids(&committed_tables),
get_sorted_committed_object_ids(&current_version)
Expand All @@ -375,7 +378,7 @@ async fn test_hummock_transaction() {
// Get tables after committing epoch2. tables_in_epoch1 and tables_in_epoch2 should be
// returned
let current_version = hummock_manager.get_current_version().await;
assert_eq!(current_version.max_committed_epoch, epoch2);
assert_eq!(current_version.visible_table_committed_epoch(), epoch2);
assert_eq!(
get_sorted_object_ids(&committed_tables),
get_sorted_committed_object_ids(&current_version)
Expand Down Expand Up @@ -1148,7 +1151,7 @@ async fn test_extend_objects_to_delete() {
);
let objects_to_delete = hummock_manager.get_objects_to_delete();
assert_eq!(objects_to_delete.len(), orphan_sst_num as usize);
let new_epoch = pinned_version2.max_committed_epoch.next_epoch();
let new_epoch = pinned_version2.visible_table_committed_epoch().next_epoch();
hummock_manager
.commit_epoch_for_test(
new_epoch,
Expand All @@ -1158,7 +1161,7 @@ async fn test_extend_objects_to_delete() {
.await
.unwrap();
let pinned_version3: HummockVersion = hummock_manager.pin_version(context_id).await.unwrap();
assert_eq!(new_epoch, pinned_version3.max_committed_epoch);
assert_eq!(new_epoch, pinned_version3.visible_table_committed_epoch());
hummock_manager
.unpin_version_before(context_id, pinned_version3.id)
.await
Expand Down
8 changes: 4 additions & 4 deletions src/meta/src/hummock/manager/time_travel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ impl HummockManager {
Ok(count)
}

let epoch = delta.max_committed_epoch;
let epoch = delta.visible_table_committed_epoch();
let version_id: u64 = delta.id.to_u64();
let m = hummock_epoch_to_version::ActiveModel {
epoch: Set(epoch.try_into().unwrap()),
Expand Down Expand Up @@ -483,14 +483,14 @@ fn replay_archive(
deltas: impl Iterator<Item = PbHummockVersionDelta>,
) -> HummockVersion {
let mut last_version = HummockVersion::from_persisted_protobuf(&version);
let mut mce = last_version.max_committed_epoch;
let mut mce = last_version.visible_table_committed_epoch();
for d in deltas {
let d = HummockVersionDelta::from_persisted_protobuf(&d);
assert!(
d.max_committed_epoch > mce,
d.visible_table_committed_epoch() > mce,
"time travel expects delta from commit_epoch only"
);
mce = d.max_committed_epoch;
mce = d.visible_table_committed_epoch();
// Need to work around the assertion in `apply_version_delta`.
// Because compaction deltas are not included in time travel archive.
while last_version.id < d.prev_id {
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/hummock/manager/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ fn trigger_delta_log_stats(metrics: &MetaMetrics, total_number: usize) {
fn trigger_version_stat(metrics: &MetaMetrics, current_version: &HummockVersion) {
metrics
.max_committed_epoch
.set(current_version.max_committed_epoch as i64);
.set(current_version.visible_table_committed_epoch() as i64);
metrics
.version_size
.set(current_version.estimated_encode_len() as i64);
Expand Down Expand Up @@ -129,7 +129,7 @@ impl<'a> HummockVersionTransaction<'a> {
)>,
) -> HummockVersionDelta {
let mut new_version_delta = self.new_delta();
new_version_delta.max_committed_epoch = max_committed_epoch;
new_version_delta.set_max_committed_epoch(max_committed_epoch);
new_version_delta.new_table_watermarks = new_table_watermarks;
new_version_delta.change_log_delta = change_log_delta;

Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/hummock/manager/versioning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ impl HummockManager {
.hummock_version_deltas
.range(start_id..)
.map(|(_id, delta)| delta)
.filter(|delta| delta.max_committed_epoch <= committed_epoch_limit)
.filter(|delta| delta.visible_table_committed_epoch() <= committed_epoch_limit)
.take(num_limit as _)
.cloned()
.collect();
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/hummock/model/ext/hummock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ impl Transactional<Transaction> for HummockVersionDelta {
let m = hummock_version_delta::ActiveModel {
id: Set(self.id.to_u64().try_into().unwrap()),
prev_id: Set(self.prev_id.to_u64().try_into().unwrap()),
max_committed_epoch: Set(self.max_committed_epoch.try_into().unwrap()),
max_committed_epoch: Set(self.visible_table_committed_epoch().try_into().unwrap()),
safe_epoch: Set(self.visible_table_safe_epoch().try_into().unwrap()),
trivial_move: Set(self.trivial_move),
full_version_delta: Set(FullVersionDelta::from(&self.into())),
Expand Down
2 changes: 1 addition & 1 deletion src/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ fiemap = "0.1.1"
[features]
# rocksdb-local = ["rocksdb"]
# tikv = ["tikv-client"]
test = []
test = ["risingwave_hummock_sdk/test"]
failpoints = ["fail/failpoints"]
bpf = []
hm-trace = []
Expand Down
2 changes: 1 addition & 1 deletion src/storage/backup/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl MetaSnapshotMetadata {
id,
hummock_version_id: v.id,
ssts: v.get_object_ids(),
max_committed_epoch: v.max_committed_epoch,
max_committed_epoch: v.visible_table_committed_epoch(),
safe_epoch: v.visible_table_safe_epoch(),
format_version,
remarks,
Expand Down
3 changes: 2 additions & 1 deletion src/storage/backup/src/meta_snapshot_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ impl Display for MetadataV2 {
writeln!(
f,
"Hummock version: id {}, max_committed_epoch: {}",
self.hummock_version.id, self.hummock_version.max_committed_epoch
self.hummock_version.id,
self.hummock_version.visible_table_committed_epoch()
)?;
// optionally dump other metadata
Ok(())
Expand Down
12 changes: 7 additions & 5 deletions src/storage/benches/bench_table_watermarks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ use risingwave_hummock_sdk::table_watermark::{
TableWatermarks, TableWatermarksIndex, VnodeWatermark, WatermarkDirection,
};
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionStateTableInfo};
use risingwave_hummock_sdk::{HummockEpoch, HummockVersionId};
use risingwave_pb::hummock::StateTableInfoDelta;
use risingwave_hummock_sdk::HummockEpoch;
use risingwave_pb::hummock::{PbHummockVersion, StateTableInfoDelta};
use risingwave_storage::hummock::local_version::pinned_version::PinnedVersion;
use spin::Mutex;
use tokio::sync::mpsc::unbounded_channel;
Expand Down Expand Up @@ -115,10 +115,12 @@ fn gen_version(
new_epoch_idx,
vnode_part_count,
));
let mut version = HummockVersion::default();
let committed_epoch = test_epoch(new_epoch_idx as _);
version.id = HummockVersionId::new(new_epoch_idx as _);
version.max_committed_epoch = committed_epoch;
let mut version = HummockVersion::from_persisted_protobuf(&PbHummockVersion {
id: new_epoch_idx as _,
max_committed_epoch: committed_epoch,
..Default::default()
});
version.table_watermarks = (0..table_count)
.map(|table_id| (TableId::new(table_id as _), table_watermarks.clone()))
.collect();
Expand Down
3 changes: 3 additions & 0 deletions src/storage/hummock_sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,8 @@ tracing = "0.1"
[target.'cfg(not(madsim))'.dependencies]
workspace-hack = { path = "../../workspace-hack" }

[features]
test = []

[lints]
workspace = true
Loading
Loading