Skip to content

Commit

Permalink
feat(stats): introduce block efficiency metrics (#14654)
Browse files Browse the repository at this point in the history
Signed-off-by: MrCroxx <[email protected]>
  • Loading branch information
MrCroxx authored Jan 23, 2024
1 parent 5436acf commit 2e16c26
Show file tree
Hide file tree
Showing 16 changed files with 475 additions and 150 deletions.
35 changes: 19 additions & 16 deletions src/ctl/src/common/hummock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ use std::sync::Arc;
use std::time::Duration;

use anyhow::{anyhow, bail, Result};
use risingwave_common::config::ObjectStoreConfig;
use risingwave_common::config::{MetricLevel, ObjectStoreConfig};
use risingwave_object_store::object::build_remote_object_store;
use risingwave_rpc_client::MetaClient;
use risingwave_storage::hummock::hummock_meta_client::MonitoredHummockMetaClient;
use risingwave_storage::hummock::{FileCache, HummockStorage, SstableStore};
use risingwave_storage::hummock::{FileCache, HummockStorage, SstableStore, SstableStoreConfig};
use risingwave_storage::monitor::{
CompactorMetrics, HummockMetrics, HummockStateStoreMetrics, MonitoredStateStore,
MonitoredStorageMetrics, ObjectStoreMetrics,
global_hummock_state_store_metrics, CompactorMetrics, HummockMetrics, HummockStateStoreMetrics,
MonitoredStateStore, MonitoredStorageMetrics, ObjectStoreMetrics,
};
use risingwave_storage::opts::StorageOpts;
use risingwave_storage::{StateStore, StateStoreImpl};
Expand Down Expand Up @@ -162,17 +162,20 @@ impl HummockServiceOpts {

let opts = self.get_storage_opts();

Ok(Arc::new(SstableStore::new(
Arc::new(object_store),
opts.data_directory,
opts.block_cache_capacity_mb * (1 << 20),
opts.meta_cache_capacity_mb * (1 << 20),
0,
opts.block_cache_capacity_mb * (1 << 20),
opts.max_prefetch_block_number,
FileCache::none(),
FileCache::none(),
None,
)))
Ok(Arc::new(SstableStore::new(SstableStoreConfig {
store: Arc::new(object_store),
path: opts.data_directory,
block_cache_capacity: opts.block_cache_capacity_mb * (1 << 20),
meta_cache_capacity: opts.meta_cache_capacity_mb * (1 << 20),
high_priority_ratio: 0,
prefetch_buffer_capacity: opts.block_cache_capacity_mb * (1 << 20),
max_prefetch_block_number: opts.max_prefetch_block_number,
data_file_cache: FileCache::none(),
meta_file_cache: FileCache::none(),
recent_filter: None,
state_store_metrics: Arc::new(global_hummock_state_store_metrics(
MetricLevel::Disabled,
)),
})))
}
}
33 changes: 18 additions & 15 deletions src/jni_core/src/hummock_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::sync::Arc;
use bytes::Bytes;
use futures::TryStreamExt;
use risingwave_common::catalog::ColumnDesc;
use risingwave_common::config::ObjectStoreConfig;
use risingwave_common::config::{MetricLevel, ObjectStoreConfig};
use risingwave_common::hash::VirtualNode;
use risingwave_common::row::OwnedRow;
use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
Expand All @@ -33,9 +33,9 @@ use risingwave_storage::hummock::local_version::pinned_version::PinnedVersion;
use risingwave_storage::hummock::store::version::HummockVersionReader;
use risingwave_storage::hummock::store::HummockStorageIterator;
use risingwave_storage::hummock::{
get_committed_read_version_tuple, CachePolicy, FileCache, SstableStore,
get_committed_read_version_tuple, CachePolicy, FileCache, SstableStore, SstableStoreConfig,
};
use risingwave_storage::monitor::HummockStateStoreMetrics;
use risingwave_storage::monitor::{global_hummock_state_store_metrics, HummockStateStoreMetrics};
use risingwave_storage::row_serde::value_serde::ValueRowSerdeNew;
use risingwave_storage::store::{ReadOptions, StateStoreReadIterStream, StreamTypeOfIter};
use rw_futures_util::select_all;
Expand Down Expand Up @@ -66,18 +66,21 @@ impl HummockJavaBindingIterator {
)
.await,
);
let sstable_store = Arc::new(SstableStore::new(
object_store,
read_plan.data_dir,
1 << 10,
1 << 10,
0,
1 << 10,
16,
FileCache::none(),
FileCache::none(),
None,
));
let sstable_store = Arc::new(SstableStore::new(SstableStoreConfig {
store: object_store,
path: read_plan.data_dir,
block_cache_capacity: 1 << 10,
meta_cache_capacity: 1 << 10,
high_priority_ratio: 0,
prefetch_buffer_capacity: 1 << 10,
max_prefetch_block_number: 16,
data_file_cache: FileCache::none(),
meta_file_cache: FileCache::none(),
recent_filter: None,
state_store_metrics: Arc::new(global_hummock_state_store_metrics(
MetricLevel::Disabled,
)),
}));
let reader = HummockVersionReader::new(
sstable_store,
Arc::new(HummockStateStoreMetrics::unused()),
Expand Down
28 changes: 16 additions & 12 deletions src/storage/benches/bench_compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use criterion::async_executor::FuturesExecutor;
use criterion::{criterion_group, criterion_main, Criterion};
use risingwave_common::cache::CachePriority;
use risingwave_common::catalog::TableId;
use risingwave_common::config::MetricLevel;
use risingwave_common::hash::VirtualNode;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::key_range::KeyRange;
Expand All @@ -41,26 +42,29 @@ use risingwave_storage::hummock::sstable_store::SstableStoreRef;
use risingwave_storage::hummock::value::HummockValue;
use risingwave_storage::hummock::{
CachePolicy, CompactionDeleteRangeIterator, FileCache, SstableBuilder, SstableBuilderOptions,
SstableIterator, SstableStore, SstableWriterOptions, Xor16FilterBuilder,
SstableIterator, SstableStore, SstableStoreConfig, SstableWriterOptions, Xor16FilterBuilder,
};
use risingwave_storage::monitor::{
global_hummock_state_store_metrics, CompactorMetrics, StoreLocalStatistic,
};
use risingwave_storage::monitor::{CompactorMetrics, StoreLocalStatistic};

pub fn mock_sstable_store() -> SstableStoreRef {
let store = InMemObjectStore::new().monitored(Arc::new(ObjectStoreMetrics::unused()));
let store = Arc::new(ObjectStoreImpl::InMem(store));
let path = "test".to_string();
Arc::new(SstableStore::new(
Arc::new(SstableStore::new(SstableStoreConfig {
store,
path,
64 << 20,
128 << 20,
0,
64 << 20,
16,
FileCache::none(),
FileCache::none(),
None,
))
block_cache_capacity: 64 << 20,
meta_cache_capacity: 128 << 20,
high_priority_ratio: 0,
prefetch_buffer_capacity: 64 << 20,
max_prefetch_block_number: 16,
data_file_cache: FileCache::none(),
meta_file_cache: FileCache::none(),
recent_filter: None,
state_store_metrics: Arc::new(global_hummock_state_store_metrics(MetricLevel::Disabled)),
}))
}

pub fn default_writer_opts() -> SstableWriterOptions {
Expand Down
31 changes: 16 additions & 15 deletions src/storage/benches/bench_multi_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,17 @@ use criterion::{criterion_group, criterion_main, Criterion};
use futures::future::try_join_all;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::config::ObjectStoreConfig;
use risingwave_common::config::{MetricLevel, ObjectStoreConfig};
use risingwave_hummock_sdk::key::{FullKey, UserKey};
use risingwave_object_store::object::{ObjectStore, ObjectStoreImpl, S3ObjectStore};
use risingwave_storage::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
use risingwave_storage::hummock::value::HummockValue;
use risingwave_storage::hummock::{
BatchSstableWriterFactory, CachePolicy, FileCache, HummockResult, MemoryLimiter,
SstableBuilder, SstableBuilderOptions, SstableStore, SstableWriterFactory,
SstableBuilder, SstableBuilderOptions, SstableStore, SstableStoreConfig, SstableWriterFactory,
SstableWriterOptions, StreamingSstableWriterFactory, Xor16FilterBuilder,
};
use risingwave_storage::monitor::ObjectStoreMetrics;
use risingwave_storage::monitor::{global_hummock_state_store_metrics, ObjectStoreMetrics};

const RANGE: Range<u64> = 0..1500000;
const VALUE: &[u8] = &[0; 400];
Expand Down Expand Up @@ -141,18 +141,19 @@ fn bench_builder(
.monitored(metrics)
});
let object_store = Arc::new(ObjectStoreImpl::S3(object_store));
let sstable_store = Arc::new(SstableStore::new(
object_store,
"test".to_string(),
64 << 20,
128 << 20,
0,
64 << 20,
16,
FileCache::none(),
FileCache::none(),
None,
));
let sstable_store = Arc::new(SstableStore::new(SstableStoreConfig {
store: object_store,
path: "test".to_string(),
block_cache_capacity: 64 << 20,
meta_cache_capacity: 128 << 20,
high_priority_ratio: 0,
prefetch_buffer_capacity: 64 << 20,
max_prefetch_block_number: 16,
data_file_cache: FileCache::none(),
meta_file_cache: FileCache::none(),
recent_filter: None,
state_store_metrics: Arc::new(global_hummock_state_store_metrics(MetricLevel::Disabled)),
}));

let mut group = c.benchmark_group("bench_multi_builder");
group
Expand Down
37 changes: 18 additions & 19 deletions src/storage/hummock_test/src/bin/replay/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use risingwave_object_store::object::build_remote_object_store;
use risingwave_storage::filter_key_extractor::{
FakeRemoteTableAccessor, RpcFilterKeyExtractorManager,
};
use risingwave_storage::hummock::{FileCache, HummockStorage, SstableStore};
use risingwave_storage::hummock::{FileCache, HummockStorage, SstableStore, SstableStoreConfig};
use risingwave_storage::monitor::{CompactorMetrics, HummockStateStoreMetrics, ObjectStoreMetrics};
use risingwave_storage::opts::StorageOpts;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -97,33 +97,32 @@ async fn create_replay_hummock(r: Record, args: &Args) -> Result<impl GlobalRepl
&storage_memory_config,
)));

