Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(metric): correctly update storage object metric #18835

Merged
merged 1 commit into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,10 @@ message ReportFullScanTaskRequest {
uint64 total_object_count = 2;
// Total size of objects before filtered by conditions specified by FullScanTask.
uint64 total_object_size = 3;
// Start key of the next page.
optional string next_start_after = 4;
// Start key of this page.
optional string start_after = 5;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another PB change related to severless compaction @yufansong @arkbriar

}

message ReportFullScanTaskResponse {
Expand Down
14 changes: 6 additions & 8 deletions src/meta/service/src/hummock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,14 +251,12 @@ impl HummockManagerService for HummockServiceImpl {
) -> Result<Response<ReportFullScanTaskResponse>, Status> {
let req = request.into_inner();
let hummock_manager = self.hummock_manager.clone();
hummock_manager
.metrics
.total_object_count
.set(req.total_object_count as _);
hummock_manager
.metrics
.total_object_size
.set(req.total_object_size as _);
hummock_manager.update_paged_metrics(
req.start_after,
req.next_start_after.clone(),
req.total_object_count,
req.total_object_size,
);
// The following operation takes some time, so we do it in dedicated task and responds the
// RPC immediately.
tokio::spawn(async move {
Expand Down
137 changes: 136 additions & 1 deletion src/meta/src/hummock/manager/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,32 @@ impl HummockManager {
.map(|m| m.seq.try_into().unwrap());
Ok(now)
}

pub fn update_paged_metrics(
&self,
start_after: Option<String>,
next_start_after: Option<String>,
total_object_count_in_page: u64,
total_object_size_in_page: u64,
) {
let mut paged_metrics = self.paged_metrics.lock();
paged_metrics.total_object_size.update(
start_after.clone(),
next_start_after.clone(),
total_object_size_in_page,
);
paged_metrics.total_object_count.update(
start_after,
next_start_after,
total_object_count_in_page,
);
if let Some(total_object_size) = paged_metrics.total_object_size.take() {
self.metrics.total_object_size.set(total_object_size as _);
}
if let Some(total_object_count) = paged_metrics.total_object_count.take() {
self.metrics.total_object_count.set(total_object_count as _);
}
}
}

pub struct FullGcState {
Expand All @@ -325,6 +351,84 @@ impl FullGcState {
}
}

pub struct PagedMetrics {
total_object_count: PagedMetric,
total_object_size: PagedMetric,
}

impl PagedMetrics {
pub fn new() -> Self {
Self {
total_object_count: PagedMetric::new(),
total_object_size: PagedMetric::new(),
}
}
}

/// The metrics should be accumulated on a per-page basis and then finalized at the end.
pub struct PagedMetric {
/// identifier of a page
expect_start_key: Option<String>,
/// accumulated metric value of pages seen so far
running_value: u64,
/// final metric value
sealed_value: Option<u64>,
}

impl PagedMetric {
fn new() -> Self {
Self {
expect_start_key: None,
running_value: 0,
sealed_value: None,
}
}

fn update(
&mut self,
current_start_key: Option<String>,
next_start_key: Option<String>,
value: u64,
) {
// Encounter an update without pagination, replace current state.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add doc for "manualy trigger full gc" ?

if current_start_key.is_none() && next_start_key.is_none() {
self.running_value = value;
self.seal();
return;
}
// Encounter an update from unexpected page, reset current state.
if current_start_key != self.expect_start_key {
self.reset();
return;
}
self.running_value += value;
// There are more pages to add.
if next_start_key.is_some() {
self.expect_start_key = next_start_key;
return;
}
// This is the last page, seal the metric value.
self.seal();
}

fn seal(&mut self) {
self.sealed_value = Some(self.running_value);
self.reset();
}

fn reset(&mut self) {
self.running_value = 0;
self.expect_start_key = None;
}

fn take(&mut self) -> Option<u64> {
if self.sealed_value.is_some() {
self.reset();
}
self.sealed_value.take()
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand All @@ -334,7 +438,7 @@ mod tests {
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_rpc_client::HummockMetaClient;

use super::ResponseEvent;
use super::{PagedMetric, ResponseEvent};
use crate::hummock::test_utils::{add_test_tables, setup_compute_env};
use crate::hummock::MockHummockMetaClient;

Expand Down Expand Up @@ -428,4 +532,35 @@ mod tests {
.unwrap()
);
}

#[test]
fn test_paged_metric() {
let mut metric = PagedMetric::new();
fn assert_empty_state(metric: &mut PagedMetric) {
assert_eq!(metric.running_value, 0);
assert!(metric.expect_start_key.is_none());
}
assert!(metric.sealed_value.is_none());
assert_empty_state(&mut metric);

metric.update(None, None, 100);
assert_eq!(metric.take().unwrap(), 100);
assert!(metric.take().is_none());
assert_empty_state(&mut metric);

// "start" is not a legal identifier for the first page
metric.update(Some("start".into()), Some("end".into()), 100);
assert!(metric.take().is_none());
assert_empty_state(&mut metric);

metric.update(None, Some("middle".into()), 100);
assert!(metric.take().is_none());
assert_eq!(metric.running_value, 100);
assert_eq!(metric.expect_start_key, Some("middle".into()));

metric.update(Some("middle".into()), None, 50);
assert_eq!(metric.take().unwrap(), 150);
assert!(metric.take().is_none());
assert_empty_state(&mut metric);
}
}
6 changes: 5 additions & 1 deletion src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use crate::hummock::compaction::CompactStatus;
use crate::hummock::error::Result;
use crate::hummock::manager::checkpoint::HummockVersionCheckpoint;
use crate::hummock::manager::context::ContextInfo;
use crate::hummock::manager::gc::{DeleteObjectTracker, FullGcState};
use crate::hummock::manager::gc::{DeleteObjectTracker, FullGcState, PagedMetrics};
use crate::hummock::CompactorManagerRef;
use crate::manager::{MetaSrvEnv, MetaStoreImpl, MetadataManager};
use crate::model::{ClusterId, MetadataModel, MetadataModelError};
Expand Down Expand Up @@ -110,6 +110,9 @@ pub struct HummockManager {
// and suggest types with a certain priority.
pub compaction_state: CompactionState,
full_gc_state: FullGcState,
/// Gather metrics that require accumulation across multiple operations.
/// For example, to get the total number of objects in object store, multiple LISTs are required because a single LIST can visit at most `full_gc_object_limit` objects.
paged_metrics: parking_lot::Mutex<PagedMetrics>,
now: Mutex<u64>,
}

Expand Down Expand Up @@ -281,6 +284,7 @@ impl HummockManager {
compactor_streams_change_tx,
compaction_state: CompactionState::new(),
full_gc_state: FullGcState::new(Some(full_gc_object_limit)),
paged_metrics: parking_lot::Mutex::new(PagedMetrics::new()),
now: Mutex::new(0),
};
let instance = Arc::new(instance);
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/hummock/mock_hummock_meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ impl HummockMetaClient for MockHummockMetaClient {
_filtered_object_ids: Vec<HummockSstableObjectId>,
_total_object_count: u64,
_total_object_size: u64,
_start_after: Option<String>,
_next_start_after: Option<String>,
) -> Result<()> {
unimplemented!()
Expand Down
1 change: 1 addition & 0 deletions src/rpc_client/src/hummock_meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub trait HummockMetaClient: Send + Sync + 'static {
filtered_object_ids: Vec<HummockSstableObjectId>,
total_object_count: u64,
total_object_size: u64,
start_after: Option<String>,
next_start_after: Option<String>,
) -> Result<()>;
async fn trigger_full_gc(
Expand Down
2 changes: 2 additions & 0 deletions src/rpc_client/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1517,13 +1517,15 @@ impl HummockMetaClient for MetaClient {
filtered_object_ids: Vec<HummockSstableObjectId>,
total_object_count: u64,
total_object_size: u64,
start_after: Option<String>,
next_start_after: Option<String>,
) -> Result<()> {
let req = ReportFullScanTaskRequest {
object_ids: filtered_object_ids,
total_object_count,
total_object_size,
next_start_after,
start_after,
};
self.inner.report_full_scan_task(req).await?;
Ok(())
Expand Down
4 changes: 4 additions & 0 deletions src/storage/src/hummock/compactor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,7 @@ pub fn start_compactor(
}
ResponseEvent::FullScanTask(full_scan_task) => {
executor.spawn(async move {
let start_after = full_scan_task.start_after.clone();
match Vacuum::handle_full_scan_task(
full_scan_task,
context.sstable_store.clone(),
Expand All @@ -582,6 +583,7 @@ pub fn start_compactor(
object_ids,
total_object_count,
total_object_size,
start_after,
next_start_after,
meta_client,
)
Expand Down Expand Up @@ -786,6 +788,7 @@ pub fn start_shared_compactor(
}
}
dispatch_compaction_task_request::Task::FullScanTask(full_scan_task) => {
let start_after = full_scan_task.start_after.clone();
match Vacuum::handle_full_scan_task(full_scan_task, context.sstable_store.clone())
.await
{
Expand All @@ -795,6 +798,7 @@ pub fn start_shared_compactor(
total_object_count,
total_object_size,
next_start_after,
start_after,
};
match cloned_grpc_proxy_client
.report_full_scan_task(report_full_scan_task_request)
Expand Down
2 changes: 2 additions & 0 deletions src/storage/src/hummock/hummock_meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,15 @@ impl HummockMetaClient for MonitoredHummockMetaClient {
filtered_object_ids: Vec<HummockSstableObjectId>,
total_object_count: u64,
total_object_size: u64,
start_after: Option<String>,
next_start_after: Option<String>,
) -> Result<()> {
self.meta_client
.report_full_scan_task(
filtered_object_ids,
total_object_count,
total_object_size,
start_after,
next_start_after,
)
.await
Expand Down
3 changes: 3 additions & 0 deletions src/storage/src/hummock/vacuum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,15 @@ impl Vacuum {
filtered_object_ids: Vec<u64>,
unfiltered_count: u64,
unfiltered_size: u64,
start_after: Option<String>,
next_start_after: Option<String>,
hummock_meta_client: Arc<dyn HummockMetaClient>,
) -> bool {
tracing::info!(
filtered_object_ids_len = filtered_object_ids.len(),
unfiltered_count,
unfiltered_size,
start_after,
next_start_after,
"try to report full scan task"
);
Expand All @@ -136,6 +138,7 @@ impl Vacuum {
filtered_object_ids,
unfiltered_count,
unfiltered_size,
start_after,
next_start_after,
)
.await
Expand Down
Loading