From a6d7d10b1fce2b6145d072bf39ab205db39ce8de Mon Sep 17 00:00:00 2001
From: zwang28 <84491488@qq.com>
Date: Wed, 18 Oct 2023 14:38:05 +0800
Subject: [PATCH] feat(ctl): support rebuild table stats with approximate value

---
 proto/hummock.proto | 5 +
 src/ctl/src/cmd_impl/hummock/list_version.rs | 6 +
 src/ctl/src/lib.rs | 5 +
 src/meta/service/src/hummock_service.rs | 8 +
 src/meta/src/hummock/manager/versioning.rs | 156 ++++++++++++++++++-
 src/rpc_client/src/meta_client.rs | 7 +
 6 files changed, 183 insertions(+), 4 deletions(-)

diff --git a/proto/hummock.proto b/proto/hummock.proto
index 6957d7e060524..1b557ca5fb31b 100644
--- a/proto/hummock.proto
+++ b/proto/hummock.proto
@@ -681,6 +681,10 @@ message ListHummockMetaConfigResponse {
   map configs = 1;
 }
 
+message RiseCtlRebuildTableStatsRequest {}
+
+message RiseCtlRebuildTableStatsResponse {}
+
 service HummockManagerService {
   rpc UnpinVersionBefore(UnpinVersionBeforeRequest) returns (UnpinVersionBeforeResponse);
   rpc GetCurrentVersion(GetCurrentVersionRequest) returns (GetCurrentVersionResponse);
@@ -706,6 +710,7 @@ service HummockManagerService {
   rpc RiseCtlPauseVersionCheckpoint(RiseCtlPauseVersionCheckpointRequest) returns (RiseCtlPauseVersionCheckpointResponse);
   rpc RiseCtlResumeVersionCheckpoint(RiseCtlResumeVersionCheckpointRequest) returns (RiseCtlResumeVersionCheckpointResponse);
   rpc RiseCtlGetCheckpointVersion(RiseCtlGetCheckpointVersionRequest) returns (RiseCtlGetCheckpointVersionResponse);
+  rpc RiseCtlRebuildTableStats(RiseCtlRebuildTableStatsRequest) returns (RiseCtlRebuildTableStatsResponse);
   rpc InitMetadataForReplay(InitMetadataForReplayRequest) returns (InitMetadataForReplayResponse);
   rpc PinVersion(PinVersionRequest) returns (PinVersionResponse);
   rpc SplitCompactionGroup(SplitCompactionGroupRequest) returns (SplitCompactionGroupResponse);
diff --git a/src/ctl/src/cmd_impl/hummock/list_version.rs b/src/ctl/src/cmd_impl/hummock/list_version.rs
index 6935dcf604142..3973860d9e30e 100644
--- a/src/ctl/src/cmd_impl/hummock/list_version.rs
+++ b/src/ctl/src/cmd_impl/hummock/list_version.rs
@@ -148,3 +148,9 @@ pub async fn list_pinned_snapshots(context: &CtlContext) -> anyhow::Result<()> {
     }
     Ok(())
 }
+
+pub async fn rebuild_table_stats(context: &CtlContext) -> anyhow::Result<()> {
+    let client = context.meta_client().await?;
+    client.risectl_rebuild_table_stats().await?;
+    Ok(())
+}
diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs
index 4908e8cdb952e..cf194884d52f3 100644
--- a/src/ctl/src/lib.rs
+++ b/src/ctl/src/lib.rs
@@ -246,6 +246,8 @@ enum HummockCommands {
     },
     /// Validate the current HummockVersion.
     ValidateVersion,
+    /// Rebuild table stats
+    RebuildTableStats,
 }
 
 #[derive(Subcommand)]
@@ -608,6 +610,9 @@ pub async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
         Commands::Hummock(HummockCommands::ValidateVersion) => {
             cmd_impl::hummock::validate_version(context).await?;
         }