let state_store_stats = Arc::new(HummockStateStoreMetrics::unused());
let object_store_stats = Arc::new(ObjectStoreMetrics::unused());
let state_store_metrics = Arc::new(HummockStateStoreMetrics::unused());
let object_store_metrics = Arc::new(ObjectStoreMetrics::unused());

let compactor_metrics = Arc::new(CompactorMetrics::unused());

let object_store = build_remote_object_store(
&args.object_storage,
object_store_stats,
object_store_metrics,
"Hummock",
ObjectStoreConfig::default(),
)
.await;

let sstable_store = {
Arc::new(SstableStore::new(
Arc::new(object_store),
storage_opts.data_directory.to_string(),
storage_opts.block_cache_capacity_mb * (1 << 20),
storage_opts.meta_cache_capacity_mb * (1 << 20),
storage_opts.high_priority_ratio,
storage_opts.prefetch_buffer_capacity_mb * (1 << 20),
storage_opts.max_prefetch_block_number,
FileCache::none(),
FileCache::none(),
None,
))
};
let sstable_store = Arc::new(SstableStore::new(SstableStoreConfig {
store: Arc::new(object_store),
path: storage_opts.data_directory.to_string(),
block_cache_capacity: storage_opts.block_cache_capacity_mb * (1 << 20),
meta_cache_capacity: storage_opts.meta_cache_capacity_mb * (1 << 20),
high_priority_ratio: storage_opts.high_priority_ratio,
prefetch_buffer_capacity: storage_opts.prefetch_buffer_capacity_mb * (1 << 20),
max_prefetch_block_number: storage_opts.max_prefetch_block_number,
data_file_cache: FileCache::none(),
meta_file_cache: FileCache::none(),
recent_filter: None,
state_store_metrics: state_store_metrics.clone(),
}));

