Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stats): introduce block efficiency metrics #14654

Merged
merged 12 commits into from
Jan 23, 2024
35 changes: 19 additions & 16 deletions src/ctl/src/common/hummock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ use std::sync::Arc;
use std::time::Duration;

use anyhow::{anyhow, bail, Result};
use risingwave_common::config::ObjectStoreConfig;
use risingwave_common::config::{MetricLevel, ObjectStoreConfig};
use risingwave_object_store::object::build_remote_object_store;
use risingwave_rpc_client::MetaClient;
use risingwave_storage::hummock::hummock_meta_client::MonitoredHummockMetaClient;
use risingwave_storage::hummock::{FileCache, HummockStorage, SstableStore};
use risingwave_storage::hummock::{FileCache, HummockStorage, SstableStore, SstableStoreConfig};
use risingwave_storage::monitor::{
CompactorMetrics, HummockMetrics, HummockStateStoreMetrics, MonitoredStateStore,
MonitoredStorageMetrics, ObjectStoreMetrics,
global_hummock_state_store_metrics, CompactorMetrics, HummockMetrics, HummockStateStoreMetrics,
MonitoredStateStore, MonitoredStorageMetrics, ObjectStoreMetrics,
};
use risingwave_storage::opts::StorageOpts;
use risingwave_storage::{StateStore, StateStoreImpl};
Expand Down Expand Up @@ -162,17 +162,20 @@ impl HummockServiceOpts {

let opts = self.get_storage_opts();

Ok(Arc::new(SstableStore::new(
Arc::new(object_store),
opts.data_directory,
opts.block_cache_capacity_mb * (1 << 20),
opts.meta_cache_capacity_mb * (1 << 20),
0,
opts.block_cache_capacity_mb * (1 << 20),
opts.max_prefetch_block_number,
FileCache::none(),
FileCache::none(),
None,
)))
Ok(Arc::new(SstableStore::new(SstableStoreConfig {
store: Arc::new(object_store),
path: opts.data_directory,
block_cache_capacity: opts.block_cache_capacity_mb * (1 << 20),
meta_cache_capacity: opts.meta_cache_capacity_mb * (1 << 20),
high_priority_ratio: 0,
prefetch_buffer_capacity: opts.block_cache_capacity_mb * (1 << 20),
max_prefetch_block_number: opts.max_prefetch_block_number,
data_file_cache: FileCache::none(),
meta_file_cache: FileCache::none(),
recent_filter: None,
state_store_metrics: Arc::new(global_hummock_state_store_metrics(
MetricLevel::Disabled,
)),
})))
}
}
33 changes: 18 additions & 15 deletions src/jni_core/src/hummock_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::sync::Arc;
use bytes::Bytes;
use futures::TryStreamExt;
use risingwave_common::catalog::ColumnDesc;
use risingwave_common::config::ObjectStoreConfig;
use risingwave_common::config::{MetricLevel, ObjectStoreConfig};
use risingwave_common::hash::VirtualNode;
use risingwave_common::row::OwnedRow;
use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
Expand All @@ -33,9 +33,9 @@ use risingwave_storage::hummock::local_version::pinned_version::PinnedVersion;
use risingwave_storage::hummock::store::version::HummockVersionReader;
use risingwave_storage::hummock::store::HummockStorageIterator;
use risingwave_storage::hummock::{
get_committed_read_version_tuple, CachePolicy, FileCache, SstableStore,
get_committed_read_version_tuple, CachePolicy, FileCache, SstableStore, SstableStoreConfig,
};
use risingwave_storage::monitor::HummockStateStoreMetrics;
use risingwave_storage::monitor::{global_hummock_state_store_metrics, HummockStateStoreMetrics};
use risingwave_storage::row_serde::value_serde::ValueRowSerdeNew;
use risingwave_storage::store::{ReadOptions, StateStoreReadIterStream, StreamTypeOfIter};
use rw_futures_util::select_all;
Expand Down Expand Up @@ -66,18 +66,21 @@ impl HummockJavaBindingIterator {
)
.await,
);
let sstable_store = Arc::new(SstableStore::new(
object_store,
read_plan.data_dir,
1 << 10,
1 << 10,
0,
1 << 10,
16,
FileCache::none(),
FileCache::none(),
None,
));
let sstable_store = Arc::new(SstableStore::new(SstableStoreConfig {
store: object_store,
path: read_plan.data_dir,
block_cache_capacity: 1 << 10,
meta_cache_capacity: 1 << 10,
high_priority_ratio: 0,
prefetch_buffer_capacity: 1 << 10,
max_prefetch_block_number: 16,
data_file_cache: FileCache::none(),
meta_file_cache: FileCache::none(),
recent_filter: None,
state_store_metrics: Arc::new(global_hummock_state_store_metrics(
MetricLevel::Disabled,
)),
}));
let reader = HummockVersionReader::new(
sstable_store,
Arc::new(HummockStateStoreMetrics::unused()),
Expand Down
28 changes: 16 additions & 12 deletions src/storage/benches/bench_compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use criterion::async_executor::FuturesExecutor;
use criterion::{criterion_group, criterion_main, Criterion};
use risingwave_common::cache::CachePriority;
use risingwave_common::catalog::TableId;
use risingwave_common::config::MetricLevel;
use risingwave_common::hash::VirtualNode;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::key_range::KeyRange;
Expand All @@ -41,26 +42,29 @@ use risingwave_storage::hummock::sstable_store::SstableStoreRef;
use risingwave_storage::hummock::value::HummockValue;
use risingwave_storage::hummock::{
CachePolicy, CompactionDeleteRangeIterator, FileCache, SstableBuilder, SstableBuilderOptions,
SstableIterator, SstableStore, SstableWriterOptions, Xor16FilterBuilder,
SstableIterator, SstableStore, SstableStoreConfig, SstableWriterOptions, Xor16FilterBuilder,
};
use risingwave_storage::monitor::{
global_hummock_state_store_metrics, CompactorMetrics, StoreLocalStatistic,
};
use risingwave_storage::monitor::{CompactorMetrics, StoreLocalStatistic};

pub fn mock_sstable_store() -> SstableStoreRef {
let store = InMemObjectStore::new().monitored(Arc::new(ObjectStoreMetrics::unused()));
let store = Arc::new(ObjectStoreImpl::InMem(store));
let path = "test".to_string();
Arc::new(SstableStore::new(
Arc::new(SstableStore::new(SstableStoreConfig {
store,
path,
64 << 20,
128 << 20,
0,
64 << 20,
16,
FileCache::none(),
FileCache::none(),
None,
))
block_cache_capacity: 64 << 20,
meta_cache_capacity: 128 << 20,
high_priority_ratio: 0,
prefetch_buffer_capacity: 64 << 20,
max_prefetch_block_number: 16,
data_file_cache: FileCache::none(),
meta_file_cache: FileCache::none(),
recent_filter: None,
state_store_metrics: Arc::new(global_hummock_state_store_metrics(MetricLevel::Disabled)),
}))
}

pub fn default_writer_opts() -> SstableWriterOptions {
Expand Down
31 changes: 16 additions & 15 deletions src/storage/benches/bench_multi_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,17 @@ use criterion::{criterion_group, criterion_main, Criterion};
use futures::future::try_join_all;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::config::ObjectStoreConfig;
use risingwave_common::config::{MetricLevel, ObjectStoreConfig};
use risingwave_hummock_sdk::key::{FullKey, UserKey};
use risingwave_object_store::object::{ObjectStore, ObjectStoreImpl, S3ObjectStore};
use risingwave_storage::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
use risingwave_storage::hummock::value::HummockValue;
use risingwave_storage::hummock::{
BatchSstableWriterFactory, CachePolicy, FileCache, HummockResult, MemoryLimiter,
SstableBuilder, SstableBuilderOptions, SstableStore, SstableWriterFactory,
SstableBuilder, SstableBuilderOptions, SstableStore, SstableStoreConfig, SstableWriterFactory,
SstableWriterOptions, StreamingSstableWriterFactory, Xor16FilterBuilder,
};
use risingwave_storage::monitor::ObjectStoreMetrics;
use risingwave_storage::monitor::{global_hummock_state_store_metrics, ObjectStoreMetrics};

const RANGE: Range<u64> = 0..1500000;
const VALUE: &[u8] = &[0; 400];
Expand Down Expand Up @@ -141,18 +141,19 @@ fn bench_builder(
.monitored(metrics)
});
let object_store = Arc::new(ObjectStoreImpl::S3(object_store));
let sstable_store = Arc::new(SstableStore::new(
object_store,
"test".to_string(),
64 << 20,
128 << 20,
0,
64 << 20,
16,
FileCache::none(),
FileCache::none(),
None,
));
let sstable_store = Arc::new(SstableStore::new(SstableStoreConfig {
store: object_store,
path: "test".to_string(),
block_cache_capacity: 64 << 20,
meta_cache_capacity: 128 << 20,
high_priority_ratio: 0,
prefetch_buffer_capacity: 64 << 20,
max_prefetch_block_number: 16,
data_file_cache: FileCache::none(),
meta_file_cache: FileCache::none(),
recent_filter: None,
state_store_metrics: Arc::new(global_hummock_state_store_metrics(MetricLevel::Disabled)),
}));

let mut group = c.benchmark_group("bench_multi_builder");
group
Expand Down
37 changes: 18 additions & 19 deletions src/storage/hummock_test/src/bin/replay/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use risingwave_object_store::object::build_remote_object_store;
use risingwave_storage::filter_key_extractor::{
FakeRemoteTableAccessor, RpcFilterKeyExtractorManager,
};
use risingwave_storage::hummock::{FileCache, HummockStorage, SstableStore};
use risingwave_storage::hummock::{FileCache, HummockStorage, SstableStore, SstableStoreConfig};
use risingwave_storage::monitor::{CompactorMetrics, HummockStateStoreMetrics, ObjectStoreMetrics};
use risingwave_storage::opts::StorageOpts;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -97,33 +97,32 @@ async fn create_replay_hummock(r: Record, args: &Args) -> Result<impl GlobalRepl
&storage_memory_config,
)));

