Skip to content

Commit

Permalink
refactor(storage): remove may_exist interface and metrics (#17609)
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k authored Jul 8, 2024
1 parent 10c5aa1 commit 5cebcad
Show file tree
Hide file tree
Showing 12 changed files with 7 additions and 553 deletions.
29 changes: 0 additions & 29 deletions src/storage/src/hummock/store/local_hummock_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,27 +240,6 @@ impl LocalHummockStorage {
)
.await
}

/// Reports whether `key_range` may contain data visible to this local store.
///
/// The local mem-table is consulted first; if it yields any entry for the range
/// the answer is `Ok(true)` right away. Otherwise the range is filtered against
/// the read version and the question is forwarded to
/// `hummock_version_reader.may_exist`.
///
/// # Errors
///
/// Propagates the error from `read_filter_for_version` and whatever the version
/// reader returns.
pub async fn may_exist_inner(
    &self,
    key_range: TableKeyRange,
    read_options: ReadOptions,
) -> StorageResult<bool> {
    // Any entry in the local mem-table for this range settles the question.
    let found_in_mem_table = self.mem_table.iter(key_range.clone()).next().is_some();
    if found_in_mem_table {
        return Ok(true);
    }

    let table_id = read_options.table_id;
    // Use MAX epoch to make sure we read from latest
    let (filtered_range, snapshot) =
        read_filter_for_version(HummockEpoch::MAX, table_id, key_range, &self.read_version)?;

    self.hummock_version_reader
        .may_exist(filtered_range, read_options, snapshot)
        .await
}
}

