diff --git a/proto/hummock.proto b/proto/hummock.proto index c9e6b96c91aaa..ab212787d7287 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -577,7 +577,10 @@ message ReportFullScanTaskRequest { uint64 total_object_count = 2; // Total size of objects before filtered by conditions specified by FullScanTask. uint64 total_object_size = 3; + // Start key of the next page. optional string next_start_after = 4; + // Start key of this page. + optional string start_after = 5; } message ReportFullScanTaskResponse { diff --git a/src/meta/service/src/hummock_service.rs b/src/meta/service/src/hummock_service.rs index 1c2dd26e2bff6..3e7c5ed34e611 100644 --- a/src/meta/service/src/hummock_service.rs +++ b/src/meta/service/src/hummock_service.rs @@ -251,14 +251,12 @@ impl HummockManagerService for HummockServiceImpl { ) -> Result, Status> { let req = request.into_inner(); let hummock_manager = self.hummock_manager.clone(); - hummock_manager - .metrics - .total_object_count - .set(req.total_object_count as _); - hummock_manager - .metrics - .total_object_size - .set(req.total_object_size as _); + hummock_manager.update_paged_metrics( + req.start_after, + req.next_start_after.clone(), + req.total_object_count, + req.total_object_size, + ); // The following operation takes some time, so we do it in dedicated task and responds the // RPC immediately. tokio::spawn(async move { diff --git a/src/meta/src/hummock/manager/gc.rs b/src/meta/src/hummock/manager/gc.rs index 31d6f085dfc78..bfa3bd0dbddcb 100644 --- a/src/meta/src/hummock/manager/gc.rs +++ b/src/meta/src/hummock/manager/gc.rs @@ -301,6 +301,32 @@ impl HummockManager { .map(|m| m.seq.try_into().unwrap()); Ok(now) } + + pub fn update_paged_metrics( + &self, + start_after: Option, + next_start_after: Option, + total_object_count_in_page: u64, + total_object_size_in_page: u64, + ) { + let mut paged_metrics = self.paged_metrics.lock(); + paged_metrics.total_object_size.update( + start_after.clone(), + next_start_after.clone(), + total_object_size_in_page, + ); + paged_metrics.total_object_count.update( + start_after, + next_start_after, + total_object_count_in_page, + ); + if let Some(total_object_size) = paged_metrics.total_object_size.take() { + self.metrics.total_object_size.set(total_object_size as _); + } + if let Some(total_object_count) = paged_metrics.total_object_count.take() { + self.metrics.total_object_count.set(total_object_count as _); + } + } } pub struct FullGcState { @@ -325,6 +351,84 @@ impl FullGcState { } } +pub struct PagedMetrics { + total_object_count: PagedMetric, + total_object_size: PagedMetric, +} + +impl PagedMetrics { + pub fn new() -> Self { + Self { + total_object_count: PagedMetric::new(), + total_object_size: PagedMetric::new(), + } + } +} + +/// The metrics should be accumulated on a per-page basis and then finalized at the end. +pub struct PagedMetric { + /// identifier of a page + expect_start_key: Option, + /// accumulated metric value of pages seen so far + running_value: u64, + /// final metric value + sealed_value: Option, +} + +impl PagedMetric { + fn new() -> Self { + Self { + expect_start_key: None, + running_value: 0, + sealed_value: None, + } + } + + fn update( + &mut self, + current_start_key: Option, + next_start_key: Option, + value: u64, + ) { + // Encounter an update without pagination, replace current state. + if current_start_key.is_none() && next_start_key.is_none() { + self.running_value = value; + self.seal(); + return; + } + // Encounter an update from unexpected page, reset current state. + if current_start_key != self.expect_start_key { + self.reset(); + return; + } + self.running_value += value; + // There are more pages to add. + if next_start_key.is_some() { + self.expect_start_key = next_start_key; + return; + } + // This is the last page, seal the metric value. + self.seal(); + } + + fn seal(&mut self) { + self.sealed_value = Some(self.running_value); + self.reset(); + } + + fn reset(&mut self) { + self.running_value = 0; + self.expect_start_key = None; + } + + fn take(&mut self) -> Option { + if self.sealed_value.is_some() { + self.reset(); + } + self.sealed_value.take() + } +} + #[cfg(test)] mod tests { use std::sync::Arc; @@ -334,7 +438,7 @@ mod tests { use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_rpc_client::HummockMetaClient; - use super::ResponseEvent; + use super::{PagedMetric, ResponseEvent}; use crate::hummock::test_utils::{add_test_tables, setup_compute_env}; use crate::hummock::MockHummockMetaClient; @@ -428,4 +532,35 @@ mod tests { .unwrap() ); } + + #[test] + fn test_paged_metric() { + let mut metric = PagedMetric::new(); + fn assert_empty_state(metric: &mut PagedMetric) { + assert_eq!(metric.running_value, 0); + assert!(metric.expect_start_key.is_none()); + } + assert!(metric.sealed_value.is_none()); + assert_empty_state(&mut metric); + + metric.update(None, None, 100); + assert_eq!(metric.take().unwrap(), 100); + assert!(metric.take().is_none()); + assert_empty_state(&mut metric); + + // "start" is not a legal identifier for the first page + metric.update(Some("start".into()), Some("end".into()), 100); + assert!(metric.take().is_none()); + assert_empty_state(&mut metric); + + metric.update(None, Some("middle".into()), 100); + assert!(metric.take().is_none()); + assert_eq!(metric.running_value, 100); + assert_eq!(metric.expect_start_key, Some("middle".into())); + + metric.update(Some("middle".into()), None, 50); + assert_eq!(metric.take().unwrap(), 150); + assert!(metric.take().is_none()); + assert_empty_state(&mut metric); + } } diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 4f02317a91c85..504de130df075 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -43,7 +43,7 @@ use crate::hummock::compaction::CompactStatus; use crate::hummock::error::Result; use crate::hummock::manager::checkpoint::HummockVersionCheckpoint; use crate::hummock::manager::context::ContextInfo; -use crate::hummock::manager::gc::{DeleteObjectTracker, FullGcState}; +use crate::hummock::manager::gc::{DeleteObjectTracker, FullGcState, PagedMetrics}; use crate::hummock::CompactorManagerRef; use crate::manager::{MetaSrvEnv, MetaStoreImpl, MetadataManager}; use crate::model::{ClusterId, MetadataModel, MetadataModelError}; @@ -110,6 +110,9 @@ pub struct HummockManager { // and suggest types with a certain priority. pub compaction_state: CompactionState, full_gc_state: FullGcState, + /// Gather metrics that require accumulation across multiple operations. + /// For example, to get the total number of objects in object store, multiple LISTs are required because a single LIST can visit at most `full_gc_object_limit` objects. + paged_metrics: parking_lot::Mutex, now: Mutex, inflight_time_travel_query: Semaphore, } @@ -283,6 +286,7 @@ impl HummockManager { compactor_streams_change_tx, compaction_state: CompactionState::new(), full_gc_state: FullGcState::new(Some(full_gc_object_limit)), + paged_metrics: parking_lot::Mutex::new(PagedMetrics::new()), now: Mutex::new(0), inflight_time_travel_query: Semaphore::new(inflight_time_travel_query as usize), }; diff --git a/src/meta/src/hummock/mock_hummock_meta_client.rs b/src/meta/src/hummock/mock_hummock_meta_client.rs index dffbcc9bb3310..71cee6d08c8ab 100644 --- a/src/meta/src/hummock/mock_hummock_meta_client.rs +++ b/src/meta/src/hummock/mock_hummock_meta_client.rs @@ -248,6 +248,7 @@ impl HummockMetaClient for MockHummockMetaClient { _filtered_object_ids: Vec, _total_object_count: u64, _total_object_size: u64, + _start_after: Option, _next_start_after: Option, ) -> Result<()> { unimplemented!() diff --git a/src/rpc_client/src/hummock_meta_client.rs b/src/rpc_client/src/hummock_meta_client.rs index ff3aacb684644..b39ff31961aa9 100644 --- a/src/rpc_client/src/hummock_meta_client.rs +++ b/src/rpc_client/src/hummock_meta_client.rs @@ -52,6 +52,7 @@ pub trait HummockMetaClient: Send + Sync + 'static { filtered_object_ids: Vec, total_object_count: u64, total_object_size: u64, + start_after: Option, next_start_after: Option, ) -> Result<()>; async fn trigger_full_gc( diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 07632a0e3f775..e8b46ded0ed19 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -1517,6 +1517,7 @@ impl HummockMetaClient for MetaClient { filtered_object_ids: Vec, total_object_count: u64, total_object_size: u64, + start_after: Option, next_start_after: Option, ) -> Result<()> { let req = ReportFullScanTaskRequest { @@ -1524,6 +1525,7 @@ impl HummockMetaClient for MetaClient { total_object_count, total_object_size, next_start_after, + start_after, }; self.inner.report_full_scan_task(req).await?; Ok(()) diff --git a/src/storage/src/hummock/compactor/mod.rs b/src/storage/src/hummock/compactor/mod.rs index 0a25f1ffa7193..3a4c487b217a8 100644 --- a/src/storage/src/hummock/compactor/mod.rs +++ b/src/storage/src/hummock/compactor/mod.rs @@ -571,6 +571,7 @@ pub fn start_compactor( } ResponseEvent::FullScanTask(full_scan_task) => { executor.spawn(async move { + let start_after = full_scan_task.start_after.clone(); match Vacuum::handle_full_scan_task( full_scan_task, context.sstable_store.clone(), @@ -582,6 +583,7 @@ pub fn start_compactor( object_ids, total_object_count, total_object_size, + start_after, next_start_after, meta_client, ) @@ -786,6 +788,7 @@ pub fn start_shared_compactor( } } dispatch_compaction_task_request::Task::FullScanTask(full_scan_task) => { + let start_after = full_scan_task.start_after.clone(); match Vacuum::handle_full_scan_task(full_scan_task, context.sstable_store.clone()) .await { @@ -795,6 +798,7 @@ pub fn start_shared_compactor( total_object_count, total_object_size, next_start_after, + start_after, }; match cloned_grpc_proxy_client .report_full_scan_task(report_full_scan_task_request) diff --git a/src/storage/src/hummock/hummock_meta_client.rs b/src/storage/src/hummock/hummock_meta_client.rs index 074a6b63ee0c4..c23a037c589e0 100644 --- a/src/storage/src/hummock/hummock_meta_client.rs +++ b/src/storage/src/hummock/hummock_meta_client.rs @@ -92,6 +92,7 @@ impl HummockMetaClient for MonitoredHummockMetaClient { filtered_object_ids: Vec, total_object_count: u64, total_object_size: u64, + start_after: Option, next_start_after: Option, ) -> Result<()> { self.meta_client @@ -99,6 +100,7 @@ impl HummockMetaClient for MonitoredHummockMetaClient { filtered_object_ids, total_object_count, total_object_size, + start_after, next_start_after, ) .await diff --git a/src/storage/src/hummock/vacuum.rs b/src/storage/src/hummock/vacuum.rs index 09c8f512adbeb..31f3c75ec2bf4 100644 --- a/src/storage/src/hummock/vacuum.rs +++ b/src/storage/src/hummock/vacuum.rs @@ -121,6 +121,7 @@ impl Vacuum { filtered_object_ids: Vec, unfiltered_count: u64, unfiltered_size: u64, + start_after: Option, next_start_after: Option, hummock_meta_client: Arc, ) -> bool { @@ -128,6 +129,7 @@ impl Vacuum { filtered_object_ids_len = filtered_object_ids.len(), unfiltered_count, unfiltered_size, + start_after, next_start_after, "try to report full scan task" ); @@ -136,6 +138,7 @@ impl Vacuum { filtered_object_ids, unfiltered_count, unfiltered_size, + start_after, next_start_after, ) .await