let state_store_stats = Arc::new(HummockStateStoreMetrics::unused());
let object_store_stats = Arc::new(ObjectStoreMetrics::unused());
let state_store_metrics = Arc::new(HummockStateStoreMetrics::unused());
let object_store_metrics = Arc::new(ObjectStoreMetrics::unused());

let compactor_metrics = Arc::new(CompactorMetrics::unused());

let object_store = build_remote_object_store(
&args.object_storage,
object_store_stats,
object_store_metrics,
"Hummock",
ObjectStoreConfig::default(),
)
.await;

let sstable_store = {
Arc::new(SstableStore::new(
Arc::new(object_store),
storage_opts.data_directory.to_string(),
storage_opts.block_cache_capacity_mb * (1 << 20),
storage_opts.meta_cache_capacity_mb * (1 << 20),
storage_opts.high_priority_ratio,
storage_opts.prefetch_buffer_capacity_mb * (1 << 20),
storage_opts.max_prefetch_block_number,
FileCache::none(),
FileCache::none(),
None,
))
};
let sstable_store = Arc::new(SstableStore::new(SstableStoreConfig {
store: Arc::new(object_store),
path: storage_opts.data_directory.to_string(),
block_cache_capacity: storage_opts.block_cache_capacity_mb * (1 << 20),
meta_cache_capacity: storage_opts.meta_cache_capacity_mb * (1 << 20),
high_priority_ratio: storage_opts.high_priority_ratio,
prefetch_buffer_capacity: storage_opts.prefetch_buffer_capacity_mb * (1 << 20),
max_prefetch_block_number: storage_opts.max_prefetch_block_number,
data_file_cache: FileCache::none(),
meta_file_cache: FileCache::none(),
recent_filter: None,
state_store_metrics: state_store_metrics.clone(),
}));

