Skip to content

Commit

Permalink
refactor: add metrics for time travel and recent versions (#18690)
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 authored Sep 27, 2024
1 parent d877481 commit 8e6ebb0
Show file tree
Hide file tree
Showing 14 changed files with 142 additions and 24 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

50 changes: 45 additions & 5 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -2442,6 +2442,20 @@ def section_hummock_read(outer_panels):
),
],
),
panels.timeseries_count(
"Safe Version Fetch Count",
"",
[
panels.target(
f"{metric('state_store_safe_version_hit')}",
"",
),
panels.target(
f"{metric('state_store_safe_version_miss')}",
"",
),
],
),
],
)
]
Expand Down Expand Up @@ -3206,6 +3220,10 @@ def section_hummock_manager(outer_panels):
f"{metric('storage_current_version_object_count')}",
"referenced by current version",
),
panels.target(
f"{metric('storage_time_travel_object_count')}",
"referenced by time travel",
),
panels.target(
f"{metric('storage_total_object_count')}",
"all objects (including dangling ones)",
Expand Down Expand Up @@ -3364,6 +3382,33 @@ def section_hummock_manager(outer_panels):
),
],
),
panels.timeseries_latency(
"Time Travel Replay Latency",
"The latency of replaying a hummock version for time travel",
quantile(
lambda quantile, legend: panels.target(
f"histogram_quantile({quantile}, sum(rate({metric('storage_time_travel_version_replay_latency_bucket')}[$__rate_interval])) by (le))",
f"time_travel_version_replay_latency_p{legend}",
),
[50, 90, 99, "max"],
)
+ [
panels.target(
f"rate({metric('storage_time_travel_version_replay_latency_sum')}[$__rate_interval]) / rate({metric('storage_time_travel_version_replay_latency_count')}[$__rate_interval]) > 0",
"time_travel_version_replay_avg",
),
],
),
panels.timeseries_ops(
"Time Travel Replay Ops",
"The frequency of replaying a hummock version for time travel",
[
panels.target(
f"sum(rate({metric('storage_time_travel_version_replay_latency_count')}[$__rate_interval]))",
"time_travel_version_replay_ops",
),
],
),
],
)
]
Expand Down Expand Up @@ -3498,11 +3543,6 @@ def section_grpc_meta_hummock_manager(outer_panels):
"UnpinVersionBefore",
"path='/meta.HummockManagerService/UnpinVersionBefore'",
),
grpc_metrics_target(
panels,
"UnpinSnapshotBefore",
"path='/meta.HummockManagerService/UnpinSnapshotBefore'",
),
grpc_metrics_target(
panels,
"ReportCompactionTasks",
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,10 @@ pub struct MetaDeveloperConfig {
/// CREATE MV/Table will be rejected when the number of actors exceeds this limit.
#[serde(default = "default::developer::actor_cnt_per_worker_parallelism_hard_limit")]
pub actor_cnt_per_worker_parallelism_hard_limit: usize,

#[serde(default = "default::developer::hummock_time_travel_sst_info_fetch_batch_size")]
/// Max number of SSTs fetched from meta store per SELECT, during time travel Hummock version replay.
pub hummock_time_travel_sst_info_fetch_batch_size: usize,
}

/// The section `[server]` in `risingwave.toml`.
Expand Down Expand Up @@ -1895,6 +1899,10 @@ pub mod default {
400
}

pub fn hummock_time_travel_sst_info_fetch_batch_size() -> usize {
10_000
}

pub fn memory_controller_threshold_aggressive() -> f64 {
0.9
}
Expand Down
1 change: 1 addition & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ meta_max_trivial_move_task_count_per_loop = 256
meta_max_get_task_probe_times = 5
meta_actor_cnt_per_worker_parallelism_soft_limit = 100
meta_actor_cnt_per_worker_parallelism_hard_limit = 400
meta_hummock_time_travel_sst_info_fetch_batch_size = 10000

[batch]
enable_barrier_read = false
Expand Down
4 changes: 4 additions & 0 deletions src/meta/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,10 @@ pub fn start(
hummock_time_travel_snapshot_interval: config
.meta
.hummock_time_travel_snapshot_interval,
hummock_time_travel_sst_info_fetch_batch_size: config
.meta
.developer
.hummock_time_travel_sst_info_fetch_batch_size,
min_delta_log_num_for_hummock_version_checkpoint: config
.meta
.min_delta_log_num_for_hummock_version_checkpoint,
Expand Down
3 changes: 3 additions & 0 deletions src/meta/src/hummock/manager/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,9 @@ impl HummockManager {
.all_object_ids_in_time_travel()
.await?
.collect::<HashSet<_>>();
self.metrics
.time_travel_object_count
.set(pinned_object_ids.len() as _);
// 1. filter by watermark
let object_ids = object_ids
.into_iter()
Expand Down
10 changes: 6 additions & 4 deletions src/meta/src/hummock/manager/time_travel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,10 @@ impl HummockManager {
query_epoch
)))
})?;
let timer = self
.metrics
.time_travel_version_replay_latency
.start_timer();
let actual_version_id = epoch_to_version.version_id;
tracing::debug!(
query_epoch,
Expand Down Expand Up @@ -342,10 +346,7 @@ impl HummockManager {
.collect::<VecDeque<_>>();
let sst_count = sst_ids.len();
let mut sst_id_to_info = HashMap::with_capacity(sst_count);
let sst_info_fetch_batch_size = std::env::var("RW_TIME_TRAVEL_SST_INFO_FETCH_BATCH_SIZE")
.unwrap_or_else(|_| "100".into())
.parse()
.unwrap();
let sst_info_fetch_batch_size = self.env.opts.hummock_time_travel_sst_info_fetch_batch_size;
while !sst_ids.is_empty() {
let sst_infos = hummock_sstable_info::Entity::find()
.filter(hummock_sstable_info::Column::SstId.is_in(
Expand All @@ -365,6 +366,7 @@ impl HummockManager {
))));
}
refill_version(&mut actual_version, &sst_id_to_info, table_id);
timer.observe_duration();
Ok(actual_version)
}