+        Commands::Hummock(HummockCommands::RebuildTableStats) => {
+            cmd_impl::hummock::rebuild_table_stats(context).await?;
+        }
         Commands::Table(TableCommands::Scan { mv_name, data_dir }) => {
             cmd_impl::table::scan(context, mv_name, data_dir).await?
}
diff --git a/src/meta/service/src/hummock_service.rs b/src/meta/service/src/hummock_service.rs
index 74dc37b82d21e..56bb78f19249f 100644
--- a/src/meta/service/src/hummock_service.rs
+++ b/src/meta/service/src/hummock_service.rs
@@ -594,6 +594,14 @@ impl HummockManagerService for HummockServiceImpl {
         );
         Ok(Response::new(ListHummockMetaConfigResponse { configs }))
     }
+
+    async fn rise_ctl_rebuild_table_stats(
+        &self,
+        _request: Request<RiseCtlRebuildTableStatsRequest>, // request message has no fields
+    ) -> Result<Response<RiseCtlRebuildTableStatsResponse>, Status> {
+        self.hummock_manager.rebuild_table_stats().await?;
+        Ok(Response::new(RiseCtlRebuildTableStatsResponse {}))
+    }
 }
 
 #[cfg(test)]
diff --git a/src/meta/src/hummock/manager/versioning.rs b/src/meta/src/hummock/manager/versioning.rs
index 1e939513bbf3d..e1ed8a5d716c2 100644
--- a/src/meta/src/hummock/manager/versioning.rs
+++ b/src/meta/src/hummock/manager/versioning.rs
@@ -23,6 +23,7 @@ use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
     HummockVersionExt,
 };
 use risingwave_hummock_sdk::compaction_group::{StateTableId, StaticCompactionGroupId};
+use risingwave_hummock_sdk::table_stats::add_prost_table_stats_map;
 use risingwave_hummock_sdk::{
     CompactionGroupId, HummockContextId, HummockSstableObjectId, HummockVersionId, FIRST_VERSION_ID,
 };
@@ -30,15 +31,18 @@ use risingwave_pb::common::WorkerNode;
 use risingwave_pb::hummock::write_limits::WriteLimit;
 use risingwave_pb::hummock::{
     CompactionConfig, HummockPinnedSnapshot, HummockPinnedVersion, HummockVersion,
-    HummockVersionCheckpoint, HummockVersionDelta, HummockVersionStats,
+    HummockVersionCheckpoint, HummockVersionDelta, HummockVersionStats, SstableInfo, TableStats,
 };
 use risingwave_pb::meta::subscribe_response::{Info, Operation};
 
+use crate::hummock::error::Result;
 use crate::hummock::manager::worker::{HummockManagerEvent, HummockManagerEventSender};
-use crate::hummock::manager::{read_lock, write_lock};
+use crate::hummock::manager::{commit_multi_var, read_lock, write_lock};
 use crate::hummock::metrics_utils::{trigger_safepoint_stat,
trigger_write_stop_stats};
 use crate::hummock::model::CompactionGroup;
 use crate::hummock::HummockManager;
+use crate::model::{ValTransaction, VarTransaction};
+use crate::storage::Transaction;
 
 /// `HummockVersionSafePoint` prevents hummock versions GE than it from being GC.
 /// It's used by meta node itself to temporarily pin versions.
@@ -277,6 +281,16 @@ impl HummockManager {
         let guard = read_lock!(self, versioning).await;
         guard.branched_ssts.clone()
     }
+
+    #[named]
+    pub async fn rebuild_table_stats(&self) -> Result<()> {
+        let mut guard = write_lock!(self, versioning).await;
+        let rebuilt = rebuild_table_stats(&guard.current_version);
+        let mut version_stats = VarTransaction::new(&mut guard.version_stats);
+        *version_stats = rebuilt;
+        commit_multi_var!(self, None, Transaction::default(), version_stats)?;
+        Ok(())
+    }
 }
 
 /// Calculates write limits for `target_groups`.
@@ -338,6 +352,47 @@ pub(super) fn create_init_version(default_compaction_config: CompactionConfig) -
     init_version
 }
 
