Skip to content

Commit

Permalink
fix(metric): correctly update storage object metric (#18835)
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 authored Oct 10, 2024
1 parent f32d013 commit df6aa8a
Show file tree
Hide file tree
Showing 10 changed files with 163 additions and 10 deletions.
3 changes: 3 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,10 @@ message ReportFullScanTaskRequest {
uint64 total_object_count = 2;
// Total size of objects before filtered by conditions specified by FullScanTask.
uint64 total_object_size = 3;
// Start key of the next page.
optional string next_start_after = 4;
// Start key of this page.
optional string start_after = 5;
}

message ReportFullScanTaskResponse {
Expand Down
14 changes: 6 additions & 8 deletions src/meta/service/src/hummock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,14 +251,12 @@ impl HummockManagerService for HummockServiceImpl {
) -> Result<Response<ReportFullScanTaskResponse>, Status> {
let req = request.into_inner();
let hummock_manager = self.hummock_manager.clone();
hummock_manager
.metrics
.total_object_count
.set(req.total_object_count as _);
hummock_manager
.metrics
.total_object_size
.set(req.total_object_size as _);
hummock_manager.update_paged_metrics(
req.start_after,
req.next_start_after.clone(),
req.total_object_count,
req.total_object_size,
);
// The following operation takes some time, so we do it in a dedicated task and respond to
// the RPC immediately.
tokio::spawn(async move {
Expand Down
137 changes: 136 additions & 1 deletion src/meta/src/hummock/manager/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,32 @@ impl HummockManager {
.map(|m| m.seq.try_into().unwrap());
Ok(now)
}

/// Feeds one page of a full scan report into the paged accumulators and, for each
/// accumulator that has a sealed total available, writes that total to the matching
/// gauge in `self.metrics`.
///
/// * `start_after` - start key of the reported page.
/// * `next_start_after` - start key of the following page, if any.
/// * `total_object_count_in_page` - object count reported for this page.
/// * `total_object_size_in_page` - object size reported for this page.
///
/// The `paged_metrics` lock is held for the whole call.
pub fn update_paged_metrics(
    &self,
    start_after: Option<String>,
    next_start_after: Option<String>,
    total_object_count_in_page: u64,
    total_object_size_in_page: u64,
) {
    let mut guard = self.paged_metrics.lock();

    // Object size: accumulate this page, then publish the total if one was sealed.
    let size_metric = &mut guard.total_object_size;
    size_metric.update(
        start_after.clone(),
        next_start_after.clone(),
        total_object_size_in_page,
    );
    if let Some(sealed_size) = size_metric.take() {
        self.metrics.total_object_size.set(sealed_size as _);
    }

    // Object count: same procedure; the keys can be moved here as this is their last use.
    let count_metric = &mut guard.total_object_count;
    count_metric.update(start_after, next_start_after, total_object_count_in_page);
    if let Some(sealed_count) = count_metric.take() {
        self.metrics.total_object_count.set(sealed_count as _);
    }
}
}

pub struct FullGcState {
Expand All @@ -325,6 +351,84 @@ impl FullGcState {
}
}

/// Groups the paged accumulators, one per metric that is reported page by page.
pub struct PagedMetrics {
    /// Accumulator for the object count.
    total_object_count: PagedMetric,
    /// Accumulator for the object size.
    total_object_size: PagedMetric,
}

impl PagedMetrics {
    /// Creates a set of accumulators, each built with `PagedMetric::new()`.
    pub fn new() -> Self {
        Self {
            total_object_count: PagedMetric::new(),
            total_object_size: PagedMetric::new(),
        }
    }
}

/// A type with an argument-less public `new()` should also be constructible through
/// `Default`, so it composes with `..Default::default()` and `#[derive(Default)]` holders.
impl Default for PagedMetrics {
    fn default() -> Self {
        Self::new()
    }
}

/// A metric that is accumulated on a per-page basis and finalized when the last page arrives.
///
/// Pages are chained by their start keys: a page is accepted only if its start key equals
/// the `next_start_key` announced by the previously accepted page (`None` for the first
/// page of a scan). The total becomes available through [`PagedMetric::take`] once a page
/// without a `next_start_key` has been accepted.
#[derive(Debug, Default)]
pub struct PagedMetric {
    /// Start key the next page must carry; `None` while the first page of a scan is expected.
    expect_start_key: Option<String>,
    /// Accumulated metric value of the pages accepted so far in the current scan.
    running_value: u64,
    /// Final metric value of the last completed scan, until it is consumed by `take`.
    sealed_value: Option<u64>,
}

impl PagedMetric {
    /// Creates an accumulator with no scan in progress and no sealed value.
    fn new() -> Self {
        Self::default()
    }

    /// Accounts for one reported page.
    ///
    /// * `current_start_key` - start key of this page; `None` for the first page of a scan.
    /// * `next_start_key` - start key of the following page; `None` if this is the last page.
    /// * `value` - metric value contributed by this page.
    ///
    /// A page that does not continue the tracked scan discards the accumulated state. If
    /// that page is itself a first page (`current_start_key` is `None`) it starts a new
    /// scan right away; otherwise it is dropped, because the earlier pages of its scan
    /// were never accumulated.
    fn update(
        &mut self,
        current_start_key: Option<String>,
        next_start_key: Option<String>,
        value: u64,
    ) {
        if current_start_key != self.expect_start_key {
            // Whatever was accumulated belongs to a scan that will never complete.
            self.reset();
            if current_start_key.is_some() {
                // A page from the middle of an unknown scan cannot be accounted for.
                return;
            }
            // Otherwise this is the first page of a new scan (or an unpaginated report):
            // keep going, so an interrupted scan does not cost the following scan too.
        }
        // `running_value` is 0 whenever `expect_start_key` is `None`, so a first page
        // always starts from a clean sum. Saturate rather than overflow on bogus reports.
        self.running_value = self.running_value.saturating_add(value);
        match next_start_key {
            // There are more pages to add.
            Some(next) => self.expect_start_key = Some(next),
            // This is the last page, seal the metric value.
            None => self.seal(),
        }
    }

    /// Publishes the accumulated value as the sealed value and clears the running state.
    fn seal(&mut self) {
        self.sealed_value = Some(self.running_value);
        self.reset();
    }

    /// Clears the running state; the sealed value is left untouched.
    fn reset(&mut self) {
        self.running_value = 0;
        self.expect_start_key = None;
    }

    /// Returns the sealed value at most once; subsequent calls return `None` until another
    /// scan is sealed. The running state is cleared whenever a sealed value is handed out.
    fn take(&mut self) -> Option<u64> {
        let sealed = self.sealed_value.take();
        if sealed.is_some() {
            self.reset();
        }
        sealed
    }
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand All @@ -334,7 +438,7 @@ mod tests {
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_rpc_client::HummockMetaClient;

use super::ResponseEvent;
use super::{PagedMetric, ResponseEvent};
use crate::hummock::test_utils::{add_test_tables, setup_compute_env};
use crate::hummock::MockHummockMetaClient;

Expand Down Expand Up @@ -428,4 +532,35 @@ mod tests {
.unwrap()
);
}

/// Walks a `PagedMetric` through an unpaginated report, a page with an unexpected start
/// key, and a regular two-page scan, checking the sealed and running state after each step.
#[test]
fn test_paged_metric() {
    // No scan in progress: nothing accumulated and the first page is expected next.
    fn assert_empty_state(metric: &PagedMetric) {
        assert!(metric.expect_start_key.is_none());
        assert_eq!(metric.running_value, 0);
    }

    let mut metric = PagedMetric::new();
    assert!(metric.sealed_value.is_none());
    assert_empty_state(&metric);

    // An update without pagination is sealed right away and can be taken exactly once.
    metric.update(None, None, 100);
    assert_eq!(metric.take(), Some(100));
    assert_eq!(metric.take(), None);
    assert_empty_state(&metric);

    // "start" is not a legal identifier for the first page
    metric.update(Some("start".into()), Some("end".into()), 100);
    assert_eq!(metric.take(), None);
    assert_empty_state(&metric);

    // First page of a two-page scan: accumulated, not sealed yet.
    metric.update(None, Some("middle".into()), 100);
    assert_eq!(metric.take(), None);
    assert_eq!(metric.running_value, 100);
    assert_eq!(metric.expect_start_key.as_deref(), Some("middle"));

    // Last page: the sum of both pages is sealed.
    metric.update(Some("middle".into()), None, 50);
    assert_eq!(metric.take(), Some(150));
    assert_eq!(metric.take(), None);
    assert_empty_state(&metric);
}
}
6 changes: 5 additions & 1 deletion src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use crate::hummock::compaction::CompactStatus;
use crate::hummock::error::Result;
use crate::hummock::manager::checkpoint::HummockVersionCheckpoint;
use crate::hummock::manager::context::ContextInfo;
use crate::hummock::manager::gc::{DeleteObjectTracker, FullGcState};
use crate::hummock::manager::gc::{DeleteObjectTracker, FullGcState, PagedMetrics};
use crate::hummock::CompactorManagerRef;
use crate::manager::{MetaSrvEnv, MetaStoreImpl, MetadataManager};
use crate::model::{ClusterId, MetadataModel, MetadataModelError};
Expand Down Expand Up @@ -110,6 +110,9 @@ pub struct HummockManager {
// and suggest types with a certain priority.
pub compaction_state: CompactionState,
full_gc_state: FullGcState,
/// Gather metrics that require accumulation across multiple operations.
/// For example, to get the total number of objects in the object store, multiple LISTs are required because a single LIST can visit at most `full_gc_object_limit` objects.
paged_metrics: parking_lot::Mutex<PagedMetrics>,
now: Mutex<u64>,
inflight_time_travel_query: Semaphore,
}
Expand Down Expand Up @@ -283,6 +286,7 @@ impl HummockManager {
compactor_streams_change_tx,
compaction_state: CompactionState::new(),
full_gc_state: FullGcState::new(Some(full_gc_object_limit)),
paged_metrics: parking_lot::Mutex::new(PagedMetrics::new()),
now: Mutex::new(0),
inflight_time_travel_query: Semaphore::new(inflight_time_travel_query as usize),
};
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/hummock/mock_hummock_meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ impl HummockMetaClient for MockHummockMetaClient {
_filtered_object_ids: Vec<HummockSstableObjectId>,
_total_object_count: u64,
_total_object_size: u64,
_start_after: Option<String>,
_next_start_after: Option<String>,
) -> Result<()> {
unimplemented!()
Expand Down
1 change: 1 addition & 0 deletions src/rpc_client/src/hummock_meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub trait HummockMetaClient: Send + Sync + 'static {
filtered_object_ids: Vec<HummockSstableObjectId>,
total_object_count: u64,
total_object_size: u64,
start_after: Option<String>,
next_start_after: Option<String>,
) -> Result<()>;
async fn trigger_full_gc(
Expand Down
2 changes: 2 additions & 0 deletions src/rpc_client/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1517,13 +1517,15 @@ impl HummockMetaClient for MetaClient {
filtered_object_ids: Vec<HummockSstableObjectId>,
total_object_count: u64,
total_object_size: u64,
start_after: Option<String>,
next_start_after: Option<String>,
) -> Result<()> {
let req = ReportFullScanTaskRequest {
object_ids: filtered_object_ids,
total_object_count,
total_object_size,
next_start_after,
start_after,
};
self.inner.report_full_scan_task(req).await?;
Ok(())
Expand Down
4 changes: 4 additions & 0 deletions src/storage/src/hummock/compactor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,7 @@ pub fn start_compactor(
}
ResponseEvent::FullScanTask(full_scan_task) => {
executor.spawn(async move {
let start_after = full_scan_task.start_after.clone();
match Vacuum::handle_full_scan_task(
full_scan_task,
context.sstable_store.clone(),
Expand All @@ -582,6 +583,7 @@ pub fn start_compactor(
object_ids,
total_object_count,
total_object_size,
start_after,
next_start_after,
meta_client,
)
Expand Down Expand Up @@ -786,6 +788,7 @@ pub fn start_shared_compactor(
}
}
dispatch_compaction_task_request::Task::FullScanTask(full_scan_task) => {
let start_after = full_scan_task.start_after.clone();
match Vacuum::handle_full_scan_task(full_scan_task, context.sstable_store.clone())
.await
{
Expand All @@ -795,6 +798,7 @@ pub fn start_shared_compactor(
total_object_count,
total_object_size,
next_start_after,
start_after,
};
match cloned_grpc_proxy_client
.report_full_scan_task(report_full_scan_task_request)
Expand Down
2 changes: 2 additions & 0 deletions src/storage/src/hummock/hummock_meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,15 @@ impl HummockMetaClient for MonitoredHummockMetaClient {
filtered_object_ids: Vec<HummockSstableObjectId>,
total_object_count: u64,
total_object_size: u64,
start_after: Option<String>,
next_start_after: Option<String>,
) -> Result<()> {
self.meta_client
.report_full_scan_task(
filtered_object_ids,
total_object_count,
total_object_size,
start_after,
next_start_after,
)
.await
Expand Down
3 changes: 3 additions & 0 deletions src/storage/src/hummock/vacuum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,15 @@ impl Vacuum {
filtered_object_ids: Vec<u64>,
unfiltered_count: u64,
unfiltered_size: u64,
start_after: Option<String>,
next_start_after: Option<String>,
hummock_meta_client: Arc<dyn HummockMetaClient>,
) -> bool {
tracing::info!(
filtered_object_ids_len = filtered_object_ids.len(),
unfiltered_count,
unfiltered_size,
start_after,
next_start_after,
"try to report full scan task"
);
Expand All @@ -136,6 +138,7 @@ impl Vacuum {
filtered_object_ids,
unfiltered_count,
unfiltered_size,
start_after,
next_start_after,
)
.await
Expand Down

0 comments on commit df6aa8a

Please sign in to comment.