Expand Down
2 changes: 2 additions & 0 deletions src/meta/src/manager/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ pub struct MetaOpts {
pub hummock_version_checkpoint_interval_sec: u64,
pub enable_hummock_data_archive: bool,
pub hummock_time_travel_snapshot_interval: u64,
pub hummock_time_travel_sst_info_fetch_batch_size: usize,
/// The minimum delta log number a new checkpoint should compact, otherwise the checkpoint
/// attempt is rejected. Greater value reduces object store IO, meanwhile it results in
/// more loss of in memory `HummockVersionCheckpoint::stale_objects` state when meta node is
Expand Down Expand Up @@ -317,6 +318,7 @@ impl MetaOpts {
hummock_version_checkpoint_interval_sec: 30,
enable_hummock_data_archive: false,
hummock_time_travel_snapshot_interval: 0,
hummock_time_travel_sst_info_fetch_batch_size: 10_000,
min_delta_log_num_for_hummock_version_checkpoint: 1,
min_sst_retention_time_sec: 3600 * 24 * 7,
full_gc_interval_sec: 3600 * 24 * 7,
Expand Down
21 changes: 21 additions & 0 deletions src/meta/src/rpc/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ pub struct MetaMetrics {
pub old_version_object_count: IntGauge,
/// Total size of objects that is still referenced by non-current versions.
pub old_version_object_size: IntGauge,
/// Total number of objects that is referenced by time travel.
pub time_travel_object_count: IntGauge,
/// Total number of objects that is referenced by current version.
pub current_version_object_count: IntGauge,
/// Total size of objects that is referenced by current version.
Expand Down Expand Up @@ -206,6 +208,8 @@ pub struct MetaMetrics {
pub auto_schema_change_failure_cnt: LabelGuardedIntCounterVec<2>,
pub auto_schema_change_success_cnt: LabelGuardedIntCounterVec<2>,
pub auto_schema_change_latency: LabelGuardedHistogramVec<2>,

pub time_travel_version_replay_latency: Histogram,
}

pub static GLOBAL_META_METRICS: LazyLock<MetaMetrics> =
Expand Down Expand Up @@ -500,6 +504,13 @@ impl MetaMetrics {
registry
).unwrap();

let time_travel_object_count = register_int_gauge_with_registry!(
"storage_time_travel_object_count",
"total number of objects that is referenced by time travel.",
registry
)
.unwrap();

let delta_log_count = register_int_gauge_with_registry!(
"storage_delta_log_count",
"total number of hummock version delta log",
Expand Down Expand Up @@ -740,6 +751,14 @@ impl MetaMetrics {
)
.unwrap();

let opts = histogram_opts!(
"storage_time_travel_version_replay_latency",
"The latency(ms) of replaying a hummock version for time travel",
exponential_buckets(0.01, 10.0, 6).unwrap()
);
let time_travel_version_replay_latency =
register_histogram_with_registry!(opts, registry).unwrap();

Self {
grpc_latency,
barrier_latency,
Expand Down Expand Up @@ -771,6 +790,7 @@ impl MetaMetrics {
stale_object_size,
old_version_object_count,
old_version_object_size,
time_travel_object_count,
current_version_object_count,
current_version_object_size,
total_object_count,
Expand Down Expand Up @@ -814,6 +834,7 @@ impl MetaMetrics {
auto_schema_change_success_cnt,
auto_schema_change_latency,
merge_compaction_group_count,
time_travel_version_replay_latency,
}
}

Expand Down
11 changes: 2 additions & 9 deletions src/storage/hummock_sdk/src/sstable_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ impl From<&PbSstableInfo> for SstableInfo {

impl From<SstableInfo> for PbSstableInfo {
fn from(sstable_info: SstableInfo) -> Self {
assert!(sstable_info.sst_size > 0 || sstable_info.is_stripped());
assert!(sstable_info.sst_size > 0);
assert!(sstable_info.table_ids.is_sorted());
PbSstableInfo {
object_id: sstable_info.object_id,
Expand Down Expand Up @@ -177,7 +177,7 @@ impl From<SstableInfo> for PbSstableInfo {

impl From<&SstableInfo> for PbSstableInfo {
fn from(sstable_info: &SstableInfo) -> Self {
assert!(sstable_info.sst_size > 0 || sstable_info.is_stripped());
assert!(sstable_info.sst_size > 0);
assert!(sstable_info.table_ids.is_sorted());
PbSstableInfo {
object_id: sstable_info.object_id,
Expand Down Expand Up @@ -216,10 +216,3 @@ impl SstableInfo {
self.key_range = KeyRange::default();
}
}

// Time travel
impl SstableInfo {
pub fn is_stripped(&self) -> bool {
self.object_id == 0
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ impl HummockEventHandler {
};

let uploader = HummockUploader::new(
state_store_metrics,
state_store_metrics.clone(),
pinned_version.clone(),
spawn_upload_task,
buffer_tracker,
Expand All @@ -358,6 +358,7 @@ impl HummockEventHandler {
recent_versions: Arc::new(ArcSwap::from_pointee(RecentVersions::new(
pinned_version,
storage_opts.max_cached_recent_versions_number,
state_store_metrics,
))),
write_conflict_detector,
read_version_mapping,
Expand Down
26 changes: 23 additions & 3 deletions src/storage/src/hummock/local_version/recent_versions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,35 @@
// limitations under the License.

use std::cmp::Ordering;
use std::sync::Arc;

use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::HummockEpoch;

use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::monitor::HummockStateStoreMetrics;

pub struct RecentVersions {
latest_version: PinnedVersion,
is_latest_committed: bool,
recent_versions: Vec<PinnedVersion>, // earlier version at the front
max_version_num: usize,
metric: Arc<HummockStateStoreMetrics>,
}

impl RecentVersions {
pub fn new(version: PinnedVersion, max_version_num: usize) -> Self {
pub fn new(
version: PinnedVersion,
max_version_num: usize,
metric: Arc<HummockStateStoreMetrics>,
) -> Self {
assert!(max_version_num > 0);
Self {
latest_version: version,
is_latest_committed: true, // The first version is always treated as committed epochs
recent_versions: Vec::new(),
max_version_num,
metric,
}
}

Expand Down Expand Up @@ -89,6 +97,7 @@ impl RecentVersions {
is_latest_committed: is_committed,
recent_versions,
max_version_num: self.max_version_num,
metric: self.metric.clone(),
}
}

Expand All @@ -104,7 +113,7 @@ impl RecentVersions {
table_id: TableId,
epoch: HummockEpoch,
) -> Option<PinnedVersion> {
if let Some(info) = self
let result = if let Some(info) = self
.latest_version
.version()
.state_table_info
Expand All @@ -118,7 +127,13 @@ impl RecentVersions {
}
} else {
None
};
if result.is_some() {
self.metric.safe_version_hit.inc();
} else {
self.metric.safe_version_miss.inc();
}
result
}

fn get_safe_version_from_recent(
Expand Down Expand Up @@ -192,6 +207,7 @@ mod tests {

use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::local_version::recent_versions::RecentVersions;
use crate::monitor::HummockStateStoreMetrics;

const TEST_TABLE_ID1: TableId = TableId::new(233);
const TEST_TABLE_ID2: TableId = TableId::new(234);
Expand Down Expand Up @@ -246,7 +262,11 @@ mod tests {
let epoch4 = epoch3 + 1;
let version1 = gen_pin_version(1, [(TEST_TABLE_ID1, epoch1)]);
// with at most 2 historical versions
let recent_versions = RecentVersions::new(version1.clone(), 2);
let recent_versions = RecentVersions::new(
version1.clone(),
2,
HummockStateStoreMetrics::unused().into(),
);
assert!(recent_versions.recent_versions.is_empty());
assert!(recent_versions.is_latest_committed);
assert_query_equal(
Expand Down
Loading

0 comments on commit 8e6ebb0

Please sign in to comment.