+/// Recomputes the stats of every table from the SSTs of the given version.
+/// The result is only an approximation. See `estimate_table_stats`.
+fn rebuild_table_stats(version: &HummockVersion) -> HummockVersionStats {
+    let mut rebuilt = HummockVersionStats {
+        hummock_version_id: version.id,
+        table_stats: Default::default(),
+    };
+    for level in version.get_combined_levels() {
+        for sst in level.table_infos.iter() {
+            let delta = estimate_table_stats(sst);
+            add_prost_table_stats_map(&mut rebuilt.table_stats, &delta);
+        }
+    }
+    rebuilt
+}
+
+/// Approximates the table stats change contributed by a single file.
+/// - The file's stats are split evenly among all tables stored in the file.
+/// - Total key size and total value size are derived from the key range and the file size.
+/// - Overestimation is possible when branched files are present.
+fn estimate_table_stats(sst: &SstableInfo) -> HashMap<u32, TableStats> {
+    let mut changes: HashMap<u32, TableStats> = HashMap::default();
+    let weighted_value =
+        |value: i64| -> i64 { (value as f64 / sst.table_ids.len() as f64).ceil() as i64 };
+    let key_range = sst.key_range.as_ref().unwrap(); // panics when key_range is None
+    let estimated_key_size: u64 = (key_range.left.len() + key_range.right.len()) as u64 / 2;
+    let mut estimated_total_key_size = estimated_key_size.saturating_mul(sst.total_key_count);
+    if estimated_total_key_size > sst.uncompressed_file_size {
+        tracing::warn!(sst.sst_id, "Calculated estimated_total_key_size {} > uncompressed_file_size {}. Use uncompressed_file_size/2 as estimated_total_key_size instead.", estimated_total_key_size, sst.uncompressed_file_size);
+        estimated_total_key_size = sst.uncompressed_file_size / 2; // clamp after logging
+    }
+    let estimated_total_value_size = sst.uncompressed_file_size - estimated_total_key_size;
+    for table_id in &sst.table_ids {
+        let e = changes.entry(*table_id).or_default();
+        e.total_key_count += weighted_value(sst.total_key_count as i64);
+        e.total_key_size += weighted_value(estimated_total_key_size as i64);
+        e.total_value_size += weighted_value(estimated_total_value_size as i64);
+    }
+    changes
+}
+
 #[cfg(test)]
 mod tests {
     use std::collections::HashMap;
@@ -346,10 +401,15 @@ mod tests {
     use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId};
     use risingwave_pb::hummock::hummock_version::Levels;
     use risingwave_pb::hummock::write_limits::WriteLimit;
-    use risingwave_pb::hummock::{HummockPinnedVersion, HummockVersion, Level, OverlappingLevel};
+    use risingwave_pb::hummock::{
+        HummockPinnedVersion, HummockVersion, HummockVersionStats, KeyRange, Level,
+        OverlappingLevel, SstableInfo,
+    };
 
     use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
-    use crate::hummock::manager::versioning::{calc_new_write_limits, Versioning};
+    use crate::hummock::manager::versioning::{
+        calc_new_write_limits, estimate_table_stats, rebuild_table_stats, Versioning,
+    };
     use
crate::hummock::model::CompactionGroup;
 
     #[test]
@@ -470,4 +530,92 @@ mod tests {
             "too many L0 sub levels: 11 > 5"
         );
     }
