Skip to content

Commit

Permalink
Merge branch 'main' into fix/user-privilege-conflict
Browse files Browse the repository at this point in the history
  • Loading branch information
yezizp2012 authored Mar 12, 2024
2 parents c0cde55 + 5a8a6a2 commit 27e652f
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 89 deletions.
2 changes: 1 addition & 1 deletion src/storage/src/hummock/sstable_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@ impl SstableStore {
_ => (),
}
stats.cache_meta_block_total += 1;
lookup_response.verbose_instrument_await("sstable")
lookup_response
}

pub async fn list_object_metadata_from_object_store(
Expand Down
22 changes: 5 additions & 17 deletions src/storage/src/hummock/store/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use std::collections::HashSet;
use std::iter::once;
use std::ops::Bound::Included;
use std::sync::Arc;
use std::time::Instant;

use await_tree::InstrumentAwait;
use bytes::Bytes;
use itertools::Itertools;
use parking_lot::RwLock;
Expand All @@ -38,7 +38,6 @@ use risingwave_hummock_sdk::version::HummockVersionDelta;
use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, LocalSstableInfo};
use risingwave_pb::hummock::{LevelType, SstableInfo};
use sync_point::sync_point;
use tracing::Instrument;

use super::StagingDataIterator;
use crate::error::StorageResult;
Expand Down Expand Up @@ -777,8 +776,6 @@ impl HummockVersionReader {
read_version_tuple: ReadVersionTuple,
mem_table: Option<MemTableHummockIterator<'b>>,
) -> StorageResult<StreamTypeOfIter<HummockStorageIteratorInner<'b>>> {
let table_id_string = read_options.table_id.to_string();
let table_id_label = table_id_string.as_str();
let (imms, uncommitted_ssts, committed) = read_version_tuple;

