Skip to content

Commit

Permalink
refactor(storage): use strong type for HummockVersionId (#17971)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Aug 9, 2024
1 parent 13af20c commit 21a46ab
Show file tree
Hide file tree
Showing 34 changed files with 230 additions and 124 deletions.
4 changes: 2 additions & 2 deletions src/ctl/src/cmd_impl/hummock/list_version_deltas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_hummock_sdk::HummockEpoch;
use risingwave_hummock_sdk::{HummockEpoch, HummockVersionId};

use crate::CtlContext;

pub async fn list_version_deltas(
context: &CtlContext,
start_id: u64,
start_id: HummockVersionId,
num_epochs: u32,
) -> anyhow::Result<()> {
let meta_client = context.meta_client().await?;
Expand Down
4 changes: 2 additions & 2 deletions src/ctl/src/cmd_impl/hummock/validate_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ async fn get_archive(

pub async fn print_user_key_in_archive(
context: &CtlContext,
archive_ids: Vec<HummockVersionId>,
archive_ids: impl IntoIterator<Item = HummockVersionId>,
data_dir: String,
user_key: String,
use_new_object_prefix_strategy: bool,
Expand Down Expand Up @@ -169,7 +169,7 @@ async fn print_user_key_in_sst(

pub async fn print_version_delta_in_archive(
context: &CtlContext,
archive_ids: Vec<HummockVersionId>,
archive_ids: impl IntoIterator<Item = HummockVersionId>,
data_dir: String,
sst_id: HummockSstableObjectId,
use_new_object_prefix_strategy: bool,
Expand Down
4 changes: 2 additions & 2 deletions src/ctl/src/cmd_impl/meta/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -739,8 +739,8 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
version_delta
.into_iter()
.map(|vd| hummock_version_delta::ActiveModel {
id: Set(vd.id as _),
prev_id: Set(vd.prev_id as _),
id: Set(vd.id.to_u64() as _),
prev_id: Set(vd.prev_id.to_u64() as _),
max_committed_epoch: Set(vd.max_committed_epoch as _),
safe_epoch: Set(vd.visible_table_safe_epoch() as _),
trivial_move: Set(vd.trivial_move),
Expand Down
13 changes: 9 additions & 4 deletions src/ctl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use cmd_impl::bench::BenchCommands;
use cmd_impl::hummock::SstDumpArgs;
use itertools::Itertools;
use risingwave_common::util::tokio_util::sync::CancellationToken;
use risingwave_hummock_sdk::HummockEpoch;
use risingwave_hummock_sdk::{HummockEpoch, HummockVersionId};
use risingwave_meta::backup_restore::RestoreOpts;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::CompressionAlgorithm;
use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
Expand Down Expand Up @@ -594,7 +594,12 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
start_id,
num_epochs,
}) => {
cmd_impl::hummock::list_version_deltas(context, start_id, num_epochs).await?;
cmd_impl::hummock::list_version_deltas(
context,
HummockVersionId::new(start_id),
num_epochs,
)
.await?;
}
Commands::Hummock(HummockCommands::ListKv {
epoch,
Expand Down Expand Up @@ -737,7 +742,7 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
}) => {
cmd_impl::hummock::print_version_delta_in_archive(
context,
archive_ids,
archive_ids.into_iter().map(HummockVersionId::new),
data_dir,
sst_id,
use_new_object_prefix_strategy,
Expand All @@ -752,7 +757,7 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
}) => {
cmd_impl::hummock::print_user_key_in_archive(
context,
archive_ids,
archive_ids.into_iter().map(HummockVersionId::new),
data_dir,
user_key,
use_new_object_prefix_strategy,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ fn version_to_compaction_group_rows(version: &HummockVersion) -> Vec<RwHummockVe
.levels
.values()
.map(|cg| RwHummockVersion {
version_id: version.id as _,
version_id: version.id.to_u64() as _,
max_committed_epoch: version.max_committed_epoch as _,
safe_epoch: version.visible_table_safe_epoch() as _,
compaction_group: json!(cg.to_protobuf()).into(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ async fn read(reader: &SysCatalogReaderImpl) -> Result<Vec<RwHummockVersionDelta
let rows = deltas
.into_iter()
.map(|d| RwHummockVersionDelta {
id: d.id as _,
prev_id: d.prev_id as _,
id: d.id.to_u64() as _,
prev_id: d.prev_id.to_u64() as _,
max_committed_epoch: d.max_committed_epoch as _,
safe_epoch: d.visible_table_safe_epoch() as _,
trivial_move: d.trivial_move,
Expand Down
5 changes: 4 additions & 1 deletion src/frontend/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use anyhow::Context;
use risingwave_common::session_config::SessionConfig;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::HummockVersionId;
use risingwave_pb::backup_service::MetaSnapshotMetadata;
use risingwave_pb::catalog::Table;
use risingwave_pb::common::WorkerNode;
Expand Down Expand Up @@ -281,7 +282,9 @@ impl FrontendMetaClient for FrontendMetaClientImpl {

async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>> {
// FIXME #8612: there can be lots of version deltas, so better to fetch them by pages and refactor `SysRowSeqScanExecutor` to yield multiple chunks.
self.0.list_version_deltas(0, u32::MAX, u64::MAX).await
self.0
.list_version_deltas(HummockVersionId::new(0), u32::MAX, u64::MAX)
.await
}

async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>> {
Expand Down
17 changes: 14 additions & 3 deletions src/meta/service/src/hummock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use itertools::Itertools;
use risingwave_common::catalog::{TableId, SYS_CATALOG_START_ID};
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::version::HummockVersionDelta;
use risingwave_hummock_sdk::HummockVersionId;
use risingwave_meta::manager::MetadataManager;
use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
use risingwave_pb::hummock::hummock_manager_service_server::HummockManagerService;
Expand Down Expand Up @@ -75,7 +76,10 @@ impl HummockManagerService for HummockServiceImpl {
) -> Result<Response<UnpinVersionBeforeResponse>, Status> {
let req = request.into_inner();
self.hummock_manager
.unpin_version_before(req.context_id, req.unpin_version_before)
.unpin_version_before(
req.context_id,
HummockVersionId::new(req.unpin_version_before),
)
.await?;
Ok(Response::new(UnpinVersionBeforeResponse { status: None }))
}
Expand Down Expand Up @@ -114,7 +118,10 @@ impl HummockManagerService for HummockServiceImpl {
) -> Result<Response<TriggerCompactionDeterministicResponse>, Status> {
let req = request.into_inner();
self.hummock_manager
.trigger_compaction_deterministic(req.version_id, req.compaction_groups)
.trigger_compaction_deterministic(
HummockVersionId::new(req.version_id),
req.compaction_groups,
)
.await?;
Ok(Response::new(TriggerCompactionDeterministicResponse {}))
}
Expand All @@ -136,7 +143,11 @@ impl HummockManagerService for HummockServiceImpl {
let req = request.into_inner();
let version_deltas = self
.hummock_manager
.list_version_deltas(req.start_id, req.num_limit, req.committed_epoch_limit)
.list_version_deltas(
HummockVersionId::new(req.start_id),
req.num_limit,
req.committed_epoch_limit,
)
.await?;
let resp = ListVersionDeltasResponse {
version_deltas: Some(PbHummockVersionDeltas {
Expand Down
7 changes: 4 additions & 3 deletions src/meta/src/backup_restore/meta_snapshot_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ mod tests {
use risingwave_backup::meta_snapshot_v1::MetaSnapshotV1;
use risingwave_common::system_param::system_params_for_test;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_hummock_sdk::HummockVersionId;
use risingwave_pb::hummock::HummockVersionStats;

use crate::backup_restore::meta_snapshot_builder;
Expand All @@ -190,13 +191,13 @@ mod tests {

let mut builder = MetaSnapshotBuilder::new(meta_store.clone());
let mut hummock_version = HummockVersion::default();
hummock_version.id = 1;
hummock_version.id = HummockVersionId::new(1);
let get_ckpt_builder = |v: &HummockVersion| {
let v_ = v.clone();
async move { v_ }
};
let hummock_version_stats = HummockVersionStats {
hummock_version_id: hummock_version.id,
hummock_version_id: hummock_version.id.to_u64(),
..Default::default()
};
hummock_version_stats.insert(&meta_store).await.unwrap();
Expand Down Expand Up @@ -250,7 +251,7 @@ mod tests {
snapshot.metadata.default_cf.values().cloned().collect_vec(),
vec![vec![100]]
);
assert_eq!(snapshot.metadata.hummock_version.id, 1);
assert_eq!(snapshot.metadata.hummock_version.id.to_u64(), 1);
assert_eq!(snapshot.metadata.version_stats.hummock_version_id, 1);
}
}
5 changes: 3 additions & 2 deletions src/meta/src/backup_restore/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ mod tests {
use risingwave_backup::storage::MetaSnapshotStorage;
use risingwave_common::config::{MetaBackend, SystemConfig};
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_hummock_sdk::HummockVersionId;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::meta::SystemParams;

Expand Down Expand Up @@ -289,7 +290,7 @@ mod tests {
metadata: ClusterMetadata {
hummock_version: {
let mut version = HummockVersion::default();
version.id = 123;
version.id = HummockVersionId::new(123);
version
},
system_param: system_param.clone(),
Expand Down Expand Up @@ -472,7 +473,7 @@ mod tests {
]),
hummock_version: {
let mut version = HummockVersion::default();
version.id = 123;
version.id = HummockVersionId::new(123);
version
},
system_param: system_param.clone(),
Expand Down
10 changes: 7 additions & 3 deletions src/meta/src/hummock/manager/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,19 @@ impl HummockVersionCheckpoint {
stale_objects: checkpoint
.stale_objects
.iter()
.map(|(version_id, objects)| (*version_id as HummockVersionId, objects.clone()))
.map(|(version_id, objects)| (HummockVersionId::new(*version_id), objects.clone()))
.collect(),
}
}

pub fn to_protobuf(&self) -> PbHummockVersionCheckpoint {
PbHummockVersionCheckpoint {
version: Some(PbHummockVersion::from(&self.version)),
stale_objects: self.stale_objects.clone(),
stale_objects: self
.stale_objects
.iter()
.map(|(version_id, objects)| (version_id.to_u64(), objects.clone()))
.collect(),
}
}
}
Expand Down Expand Up @@ -245,7 +249,7 @@ impl HummockManager {
timer.observe_duration();
self.metrics
.checkpoint_version_id
.set(new_checkpoint_id as i64);
.set(new_checkpoint_id.to_u64() as i64);

Ok(new_checkpoint_id - old_checkpoint_id)
}
Expand Down
16 changes: 8 additions & 8 deletions src/meta/src/hummock/manager/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,15 +283,15 @@ impl HummockManager {
context_id,
HummockPinnedVersion {
context_id,
min_pinned_id: INVALID_VERSION_ID,
min_pinned_id: INVALID_VERSION_ID.to_u64(),
},
);
let version_id = versioning.current_version.id;
let ret = versioning.current_version.clone();
if context_pinned_version.min_pinned_id == INVALID_VERSION_ID
|| context_pinned_version.min_pinned_id > version_id
if HummockVersionId::new(context_pinned_version.min_pinned_id) == INVALID_VERSION_ID
|| HummockVersionId::new(context_pinned_version.min_pinned_id) > version_id
{
context_pinned_version.min_pinned_id = version_id;
context_pinned_version.min_pinned_id = version_id.to_u64();
commit_multi_var!(self.meta_store_ref(), context_pinned_version)?;
trigger_pin_unpin_version_state(&self.metrics, &context_info.pinned_versions);
}
Expand Down Expand Up @@ -327,12 +327,12 @@ impl HummockManager {
},
);
assert!(
context_pinned_version.min_pinned_id <= unpin_before,
context_pinned_version.min_pinned_id <= unpin_before.to_u64(),
"val must be monotonically non-decreasing. old = {}, new = {}.",
context_pinned_version.min_pinned_id,
unpin_before
);
context_pinned_version.min_pinned_id = unpin_before;
context_pinned_version.min_pinned_id = unpin_before.to_u64();
commit_multi_var!(self.meta_store_ref(), context_pinned_version)?;
trigger_pin_unpin_version_state(&self.metrics, &context_info.pinned_versions);

Expand Down Expand Up @@ -492,10 +492,10 @@ impl HummockManager {

fn trigger_safepoint_stat(metrics: &MetaMetrics, safepoints: &[HummockVersionId]) {
if let Some(sp) = safepoints.iter().min() {
metrics.min_safepoint_version_id.set(*sp as _);
metrics.min_safepoint_version_id.set(sp.to_u64() as _);
} else {
metrics
.min_safepoint_version_id
.set(HummockVersionId::MAX as _);
.set(HummockVersionId::MAX.to_u64() as _);
}
}
2 changes: 1 addition & 1 deletion src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ impl HummockManager {
.into_iter()
.map(|m| {
(
m.id as HummockVersionId,
HummockVersionId::new(m.id as _),
HummockVersionDelta::from_persisted_protobuf(
&PbHummockVersionDelta::from(m),
),
Expand Down
10 changes: 5 additions & 5 deletions src/meta/src/hummock/manager/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ async fn test_hummock_manager_basic() {
);
for _ in 0..2 {
hummock_manager
.unpin_version_before(context_id_1, u64::MAX)
.unpin_version_before(context_id_1, HummockVersionId::MAX)
.await
.unwrap();
assert_eq!(
Expand Down Expand Up @@ -571,8 +571,8 @@ async fn test_hummock_manager_basic() {
);
// pinned by context_id_1
assert_eq!(
hummock_manager.get_min_pinned_version_id().await,
init_version_id + commit_log_count + register_log_count - 2,
hummock_manager.get_min_pinned_version_id().await + 2,
init_version_id + commit_log_count + register_log_count,
);
}
// objects_to_delete is always empty because no compaction is ever invoked.
Expand All @@ -597,15 +597,15 @@ async fn test_hummock_manager_basic() {
((commit_log_count + register_log_count) as usize, 0)
);
hummock_manager
.unpin_version_before(context_id_1, u64::MAX)
.unpin_version_before(context_id_1, HummockVersionId::MAX)
.await
.unwrap();
assert_eq!(
hummock_manager.get_min_pinned_version_id().await,
init_version_id + commit_log_count + register_log_count
);
hummock_manager
.unpin_version_before(context_id_2, u64::MAX)
.unpin_version_before(context_id_2, HummockVersionId::MAX)
.await
.unwrap();
assert_eq!(
Expand Down
Loading

0 comments on commit 21a46ab

Please sign in to comment.