impl StateStoreRead for LocalHummockStorage {
Expand Down Expand Up @@ -319,14 +298,6 @@ impl LocalStateStore for LocalHummockStorage {
type Iter<'a> = LocalHummockStorageIterator<'a>;
type RevIter<'a> = LocalHummockStorageRevIterator<'a>;

/// Trait entry point for the existence check on this local store.
///
/// Forwards `key_range` and `read_options` unchanged to `may_exist_inner` and
/// returns the future it produces.
fn may_exist(
&self,
key_range: TableKeyRange,
read_options: ReadOptions,
) -> impl Future<Output = StorageResult<bool>> + Send + '_ {
self.may_exist_inner(key_range, read_options)
}

async fn get(
&self,
key: TableKey<Bytes>,
Expand Down
135 changes: 2 additions & 133 deletions src/storage/src/hummock/store/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::MAX_SPILL_TIMES;
use risingwave_hummock_sdk::key::{
bound_table_key_range, is_empty_key_range, FullKey, TableKey, TableKeyRange, UserKey,
bound_table_key_range, FullKey, TableKey, TableKeyRange, UserKey,
};
use risingwave_hummock_sdk::key_range::KeyRangeCommon;
use risingwave_hummock_sdk::table_watermark::{
Expand Down Expand Up @@ -61,9 +61,7 @@ use crate::hummock::{
use crate::mem_table::{
ImmId, ImmutableMemtable, MemTableHummockIterator, MemTableHummockRevIterator,
};
use crate::monitor::{
GetLocalMetricsGuard, HummockStateStoreMetrics, MayExistLocalMetricsGuard, StoreLocalStatistic,
};
use crate::monitor::{GetLocalMetricsGuard, HummockStateStoreMetrics, StoreLocalStatistic};
use crate::store::{gen_min_epoch, ReadLogOptions, ReadOptions};

pub type CommittedVersion = PinnedVersion;
Expand Down Expand Up @@ -913,135 +911,6 @@ impl HummockVersionReader {
Ok(())
}

/// Checks whether `table_key_range` may contain data in the given read version tuple.
///
/// Note: this method will not check the kv tombstones and delete range tombstones,
/// so `Ok(true)` is only a hint that data may exist.
///
/// The checks run in this order and the first hit returns `Ok(true)`:
/// 1. the imms of the read version tuple,
/// 2. the uncommitted SSTs,
/// 3. the SSTs of every level of the committed version.
///
/// `Ok(false)` is returned when the key range is empty or when every check misses.
///
/// # Errors
///
/// Propagates any error returned by `sstable_store.sstable(..)` while loading an SST.
pub async fn may_exist(
&self,
table_key_range: TableKeyRange,
read_options: ReadOptions,
read_version_tuple: ReadVersionTuple,
) -> StorageResult<bool> {
// An empty range cannot contain anything; answer before touching any data.
if is_empty_key_range(&table_key_range) {
return Ok(false);
}

let table_id = read_options.table_id;
let (imms, uncommitted_ssts, committed_version) = read_version_tuple;
// Statistics gathered below are accumulated in this guard's `local_stats`.
let mut stats_guard =
MayExistLocalMetricsGuard::new(self.state_store_metrics.clone(), table_id);

// 1. check staging data
for imm in &imms {
if imm.range_exists(&table_key_range) {
return Ok(true);
}
}

let user_key_range = bound_table_key_range(read_options.table_id, &table_key_range);
// Borrowed view of the bounds, passed to the pruning and bloom filter helpers below.
let user_key_range_ref = (
user_key_range.0.as_ref().map(UserKey::as_ref),
user_key_range.1.as_ref().map(UserKey::as_ref),
);
// With a prefix hint the SSTs are probed through their bloom filters further down;
// without one the `else` branch answers from key-range pruning alone and returns early.
let bloom_filter_prefix_hash = if let Some(prefix_hint) = read_options.prefix_hint {
Sstable::hash_for_bloom_filter(&prefix_hint, table_id.table_id)
} else {
// only use `table_key_range` to see whether all SSTs are filtered out
// without looking at bloom filter because prefix_hint is not provided
if !uncommitted_ssts.is_empty() {
// uncommitted_ssts is already pruned by `table_key_range` so no extra check is
// needed.
return Ok(true);
}
for level in committed_version.levels(table_id) {
match level.level_type() {
LevelType::Overlapping | LevelType::Unspecified => {
if prune_overlapping_ssts(&level.table_infos, table_id, &table_key_range)
.next()
.is_some()
{
return Ok(true);
}
}
LevelType::Nonoverlapping => {
if prune_nonoverlapping_ssts(&level.table_infos, user_key_range_ref)
.next()
.is_some()
{
return Ok(true);
}
}
}
}
return Ok(false);
};

// 2. order guarantee: imm -> sst
for local_sst in &uncommitted_ssts {
// Count every SST whose bloom filter is consulted.
stats_guard.local_stats.may_exist_check_sstable_count += 1;
if hit_sstable_bloom_filter(
self.sstable_store
.sstable(local_sst, &mut stats_guard.local_stats)
.await?
.as_ref(),
&user_key_range_ref,
bloom_filter_prefix_hash,
&mut stats_guard.local_stats,
) {
return Ok(true);
}
}

// 3. read from committed_version sst file
// Because SST meta records encoded key range,
// the filter key needs to be encoded as well.
assert!(committed_version.is_valid());
for level in committed_version.levels(table_id) {
if level.table_infos.is_empty() {
continue;
}
// Overlapping/unspecified and non-overlapping levels differ only in how
// candidate SSTs are pruned; the bloom filter probe is the same for both.
match level.level_type() {
LevelType::Overlapping | LevelType::Unspecified => {
let sstable_infos =
prune_overlapping_ssts(&level.table_infos, table_id, &table_key_range);
for sstable_info in sstable_infos {
stats_guard.local_stats.may_exist_check_sstable_count += 1;
if hit_sstable_bloom_filter(
self.sstable_store
.sstable(sstable_info, &mut stats_guard.local_stats)
.await?
.as_ref(),
&user_key_range_ref,
bloom_filter_prefix_hash,
&mut stats_guard.local_stats,
) {
return Ok(true);
}
}
}
LevelType::Nonoverlapping => {
let table_infos =
prune_nonoverlapping_ssts(&level.table_infos, user_key_range_ref);

for table_info in table_infos {
stats_guard.local_stats.may_exist_check_sstable_count += 1;
if hit_sstable_bloom_filter(
self.sstable_store
.sstable(table_info, &mut stats_guard.local_stats)
.await?
.as_ref(),
&user_key_range_ref,
bloom_filter_prefix_hash,
&mut stats_guard.local_stats,
) {
return Ok(true);
}
}
}
}
}

// Nothing in imms, uncommitted SSTs or committed SSTs matched.
Ok(false)
}

pub async fn iter_log(
&self,
version: PinnedVersion,
Expand Down
9 changes: 0 additions & 9 deletions src/storage/src/mem_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -546,15 +546,6 @@ impl<S: StateStoreWrite + StateStoreRead> LocalStateStore for MemtableLocalState
type Iter<'a> = impl StateStoreIter + 'a;
type RevIter<'a> = impl StateStoreIter + 'a;

/// Always answers `Ok(true)`; `_key_range` and `_read_options` are ignored and
/// no data is inspected.
// The body never awaits, which is why `clippy::unused_async` is allowed here.
// NOTE(review): `async` is presumably kept to match the trait method's shape — confirm.
#[allow(clippy::unused_async)]
async fn may_exist(
&self,
_key_range: TableKeyRange,
_read_options: ReadOptions,
) -> StorageResult<bool> {
Ok(true)
}

async fn get(
&self,
key: TableKey<Bytes>,
Expand Down
46 changes: 0 additions & 46 deletions src/storage/src/monitor/local_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ pub struct StoreLocalStatistic {
pub staging_sst_iter_count: u64,
pub overlapping_iter_count: u64,
pub non_overlapping_iter_count: u64,
pub may_exist_check_sstable_count: u64,
pub sub_iter_count: u64,
pub found_key: bool,

Expand Down Expand Up @@ -233,11 +232,9 @@ struct LocalStoreMetrics {
staging_sst_iter_count: LocalHistogram,
overlapping_iter_count: LocalHistogram,
non_overlapping_iter_count: LocalHistogram,
may_exist_check_sstable_count: LocalHistogram,
sub_iter_count: LocalHistogram,
iter_filter_metrics: BloomFilterLocalMetrics,
get_filter_metrics: BloomFilterLocalMetrics,
may_exist_filter_metrics: BloomFilterLocalMetrics,
collect_count: usize,

staging_imm_get_count: LocalHistogram,
Expand Down Expand Up @@ -324,18 +321,12 @@ impl LocalStoreMetrics {
.iter_merge_sstable_counts
.with_label_values(&[table_id_label, "committed-non-overlapping-iter"])
.local();
let may_exist_check_sstable_count = metrics
.iter_merge_sstable_counts
.with_label_values(&[table_id_label, "may-exist-check-sstable"])
.local();
let sub_iter_count = metrics
.iter_merge_sstable_counts
.with_label_values(&[table_id_label, "sub-iter"])
.local();
let get_filter_metrics = BloomFilterLocalMetrics::new(metrics, table_id_label, "get");
let iter_filter_metrics = BloomFilterLocalMetrics::new(metrics, table_id_label, "iter");
let may_exist_filter_metrics =
BloomFilterLocalMetrics::new(metrics, table_id_label, "may_exist");

let staging_imm_get_count = metrics
.iter_merge_sstable_counts
Expand Down Expand Up @@ -372,10 +363,8 @@ impl LocalStoreMetrics {
overlapping_iter_count,
sub_iter_count,
non_overlapping_iter_count,
may_exist_check_sstable_count,
get_filter_metrics,
iter_filter_metrics,
may_exist_filter_metrics,
collect_count: 0,
staging_imm_get_count,
staging_sst_get_count,
Expand Down Expand Up @@ -425,7 +414,6 @@ add_local_metrics_histogram!(
overlapping_iter_count,
non_overlapping_iter_count,
sub_iter_count,
may_exist_check_sstable_count,
staging_imm_get_count,
staging_sst_get_count,
overlapping_get_count,
Expand Down Expand Up @@ -574,37 +562,3 @@ impl Drop for IterLocalMetricsGuard {
});
}
}

/// Guard that carries the local statistics of a `may_exist` check and reports
/// them into the per-table local metrics when it is dropped.
pub struct MayExistLocalMetricsGuard {
// Metrics handle used on drop to create the table's `LocalStoreMetrics` entry if it is missing.
metrics: Arc<HummockStateStoreMetrics>,
// Table whose local metrics entry receives the report.
table_id: TableId,
// Statistics accumulated by the caller while the guard is alive.
pub local_stats: StoreLocalStatistic,
}

impl MayExistLocalMetricsGuard {
    /// Builds a guard for `table_id` whose statistics start from
    /// `StoreLocalStatistic::default()`.
    pub fn new(metrics: Arc<HummockStateStoreMetrics>, table_id: TableId) -> Self {
        let local_stats = StoreLocalStatistic::default();
        Self {
            metrics,
            table_id,
            local_stats,
        }
    }
}

impl Drop for MayExistLocalMetricsGuard {
    /// Reports the accumulated statistics into the `LOCAL_METRICS` entry of this
    /// guard's table, creating that entry first when it does not exist yet.
    fn drop(&mut self) {
        LOCAL_METRICS.with_borrow_mut(|per_table| {
            let raw_table_id = self.table_id.table_id;
            let entry = per_table.entry(raw_table_id).or_insert_with(|| {
                let label = self.table_id.to_string();
                LocalStoreMetrics::new(self.metrics.as_ref(), label.as_str())
            });
            // General statistics first, then the bloom filter counters for `may_exist`.
            self.local_stats.report(entry);
            let filter_metrics = &entry.may_exist_filter_metrics;
            self.local_stats.report_bloom_filter_metrics(filter_metrics);
        });
    }
}
21 changes: 3 additions & 18 deletions src/storage/src/monitor/monitored_storage_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ use std::sync::{Arc, OnceLock};
use std::time::{Duration, Instant};

use prometheus::{
exponential_buckets, histogram_opts, linear_buckets, register_histogram_vec_with_registry,
register_histogram_with_registry, Histogram, Registry,
exponential_buckets, histogram_opts, linear_buckets, register_histogram_with_registry,
Histogram, Registry,
};
use risingwave_common::config::MetricLevel;
use risingwave_common::metrics::{
LabelGuardedIntCounterVec, LabelGuardedIntGauge, LabelGuardedLocalHistogram,
LabelGuardedLocalIntCounter, RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec,
RelabeledGuardedIntGaugeVec, RelabeledHistogramVec,
RelabeledGuardedIntGaugeVec,
};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_common::{
Expand Down Expand Up @@ -55,7 +55,6 @@ pub struct MonitoredStorageMetrics {

// [table_id, op_type]
pub iter_log_op_type_counts: LabelGuardedIntCounterVec<2>,
pub may_exist_duration: RelabeledHistogramVec,

pub sync_duration: Histogram,
pub sync_size: Histogram,
Expand Down Expand Up @@ -239,19 +238,6 @@ impl MonitoredStorageMetrics {
)
.unwrap();

let opts = histogram_opts!(
"state_store_may_exist_duration",
"Histogram of may exist time that have been issued to state store",
buckets,
);
let may_exist_duration =
register_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
let may_exist_duration = RelabeledHistogramVec::with_metric_level(
MetricLevel::Debug,
may_exist_duration,
metric_level,
);

let opts = histogram_opts!(
"state_store_sync_duration",
"Histogram of time spent on compacting shared buffer to remote storage",
Expand All @@ -277,7 +263,6 @@ impl MonitoredStorageMetrics {
iter_counts,
iter_in_progress_counts,
iter_log_op_type_counts,
may_exist_duration,
sync_duration,
sync_size,
}
Expand Down
20 changes: 0 additions & 20 deletions src/storage/src/monitor/monitored_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,26 +205,6 @@ impl<S: LocalStateStore> LocalStateStore for MonitoredStateStore<S> {
type Iter<'a> = impl StateStoreIter + 'a;
type RevIter<'a> = impl StateStoreIter + 'a;

/// Runs the inner store's `may_exist` and records how long it took in the
/// `may_exist_duration` histogram, labelled with the table id.
///
/// The inner result is returned unchanged, whether it is `Ok` or `Err`.
async fn may_exist(
    &self,
    key_range: TableKeyRange,
    read_options: ReadOptions,
) -> StorageResult<bool> {
    let label = read_options.table_id.to_string();
    let histogram = self
        .storage_metrics
        .may_exist_duration
        .with_label_values(&[label.as_str()]);
    let timer = histogram.start_timer();

    let outcome = self
        .inner
        .may_exist(key_range, read_options)
        .verbose_instrument_await("store_may_exist")
        .await;

    // Record the elapsed time before handing the result back.
    timer.observe_duration();
    outcome
}

fn get(
&self,
key: TableKey<Bytes>,
Expand Down
Loading

0 comments on commit 5cebcad

Please sign in to comment.