let mut local_stats = StoreLocalStatistic::default();
Expand Down Expand Up @@ -814,7 +811,6 @@ impl HummockVersionReader {
let table_holder = self
.sstable_store
.sstable(sstable_info, &mut local_stats)
.instrument(tracing::trace_span!("get_sstable"))
.await?;

if !table_holder.meta.monotonic_tombstone_events.is_empty()
Expand Down Expand Up @@ -846,11 +842,7 @@ impl HummockVersionReader {

let mut non_overlapping_iters = Vec::new();
let mut overlapping_iters = Vec::new();
let timer = self
.state_store_metrics
.iter_fetch_meta_duration
.with_label_values(&[table_id_label])
.start_timer();
let timer = Instant::now();

for level in committed.levels(read_options.table_id) {
if level.table_infos.is_empty() {
Expand Down Expand Up @@ -893,7 +885,6 @@ impl HummockVersionReader {
let sstable = self
.sstable_store
.sstable(&sstables[0], &mut local_stats)
.instrument(tracing::trace_span!("get_sstable"))
.await?;
if !sstable.meta.monotonic_tombstone_events.is_empty()
&& !read_options.ignore_range_tombstone
Expand Down Expand Up @@ -938,7 +929,6 @@ impl HummockVersionReader {
let sstable = self
.sstable_store
.sstable(sstable_info, &mut local_stats)
.instrument(tracing::trace_span!("get_sstable"))
.await?;
assert_eq!(sstable_info.get_object_id(), sstable.id);
if !sstable.meta.monotonic_tombstone_events.is_empty()
Expand Down Expand Up @@ -966,8 +956,9 @@ impl HummockVersionReader {
}
}
}
let fetch_meta_duration_sec = timer.stop_and_record();
let fetch_meta_duration_sec = timer.elapsed().as_secs_f64();
if fetch_meta_duration_sec > SLOW_ITER_FETCH_META_DURATION_SECOND {
let table_id_string = read_options.table_id.to_string();
tracing::warn!("Fetching meta while creating an iter to read table_id {:?} at epoch {:?} is slow: duration = {:?}s, cache unhits = {:?}.",
table_id_string, epoch, fetch_meta_duration_sec, local_stats.cache_meta_block_miss);
self.state_store_metrics
Expand Down Expand Up @@ -1006,10 +997,7 @@ impl HummockVersionReader {
Some(committed),
delete_range_iter,
);
user_iter
.rewind()
.verbose_instrument_await("rewind")
.await?;
user_iter.rewind().await?;
local_stats.found_key = user_iter.is_valid();
local_stats.sub_iter_count = local_stats.staging_imm_iter_count
+ local_stats.staging_sst_iter_count
Expand Down
170 changes: 168 additions & 2 deletions src/storage/src/monitor/monitored_storage_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::OnceLock;
use std::cell::RefCell;
use std::collections::HashMap;
use std::sync::{Arc, OnceLock};
use std::time::{Duration, Instant};

use prometheus::core::{AtomicU64, GenericLocalCounter};
use prometheus::local::LocalHistogram;
use prometheus::{
exponential_buckets, histogram_opts, linear_buckets, register_histogram_vec_with_registry,
register_histogram_with_registry, register_int_counter_vec_with_registry, Histogram, Registry,
};
use risingwave_common::config::MetricLevel;
use risingwave_common::metrics::{
RelabeledCounterVec, RelabeledGuardedHistogramVec, RelabeledHistogramVec,
LabelGuardedMetric, RelabeledCounterVec, RelabeledGuardedHistogramVec, RelabeledHistogramVec,
};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_common::register_guarded_histogram_vec_with_registry;
Expand Down Expand Up @@ -218,4 +223,165 @@ impl MonitoredStorageMetrics {
pub fn unused() -> Self {
global_storage_metrics(MetricLevel::Disabled)
}

pub fn local_metrics(&self, table_label: &str) -> LocalStorageMetrics {
let iter_init_duration = self
.iter_init_duration
.with_label_values(&[table_label])
.local();
let iter_in_process_counts = self
.iter_in_process_counts
.with_label_values(&[table_label])
.local();
let iter_scan_duration = self
.iter_scan_duration
.with_label_values(&[table_label])
.local();
let iter_item = self.iter_item.with_label_values(&[table_label]).local();
let iter_size = self.iter_size.with_label_values(&[table_label]).local();

let get_duration = self.get_duration.with_label_values(&[table_label]).local();
let get_key_size = self.get_key_size.with_label_values(&[table_label]).local();
let get_value_size = self
.get_value_size
.with_label_values(&[table_label])
.local();

LocalStorageMetrics {
iter_init_duration,
iter_scan_duration,
iter_in_process_counts,
iter_item,
iter_size,
get_duration,
get_key_size,
get_value_size,
report_count: 0,
}
}
}

pub struct LocalStorageMetrics {
iter_init_duration: LabelGuardedMetric<LocalHistogram, 1>,
iter_scan_duration: LabelGuardedMetric<LocalHistogram, 1>,
iter_in_process_counts: GenericLocalCounter<AtomicU64>,
iter_item: LocalHistogram,
iter_size: LocalHistogram,

get_duration: LabelGuardedMetric<LocalHistogram, 1>,
get_key_size: LocalHistogram,
get_value_size: LocalHistogram,
report_count: usize,
}

impl LocalStorageMetrics {
pub fn may_flush(&mut self) {
self.report_count += 1;
if self.report_count > MAX_FLUSH_TIMES {
self.iter_scan_duration.flush();
self.iter_init_duration.flush();
self.iter_in_process_counts.flush();
self.iter_item.flush();
self.iter_size.flush();
self.get_duration.flush();
self.get_key_size.flush();
self.get_value_size.flush();
self.report_count = 0;
}
}
}

pub struct MonitoredStateStoreIterStats {
pub iter_init_duration: Duration,
pub iter_scan_time: Instant,
pub total_items: usize,
pub total_size: usize,
pub table_id: u32,
pub metrics: Arc<MonitoredStorageMetrics>,
}

thread_local!(static LOCAL_METRICS: RefCell<HashMap<u32, LocalStorageMetrics>> = RefCell::new(HashMap::default()));
const MAX_FLUSH_TIMES: usize = 64;

impl MonitoredStateStoreIterStats {
pub fn new(
table_id: u32,
iter_init_duration: Duration,
metrics: Arc<MonitoredStorageMetrics>,
) -> Self {
Self {
iter_init_duration,
iter_scan_time: Instant::now(),
total_items: 0,
total_size: 0,
table_id,
metrics,
}
}

pub fn report(&self) {
LOCAL_METRICS.with_borrow_mut(|local_metrics| {
let table_metrics = local_metrics.entry(self.table_id).or_insert_with(|| {
let table_label = self.table_id.to_string();
self.metrics.local_metrics(&table_label)
});
let iter_scan_duration = self.iter_scan_time.elapsed();
table_metrics
.iter_scan_duration
.observe(iter_scan_duration.as_secs_f64());
table_metrics
.iter_init_duration
.observe(self.iter_init_duration.as_secs_f64());
table_metrics.iter_in_process_counts.inc();
table_metrics.iter_item.observe(self.total_items as f64);
table_metrics.iter_size.observe(self.total_size as f64);
table_metrics.may_flush();
});
}
}

impl Drop for MonitoredStateStoreIterStats {
fn drop(&mut self) {
self.report();
}
}

pub struct MonitoredStateStoreGetStats {
pub get_duration: Instant,
pub get_key_size: usize,
pub get_value_size: usize,
pub table_id: u32,
pub metrics: Arc<MonitoredStorageMetrics>,
}

impl MonitoredStateStoreGetStats {
pub fn new(table_id: u32, metrics: Arc<MonitoredStorageMetrics>) -> Self {
Self {
get_duration: Instant::now(),
get_key_size: 0,
get_value_size: 0,
table_id,
metrics,
}
}

pub fn report(&self) {
LOCAL_METRICS.with_borrow_mut(|local_metrics| {
let table_metrics = local_metrics.entry(self.table_id).or_insert_with(|| {
let table_label = self.table_id.to_string();
self.metrics.local_metrics(&table_label)
});
let get_duration = self.get_duration.elapsed();
table_metrics
.get_duration
.observe(get_duration.as_secs_f64());
table_metrics.get_key_size.observe(self.get_key_size as _);
if self.get_value_size > 0 {
table_metrics
.get_value_size
.observe(self.get_value_size as _);
}
table_metrics.may_flush();
});
}
}
Loading

0 comments on commit 27e652f

Please sign in to comment.