let (hummock_meta_client, notification_client, notifier) = {
let (env, hummock_manager_ref, _cluster_manager_ref, worker_node) =
Expand Down Expand Up @@ -158,7 +157,7 @@ async fn create_replay_hummock(r: Record, args: &Args) -> Result<impl GlobalRepl
hummock_meta_client.clone(),
notification_client,
key_filter_manager,
state_store_stats,
state_store_metrics,
compactor_metrics,
)
.await
Expand Down
12 changes: 6 additions & 6 deletions src/storage/src/hummock/file_cache/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ impl Value for CachedBlock {

fn serialized_len(&self) -> usize {
1 /* type */ + match self {
CachedBlock::Loaded { block } => block.raw_data().len(),
CachedBlock::Loaded { block } => block.raw().len(),
CachedBlock::Fetched { bytes, uncompressed_capacity: _ } => 8 + bytes.len(),
}
}
Expand Down Expand Up @@ -506,7 +506,7 @@ impl std::io::Read for CachedBlockCursor {
if self.pos < 1 {
self.pos += copy([0], &mut buf);
}
self.pos += copy(&block.raw_data()[self.pos - 1..], &mut buf);
self.pos += copy(&block.raw()[self.pos - 1..], &mut buf);
}
CachedBlock::Fetched {
bytes,
Expand Down Expand Up @@ -541,7 +541,7 @@ impl Value for Box<Block> {
type Cursor = BoxBlockCursor;

fn serialized_len(&self) -> usize {
self.raw_data().len()
self.raw().len()
}

fn read(buf: &[u8]) -> CodingResult<Self> {
Expand Down Expand Up @@ -571,7 +571,7 @@ impl BoxBlockCursor {
impl std::io::Read for BoxBlockCursor {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let pos = self.pos;
self.pos += copy(&self.inner.raw_data()[self.pos..], buf);
self.pos += copy(&self.inner.raw()[self.pos..], buf);
let n = self.pos - pos;
Ok(n)
}
Expand Down Expand Up @@ -748,7 +748,7 @@ mod tests {
std::io::copy(&mut cursor, &mut buf).unwrap();
let target = cursor.into_inner();
let block = Box::<Block>::read(&buf[..]).unwrap();
assert_eq!(target.raw_data(), block.raw_data());
assert_eq!(target.raw(), block.raw());
}

{
Expand Down Expand Up @@ -779,7 +779,7 @@ mod tests {
CachedBlock::Loaded { block } => block,
CachedBlock::Fetched { .. } => panic!(),
};
assert_eq!(target.raw_data(), block.raw_data());
assert_eq!(target.raw(), block.raw());
}

{
Expand Down
26 changes: 14 additions & 12 deletions src/storage/src/hummock/iterator/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::sync::Arc;
use bytes::Bytes;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::config::MetricLevel;
use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey};
use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, HummockSstableObjectId};
use risingwave_object_store::object::{
Expand All @@ -33,9 +34,9 @@ use crate::hummock::test_utils::{
};
use crate::hummock::{
DeleteRangeTombstone, FileCache, HummockValue, SstableBuilderOptions, SstableIterator,
SstableIteratorType, SstableStoreRef, TableHolder,
SstableIteratorType, SstableStoreConfig, SstableStoreRef, TableHolder,
};
use crate::monitor::ObjectStoreMetrics;
use crate::monitor::{global_hummock_state_store_metrics, ObjectStoreMetrics};

/// `assert_eq` two `Vec<u8>` with human-readable format.
#[macro_export]
Expand All @@ -59,18 +60,19 @@ pub fn mock_sstable_store() -> SstableStoreRef {

pub fn mock_sstable_store_with_object_store(store: ObjectStoreRef) -> SstableStoreRef {
let path = "test".to_string();
Arc::new(SstableStore::new(
Arc::new(SstableStore::new(SstableStoreConfig {
store,
path,
64 << 20,
64 << 20,
0,
64 << 20,
16,
FileCache::none(),
FileCache::none(),
None,
))
block_cache_capacity: 64 << 20,
meta_cache_capacity: 64 << 20,
high_priority_ratio: 0,
prefetch_buffer_capacity: 64 << 20,
max_prefetch_block_number: 16,
data_file_cache: FileCache::none(),
meta_file_cache: FileCache::none(),
recent_filter: None,
state_store_metrics: Arc::new(global_hummock_state_store_metrics(MetricLevel::Disabled)),
}))
}

pub fn iterator_test_table_key_of(idx: usize) -> Vec<u8> {
Expand Down
Loading

0 comments on commit 2e16c26

Please sign in to comment.