+
+    #[test]
+    fn test_estimate_table_stats() {
+        let sst = SstableInfo {
+            key_range: Some(KeyRange {
+                left: vec![1; 10],
+                right: vec![1; 20],
+                ..Default::default()
+            }),
+            table_ids: vec![1, 2, 3],
+            total_key_count: 6000,
+            uncompressed_file_size: 6_000_000,
+            ..Default::default()
+        };
+        let estimated = estimate_table_stats(&sst);
+        assert_eq!(estimated.len(), 3);
+        for table_stat in estimated.values() {
+            assert_eq!(table_stat.total_key_count, 6000 / 3);
+            assert_eq!(table_stat.total_key_size, (10 + 20) / 2 * 6000 / 3);
+            assert_eq!(
+                table_stat.total_value_size,
+                (6_000_000 - (10 + 20) / 2 * 6000) / 3
+            );
+        }
+
+        let mut version = HummockVersion {
+            id: 123,
+            levels: Default::default(),
+            max_committed_epoch: 0,
+            safe_epoch: 0,
+        };
+        for group_id in 1..3 {
+            version.levels.insert(
+                group_id,
+                Levels {
+                    levels: vec![Level {
+                        table_infos: vec![sst.clone()],
+                        ..Default::default()
+                    }],
+                    l0: Some(Default::default()),
+                    ..Default::default()
+                },
+            );
+        }
+        let HummockVersionStats {
+            hummock_version_id,
+            table_stats,
+        } = rebuild_table_stats(&version);
+        assert_eq!(hummock_version_id, version.id);
+        assert_eq!(table_stats.len(), 3);
+        for (table_id, rebuilt) in table_stats {
+            assert_eq!(
+                rebuilt.total_key_count,
+                estimated.get(&table_id).unwrap().total_key_count * 2
+            );
+            assert_eq!(
+                rebuilt.total_key_size,
+                estimated.get(&table_id).unwrap().total_key_size * 2
+            );
+            assert_eq!(
+                rebuilt.total_value_size,
+                estimated.get(&table_id).unwrap().total_value_size * 2
+            );
+        }
+    }
+
+    #[test]
+    fn test_estimate_table_stats_large_key_range() {
+        let sst = SstableInfo {
+            key_range: Some(KeyRange {
+                left: vec![1; 1000],
+                right: vec![1; 2000],
+                ..Default::default()
+            }),
+            table_ids: vec![1, 2, 3],
+            total_key_count: 6000,
+            uncompressed_file_size: 60_000,
+            ..Default::default()
+        };
+        let estimated = estimate_table_stats(&sst);
+        assert_eq!(estimated.len(), 3);
+        for table_id in &sst.table_ids {
+            let table_stat = estimated.get(table_id).unwrap();
+            assert_eq!(table_stat.total_key_count, 6000 / 3);
+            assert_eq!(table_stat.total_key_size, 60_000 / 2 / 3);
+            assert_eq!(table_stat.total_value_size, (60_000 - 60_000 / 2) / 3);
+        }
+    }
 }
diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs
index 74a80f8e9f3e6..73664b6540a45 100644
--- a/src/rpc_client/src/meta_client.rs
+++ b/src/rpc_client/src/meta_client.rs
@@ -1045,6 +1045,12 @@ impl MetaClient {
         ))
     }
 
+    pub async fn risectl_rebuild_table_stats(&self) -> Result<()> {
+        let request = RiseCtlRebuildTableStatsRequest {};
+        self.inner.rise_ctl_rebuild_table_stats(request).await?;
+        Ok(())
+    }
+
     pub async fn list_branched_object(&self) -> Result> {
         let req = ListBranchedObjectRequest {};
         let resp = self.inner.list_branched_object(req).await?;
@@ -1728,6 +1734,7 @@ macro_rules! for_all_meta_rpc {
             ,{ hummock_client, init_metadata_for_replay, InitMetadataForReplayRequest, InitMetadataForReplayResponse }
             ,{ hummock_client, split_compaction_group, SplitCompactionGroupRequest, SplitCompactionGroupResponse }
             ,{ hummock_client, rise_ctl_list_compaction_status, RiseCtlListCompactionStatusRequest, RiseCtlListCompactionStatusResponse }
+            ,{ hummock_client, rise_ctl_rebuild_table_stats, RiseCtlRebuildTableStatsRequest, RiseCtlRebuildTableStatsResponse }
             ,{ hummock_client, subscribe_compaction_event, impl tonic::IntoStreamingRequest, Streaming }
             ,{ hummock_client, list_branched_object, ListBranchedObjectRequest, ListBranchedObjectResponse }
             ,{ hummock_client, list_active_write_limit, ListActiveWriteLimitRequest, ListActiveWriteLimitResponse }