let (hummock_meta_client, notification_client, notifier) = {
let (env, hummock_manager_ref, _cluster_manager_ref, worker_node) =
Expand Down Expand Up @@ -158,7 +157,7 @@ async fn create_replay_hummock(r: Record, args: &Args) -> Result<impl GlobalRepl
hummock_meta_client.clone(),
notification_client,
key_filter_manager,
state_store_stats,
state_store_metrics,
compactor_metrics,
)
.await
Expand Down
12 changes: 6 additions & 6 deletions src/storage/src/hummock/file_cache/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ impl Value for CachedBlock {

fn serialized_len(&self) -> usize {
1 /* type */ + match self {
CachedBlock::Loaded { block } => block.raw_data().len(),
CachedBlock::Loaded { block } => block.raw().len(),
CachedBlock::Fetched { bytes, uncompressed_capacity: _ } => 8 + bytes.len(),
}
}
Expand Down Expand Up @@ -506,7 +506,7 @@ impl std::io::Read for CachedBlockCursor {
if self.pos < 1 {
self.pos += copy([0], &mut buf);
}
self.pos += copy(&block.raw_data()[self.pos - 1..], &mut buf);
self.pos += copy(&block.raw()[self.pos - 1..], &mut buf);
}
CachedBlock::Fetched {
bytes,
Expand Down Expand Up @@ -541,7 +541,7 @@ impl Value for Box<Block> {
type Cursor = BoxBlockCursor;

fn serialized_len(&self) -> usize {
self.raw_data().len()
self.raw().len()
}

fn read(buf: &[u8]) -> CodingResult<Self> {
Expand Down Expand Up @@ -571,7 +571,7 @@ impl BoxBlockCursor {
impl std::io::Read for BoxBlockCursor {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let pos = self.pos;
self.pos += copy(&self.inner.raw_data()[self.pos..], buf);
self.pos += copy(&self.inner.raw()[self.pos..], buf);
let n = self.pos - pos;
Ok(n)
}
Expand Down Expand Up @@ -748,7 +748,7 @@ mod tests {
std::io::copy(&mut cursor, &mut buf).unwrap();
let target = cursor.into_inner();
let block = Box::<Block>::read(&buf[..]).unwrap();
assert_eq!(target.raw_data(), block.raw_data());
assert_eq!(target.raw(), block.raw());
}

{
Expand Down Expand Up @@ -779,7 +779,7 @@ mod tests {
CachedBlock::Loaded { block } => block,
CachedBlock::Fetched { .. } => panic!(),
};
assert_eq!(target.raw_data(), block.raw_data());
assert_eq!(target.raw(), block.raw());
}

{
Expand Down
26 changes: 14 additions & 12 deletions src/storage/src/hummock/iterator/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::sync::Arc;
use bytes::Bytes;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::config::MetricLevel;
use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey};
use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, HummockSstableObjectId};
use risingwave_object_store::object::{
Expand All @@ -33,9 +34,9 @@ use crate::hummock::test_utils::{
};
use crate::hummock::{
DeleteRangeTombstone, FileCache, HummockValue, SstableBuilderOptions, SstableIterator,
SstableIteratorType, SstableStoreRef, TableHolder,
SstableIteratorType, SstableStoreConfig, SstableStoreRef, TableHolder,
};
use crate::monitor::ObjectStoreMetrics;
use crate::monitor::{global_hummock_state_store_metrics, ObjectStoreMetrics};

/// `assert_eq` two `Vec<u8>` with human-readable format.
#[macro_export]
Expand All @@ -59,18 +60,19 @@ pub fn mock_sstable_store() -> SstableStoreRef {

pub fn mock_sstable_store_with_object_store(store: ObjectStoreRef) -> SstableStoreRef {
let path = "test".to_string();
Arc::new(SstableStore::new(
Arc::new(SstableStore::new(SstableStoreConfig {
store,
path,
64 << 20,
64 << 20,
0,
64 << 20,
16,
FileCache::none(),
FileCache::none(),
None,
))
block_cache_capacity: 64 << 20,
meta_cache_capacity: 64 << 20,
high_priority_ratio: 0,
prefetch_buffer_capacity: 64 << 20,
max_prefetch_block_number: 16,
data_file_cache: FileCache::none(),
meta_file_cache: FileCache::none(),
recent_filter: None,
state_store_metrics: Arc::new(global_hummock_state_store_metrics(MetricLevel::Disabled)),
}))
}

pub fn iterator_test_table_key_of(idx: usize) -> Vec<u8> {
Expand Down
Loading
Loading