From 2e16c264863f9cdc95802e462f728abfe5499cfe Mon Sep 17 00:00:00 2001 From: Croxx Date: Tue, 23 Jan 2024 17:40:42 +0800 Subject: [PATCH] feat(stats): introduce block efficiency metrics (#14654) Signed-off-by: MrCroxx --- src/ctl/src/common/hummock_service.rs | 35 +-- src/jni_core/src/hummock_iterator.rs | 33 +-- src/storage/benches/bench_compactor.rs | 28 ++- src/storage/benches/bench_multi_builder.rs | 31 +-- .../hummock_test/src/bin/replay/main.rs | 37 ++- src/storage/src/hummock/file_cache/store.rs | 12 +- .../src/hummock/iterator/test_utils.rs | 26 +- src/storage/src/hummock/sstable/block.rs | 31 ++- .../src/hummock/sstable/block_iterator.rs | 21 +- src/storage/src/hummock/sstable_store.rs | 63 +++-- src/storage/src/lib.rs | 2 + src/storage/src/monitor/hitmap.rs | 238 ++++++++++++++++++ .../monitor/hummock_state_store_metrics.rs | 18 ++ src/storage/src/monitor/mod.rs | 3 + src/storage/src/store_impl.rs | 21 +- .../src/delete_range_runner.rs | 26 +- 16 files changed, 475 insertions(+), 150 deletions(-) create mode 100644 src/storage/src/monitor/hitmap.rs diff --git a/src/ctl/src/common/hummock_service.rs b/src/ctl/src/common/hummock_service.rs index 7eb8af52b51ae..b88dd265d382b 100644 --- a/src/ctl/src/common/hummock_service.rs +++ b/src/ctl/src/common/hummock_service.rs @@ -17,14 +17,14 @@ use std::sync::Arc; use std::time::Duration; use anyhow::{anyhow, bail, Result}; -use risingwave_common::config::ObjectStoreConfig; +use risingwave_common::config::{MetricLevel, ObjectStoreConfig}; use risingwave_object_store::object::build_remote_object_store; use risingwave_rpc_client::MetaClient; use risingwave_storage::hummock::hummock_meta_client::MonitoredHummockMetaClient; -use risingwave_storage::hummock::{FileCache, HummockStorage, SstableStore}; +use risingwave_storage::hummock::{FileCache, HummockStorage, SstableStore, SstableStoreConfig}; use risingwave_storage::monitor::{ - CompactorMetrics, HummockMetrics, HummockStateStoreMetrics, MonitoredStateStore, - 
MonitoredStorageMetrics, ObjectStoreMetrics, + global_hummock_state_store_metrics, CompactorMetrics, HummockMetrics, HummockStateStoreMetrics, + MonitoredStateStore, MonitoredStorageMetrics, ObjectStoreMetrics, }; use risingwave_storage::opts::StorageOpts; use risingwave_storage::{StateStore, StateStoreImpl}; @@ -162,17 +162,20 @@ impl HummockServiceOpts { let opts = self.get_storage_opts(); - Ok(Arc::new(SstableStore::new( - Arc::new(object_store), - opts.data_directory, - opts.block_cache_capacity_mb * (1 << 20), - opts.meta_cache_capacity_mb * (1 << 20), - 0, - opts.block_cache_capacity_mb * (1 << 20), - opts.max_prefetch_block_number, - FileCache::none(), - FileCache::none(), - None, - ))) + Ok(Arc::new(SstableStore::new(SstableStoreConfig { + store: Arc::new(object_store), + path: opts.data_directory, + block_cache_capacity: opts.block_cache_capacity_mb * (1 << 20), + meta_cache_capacity: opts.meta_cache_capacity_mb * (1 << 20), + high_priority_ratio: 0, + prefetch_buffer_capacity: opts.block_cache_capacity_mb * (1 << 20), + max_prefetch_block_number: opts.max_prefetch_block_number, + data_file_cache: FileCache::none(), + meta_file_cache: FileCache::none(), + recent_filter: None, + state_store_metrics: Arc::new(global_hummock_state_store_metrics( + MetricLevel::Disabled, + )), + }))) } } diff --git a/src/jni_core/src/hummock_iterator.rs b/src/jni_core/src/hummock_iterator.rs index c66669d559154..ee2084b6ecf81 100644 --- a/src/jni_core/src/hummock_iterator.rs +++ b/src/jni_core/src/hummock_iterator.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use bytes::Bytes; use futures::TryStreamExt; use risingwave_common::catalog::ColumnDesc; -use risingwave_common::config::ObjectStoreConfig; +use risingwave_common::config::{MetricLevel, ObjectStoreConfig}; use risingwave_common::hash::VirtualNode; use risingwave_common::row::OwnedRow; use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde; @@ -33,9 +33,9 @@ use 
risingwave_storage::hummock::local_version::pinned_version::PinnedVersion; use risingwave_storage::hummock::store::version::HummockVersionReader; use risingwave_storage::hummock::store::HummockStorageIterator; use risingwave_storage::hummock::{ - get_committed_read_version_tuple, CachePolicy, FileCache, SstableStore, + get_committed_read_version_tuple, CachePolicy, FileCache, SstableStore, SstableStoreConfig, }; -use risingwave_storage::monitor::HummockStateStoreMetrics; +use risingwave_storage::monitor::{global_hummock_state_store_metrics, HummockStateStoreMetrics}; use risingwave_storage::row_serde::value_serde::ValueRowSerdeNew; use risingwave_storage::store::{ReadOptions, StateStoreReadIterStream, StreamTypeOfIter}; use rw_futures_util::select_all; @@ -66,18 +66,21 @@ impl HummockJavaBindingIterator { ) .await, ); - let sstable_store = Arc::new(SstableStore::new( - object_store, - read_plan.data_dir, - 1 << 10, - 1 << 10, - 0, - 1 << 10, - 16, - FileCache::none(), - FileCache::none(), - None, - )); + let sstable_store = Arc::new(SstableStore::new(SstableStoreConfig { + store: object_store, + path: read_plan.data_dir, + block_cache_capacity: 1 << 10, + meta_cache_capacity: 1 << 10, + high_priority_ratio: 0, + prefetch_buffer_capacity: 1 << 10, + max_prefetch_block_number: 16, + data_file_cache: FileCache::none(), + meta_file_cache: FileCache::none(), + recent_filter: None, + state_store_metrics: Arc::new(global_hummock_state_store_metrics( + MetricLevel::Disabled, + )), + })); let reader = HummockVersionReader::new( sstable_store, Arc::new(HummockStateStoreMetrics::unused()), diff --git a/src/storage/benches/bench_compactor.rs b/src/storage/benches/bench_compactor.rs index 29d2696d8323b..5f9ee1c8e8524 100644 --- a/src/storage/benches/bench_compactor.rs +++ b/src/storage/benches/bench_compactor.rs @@ -19,6 +19,7 @@ use criterion::async_executor::FuturesExecutor; use criterion::{criterion_group, criterion_main, Criterion}; use 
risingwave_common::cache::CachePriority; use risingwave_common::catalog::TableId; +use risingwave_common::config::MetricLevel; use risingwave_common::hash::VirtualNode; use risingwave_hummock_sdk::key::FullKey; use risingwave_hummock_sdk::key_range::KeyRange; @@ -41,26 +42,29 @@ use risingwave_storage::hummock::sstable_store::SstableStoreRef; use risingwave_storage::hummock::value::HummockValue; use risingwave_storage::hummock::{ CachePolicy, CompactionDeleteRangeIterator, FileCache, SstableBuilder, SstableBuilderOptions, - SstableIterator, SstableStore, SstableWriterOptions, Xor16FilterBuilder, + SstableIterator, SstableStore, SstableStoreConfig, SstableWriterOptions, Xor16FilterBuilder, +}; +use risingwave_storage::monitor::{ + global_hummock_state_store_metrics, CompactorMetrics, StoreLocalStatistic, }; -use risingwave_storage::monitor::{CompactorMetrics, StoreLocalStatistic}; pub fn mock_sstable_store() -> SstableStoreRef { let store = InMemObjectStore::new().monitored(Arc::new(ObjectStoreMetrics::unused())); let store = Arc::new(ObjectStoreImpl::InMem(store)); let path = "test".to_string(); - Arc::new(SstableStore::new( + Arc::new(SstableStore::new(SstableStoreConfig { store, path, - 64 << 20, - 128 << 20, - 0, - 64 << 20, - 16, - FileCache::none(), - FileCache::none(), - None, - )) + block_cache_capacity: 64 << 20, + meta_cache_capacity: 128 << 20, + high_priority_ratio: 0, + prefetch_buffer_capacity: 64 << 20, + max_prefetch_block_number: 16, + data_file_cache: FileCache::none(), + meta_file_cache: FileCache::none(), + recent_filter: None, + state_store_metrics: Arc::new(global_hummock_state_store_metrics(MetricLevel::Disabled)), + })) } pub fn default_writer_opts() -> SstableWriterOptions { diff --git a/src/storage/benches/bench_multi_builder.rs b/src/storage/benches/bench_multi_builder.rs index a55cb24fff801..7d1abf67ec857 100644 --- a/src/storage/benches/bench_multi_builder.rs +++ b/src/storage/benches/bench_multi_builder.rs @@ -23,17 +23,17 @@ use 
criterion::{criterion_group, criterion_main, Criterion}; use futures::future::try_join_all; use itertools::Itertools; use risingwave_common::catalog::TableId; -use risingwave_common::config::ObjectStoreConfig; +use risingwave_common::config::{MetricLevel, ObjectStoreConfig}; use risingwave_hummock_sdk::key::{FullKey, UserKey}; use risingwave_object_store::object::{ObjectStore, ObjectStoreImpl, S3ObjectStore}; use risingwave_storage::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory}; use risingwave_storage::hummock::value::HummockValue; use risingwave_storage::hummock::{ BatchSstableWriterFactory, CachePolicy, FileCache, HummockResult, MemoryLimiter, - SstableBuilder, SstableBuilderOptions, SstableStore, SstableWriterFactory, + SstableBuilder, SstableBuilderOptions, SstableStore, SstableStoreConfig, SstableWriterFactory, SstableWriterOptions, StreamingSstableWriterFactory, Xor16FilterBuilder, }; -use risingwave_storage::monitor::ObjectStoreMetrics; +use risingwave_storage::monitor::{global_hummock_state_store_metrics, ObjectStoreMetrics}; const RANGE: Range = 0..1500000; const VALUE: &[u8] = &[0; 400]; @@ -141,18 +141,19 @@ fn bench_builder( .monitored(metrics) }); let object_store = Arc::new(ObjectStoreImpl::S3(object_store)); - let sstable_store = Arc::new(SstableStore::new( - object_store, - "test".to_string(), - 64 << 20, - 128 << 20, - 0, - 64 << 20, - 16, - FileCache::none(), - FileCache::none(), - None, - )); + let sstable_store = Arc::new(SstableStore::new(SstableStoreConfig { + store: object_store, + path: "test".to_string(), + block_cache_capacity: 64 << 20, + meta_cache_capacity: 128 << 20, + high_priority_ratio: 0, + prefetch_buffer_capacity: 64 << 20, + max_prefetch_block_number: 16, + data_file_cache: FileCache::none(), + meta_file_cache: FileCache::none(), + recent_filter: None, + state_store_metrics: Arc::new(global_hummock_state_store_metrics(MetricLevel::Disabled)), + })); let mut group = 
c.benchmark_group("bench_multi_builder"); group diff --git a/src/storage/hummock_test/src/bin/replay/main.rs b/src/storage/hummock_test/src/bin/replay/main.rs index d5190532f2d98..276a72bb26592 100644 --- a/src/storage/hummock_test/src/bin/replay/main.rs +++ b/src/storage/hummock_test/src/bin/replay/main.rs @@ -43,7 +43,7 @@ use risingwave_object_store::object::build_remote_object_store; use risingwave_storage::filter_key_extractor::{ FakeRemoteTableAccessor, RpcFilterKeyExtractorManager, }; -use risingwave_storage::hummock::{FileCache, HummockStorage, SstableStore}; +use risingwave_storage::hummock::{FileCache, HummockStorage, SstableStore, SstableStoreConfig}; use risingwave_storage::monitor::{CompactorMetrics, HummockStateStoreMetrics, ObjectStoreMetrics}; use risingwave_storage::opts::StorageOpts; use serde::{Deserialize, Serialize}; @@ -97,33 +97,32 @@ async fn create_replay_hummock(r: Record, args: &Args) -> Result Result usize { 1 /* type */ + match self { - CachedBlock::Loaded { block } => block.raw_data().len(), + CachedBlock::Loaded { block } => block.raw().len(), CachedBlock::Fetched { bytes, uncompressed_capacity: _ } => 8 + bytes.len(), } } @@ -506,7 +506,7 @@ impl std::io::Read for CachedBlockCursor { if self.pos < 1 { self.pos += copy([0], &mut buf); } - self.pos += copy(&block.raw_data()[self.pos - 1..], &mut buf); + self.pos += copy(&block.raw()[self.pos - 1..], &mut buf); } CachedBlock::Fetched { bytes, @@ -541,7 +541,7 @@ impl Value for Box { type Cursor = BoxBlockCursor; fn serialized_len(&self) -> usize { - self.raw_data().len() + self.raw().len() } fn read(buf: &[u8]) -> CodingResult { @@ -571,7 +571,7 @@ impl BoxBlockCursor { impl std::io::Read for BoxBlockCursor { fn read(&mut self, buf: &mut [u8]) -> std::io::Result { let pos = self.pos; - self.pos += copy(&self.inner.raw_data()[self.pos..], buf); + self.pos += copy(&self.inner.raw()[self.pos..], buf); let n = self.pos - pos; Ok(n) } @@ -748,7 +748,7 @@ mod tests { std::io::copy(&mut 
cursor, &mut buf).unwrap(); let target = cursor.into_inner(); let block = Box::::read(&buf[..]).unwrap(); - assert_eq!(target.raw_data(), block.raw_data()); + assert_eq!(target.raw(), block.raw()); } { @@ -779,7 +779,7 @@ mod tests { CachedBlock::Loaded { block } => block, CachedBlock::Fetched { .. } => panic!(), }; - assert_eq!(target.raw_data(), block.raw_data()); + assert_eq!(target.raw(), block.raw()); } { diff --git a/src/storage/src/hummock/iterator/test_utils.rs b/src/storage/src/hummock/iterator/test_utils.rs index e24757923628b..e154098e7a0ae 100644 --- a/src/storage/src/hummock/iterator/test_utils.rs +++ b/src/storage/src/hummock/iterator/test_utils.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use bytes::Bytes; use itertools::Itertools; use risingwave_common::catalog::TableId; +use risingwave_common::config::MetricLevel; use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey}; use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, HummockSstableObjectId}; use risingwave_object_store::object::{ @@ -33,9 +34,9 @@ use crate::hummock::test_utils::{ }; use crate::hummock::{ DeleteRangeTombstone, FileCache, HummockValue, SstableBuilderOptions, SstableIterator, - SstableIteratorType, SstableStoreRef, TableHolder, + SstableIteratorType, SstableStoreConfig, SstableStoreRef, TableHolder, }; -use crate::monitor::ObjectStoreMetrics; +use crate::monitor::{global_hummock_state_store_metrics, ObjectStoreMetrics}; /// `assert_eq` two `Vec` with human-readable format. 
#[macro_export] @@ -59,18 +60,19 @@ pub fn mock_sstable_store() -> SstableStoreRef { pub fn mock_sstable_store_with_object_store(store: ObjectStoreRef) -> SstableStoreRef { let path = "test".to_string(); - Arc::new(SstableStore::new( + Arc::new(SstableStore::new(SstableStoreConfig { store, path, - 64 << 20, - 64 << 20, - 0, - 64 << 20, - 16, - FileCache::none(), - FileCache::none(), - None, - )) + block_cache_capacity: 64 << 20, + meta_cache_capacity: 64 << 20, + high_priority_ratio: 0, + prefetch_buffer_capacity: 64 << 20, + max_prefetch_block_number: 16, + data_file_cache: FileCache::none(), + meta_file_cache: FileCache::none(), + recent_filter: None, + state_store_metrics: Arc::new(global_hummock_state_store_metrics(MetricLevel::Disabled)), + })) } pub fn iterator_test_table_key_of(idx: usize) -> Vec { diff --git a/src/storage/src/hummock/sstable/block.rs b/src/storage/src/hummock/sstable/block.rs index d21869f8ba09d..82f21d7c971b1 100644 --- a/src/storage/src/hummock/sstable/block.rs +++ b/src/storage/src/hummock/sstable/block.rs @@ -28,11 +28,14 @@ use super::utils::{bytes_diff_below_max_key_length, xxhash64_verify, Compression use crate::hummock::sstable::utils; use crate::hummock::sstable::utils::xxhash64_checksum; use crate::hummock::{HummockError, HummockResult}; +use crate::monitor::Hitmap; pub const DEFAULT_BLOCK_SIZE: usize = 4 * 1024; pub const DEFAULT_RESTART_INTERVAL: usize = 16; pub const DEFAULT_ENTRY_SIZE: usize = 24; // table_id(u64) + primary_key(u64) + epoch(u64) +pub const HITMAP_ELEMS: usize = 4; + #[allow(non_camel_case_types)] #[derive(Clone, Copy, PartialEq, Debug)] pub enum LenType { @@ -139,10 +142,9 @@ impl RestartPoint { } } -#[derive(Clone)] pub struct Block { /// Uncompressed entries data, with restart encoded restart points info. - pub data: Bytes, + data: Bytes, /// Uncompressed entried data length. data_len: usize, @@ -151,6 +153,20 @@ pub struct Block { /// Restart points. 
restart_points: Vec, + + hitmap: Hitmap, +} + +impl Clone for Block { + fn clone(&self) -> Self { + Self { + data: self.data.clone(), + data_len: self.data_len, + table_id: self.table_id, + restart_points: self.restart_points.clone(), + hitmap: Hitmap::default(), + } + } } impl Debug for Block { @@ -272,6 +288,7 @@ impl Block { data_len, restart_points, table_id: TableId::new(table_id), + hitmap: Hitmap::default(), } } @@ -314,9 +331,17 @@ impl Block { &self.data[..self.data_len] } - pub fn raw_data(&self) -> &[u8] { + pub fn raw(&self) -> &[u8] { &self.data[..] } + + pub fn hitmap(&self) -> &Hitmap { + &self.hitmap + } + + pub fn efficiency(&self) -> f64 { + self.hitmap.ratio() + } } /// [`KeyPrefix`] contains info for prefix compression. diff --git a/src/storage/src/hummock/sstable/block_iterator.rs b/src/storage/src/hummock/sstable/block_iterator.rs index b1a46b7595d2a..4c467adc8f589 100644 --- a/src/storage/src/hummock/sstable/block_iterator.rs +++ b/src/storage/src/hummock/sstable/block_iterator.rs @@ -19,8 +19,9 @@ use bytes::BytesMut; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::key::FullKey; -use super::{KeyPrefix, LenType, RestartPoint}; +use super::{KeyPrefix, LenType, RestartPoint, HITMAP_ELEMS}; use crate::hummock::BlockHolder; +use crate::monitor::LocalHitmap; /// [`BlockIterator`] is used to read kv pairs in a block. pub struct BlockIterator { @@ -39,10 +40,14 @@ pub struct BlockIterator { last_key_len_type: LenType, last_value_len_type: LenType, + + /// NOTE: `hitmap` is supposed to be updated every time `value_range` is updated.
+ hitmap: LocalHitmap, } impl BlockIterator { pub fn new(block: BlockHolder) -> Self { + let hitmap = block.hitmap().local(); Self { block, offset: usize::MAX, @@ -52,6 +57,7 @@ impl BlockIterator { entry_len: 0, last_key_len_type: LenType::u8, last_value_len_type: LenType::u8, + hitmap, } } @@ -81,7 +87,6 @@ impl BlockIterator { pub fn key(&self) -> FullKey<&[u8]> { assert!(self.is_valid()); - FullKey::from_slice_without_table_id(self.table_id(), &self.key[..]) } @@ -105,13 +110,11 @@ impl BlockIterator { pub fn seek(&mut self, key: FullKey<&[u8]>) { self.seek_restart_point_by_key(key); - self.next_until_key(key); } pub fn seek_le(&mut self, key: FullKey<&[u8]>) { self.seek_restart_point_by_key(key); - self.next_until_key(key); if !self.is_valid() { self.seek_to_last(); @@ -172,6 +175,8 @@ impl BlockIterator { self.offset = offset; self.entry_len = prefix.entry_len(); + self.update_hitmap(); + true } @@ -285,6 +290,8 @@ impl BlockIterator { self.offset = offset; self.entry_len = prefix.entry_len(); self.update_restart_point(index); + + self.update_hitmap(); } fn update_restart_point(&mut self, index: usize) { @@ -294,6 +301,12 @@ impl BlockIterator { self.last_key_len_type = restart_point.key_len_type; self.last_value_len_type = restart_point.value_len_type; } + + /// Update the local hitmap of the block based on the current iterator position. 
+ fn update_hitmap(&mut self) { + self.hitmap + .fill_with_range(self.offset, self.value_range.end, self.block.len()); + } } #[cfg(test)] diff --git a/src/storage/src/hummock/sstable_store.rs b/src/storage/src/hummock/sstable_store.rs index 06bb5ee054866..4b388a695ec0e 100644 --- a/src/storage/src/hummock/sstable_store.rs +++ b/src/storage/src/hummock/sstable_store.rs @@ -51,7 +51,7 @@ use crate::hummock::multi_builder::UploadJoinHandle; use crate::hummock::{ BlockHolder, CacheableEntry, HummockError, HummockResult, LruCache, MemoryLimiter, }; -use crate::monitor::{MemoryCollector, StoreLocalStatistic}; +use crate::monitor::{HummockStateStoreMetrics, MemoryCollector, StoreLocalStatistic}; const MAX_META_CACHE_SHARD_BITS: usize = 2; const MAX_CACHE_SHARD_BITS: usize = 6; // It means that there will be 64 shards lru-cache to avoid lock conflict. @@ -102,6 +102,7 @@ impl From for TracedCachePolicy { struct BlockCacheEventListener { data_file_cache: FileCache, + metrics: Arc, } impl LruCacheEventListener for BlockCacheEventListener { @@ -113,6 +114,10 @@ impl LruCacheEventListener for BlockCacheEventListener { sst_id: key.0, block_idx: key.1, }; + self.metrics + .block_efficiency_histogram + .with_label_values(&[&value.table_id().to_string()]) + .observe(value.efficiency()); // temporarily avoid spawn task while task drop with madsim // FYI: https://github.com/madsim-rs/madsim/issues/182 #[cfg(not(madsim))] @@ -159,6 +164,20 @@ where } } +pub struct SstableStoreConfig { + pub store: ObjectStoreRef, + pub path: String, + pub block_cache_capacity: usize, + pub meta_cache_capacity: usize, + pub high_priority_ratio: usize, + pub prefetch_buffer_capacity: usize, + pub max_prefetch_block_number: usize, + pub data_file_cache: FileCache, + pub meta_file_cache: FileCache, + pub recent_filter: Option>>, + pub state_store_metrics: Arc, +} + pub struct SstableStore { path: String, store: ObjectStoreRef, @@ -177,51 +196,43 @@ pub struct SstableStore { } impl SstableStore { - pub fn 
new( - store: ObjectStoreRef, - path: String, - block_cache_capacity: usize, - meta_cache_capacity: usize, - high_priority_ratio: usize, - prefetch_buffer_capacity: usize, - max_prefetch_block_number: usize, - data_file_cache: FileCache, - meta_file_cache: FileCache, - recent_filter: Option>>, - ) -> Self { + pub fn new(config: SstableStoreConfig) -> Self { // TODO: We should validate path early. Otherwise object store won't report invalid path // error until first write attempt. let mut shard_bits = MAX_META_CACHE_SHARD_BITS; - while (meta_cache_capacity >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD && shard_bits > 0 { + while (config.meta_cache_capacity >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD + && shard_bits > 0 + { shard_bits -= 1; } let block_cache_listener = Arc::new(BlockCacheEventListener { - data_file_cache: data_file_cache.clone(), + data_file_cache: config.data_file_cache.clone(), + metrics: config.state_store_metrics, }); - let meta_cache_listener = Arc::new(MetaCacheEventListener(meta_file_cache.clone())); + let meta_cache_listener = Arc::new(MetaCacheEventListener(config.meta_file_cache.clone())); Self { - path, - store, + path: config.path, + store: config.store, block_cache: BlockCache::with_event_listener( - block_cache_capacity, + config.block_cache_capacity, MAX_CACHE_SHARD_BITS, - high_priority_ratio, + config.high_priority_ratio, block_cache_listener, ), meta_cache: Arc::new(LruCache::with_event_listener( shard_bits, - meta_cache_capacity, + config.meta_cache_capacity, 0, meta_cache_listener, )), - data_file_cache, - meta_file_cache, + data_file_cache: config.data_file_cache, + meta_file_cache: config.meta_file_cache, - recent_filter, + recent_filter: config.recent_filter, prefetch_buffer_usage: Arc::new(AtomicUsize::new(0)), - prefetch_buffer_capacity, - max_prefetch_block_number, + prefetch_buffer_capacity: config.prefetch_buffer_capacity, + max_prefetch_block_number: config.max_prefetch_block_number, } } diff --git a/src/storage/src/lib.rs 
b/src/storage/src/lib.rs index 8bf78347d1803..8e6efa63e3545 100644 --- a/src/storage/src/lib.rs +++ b/src/storage/src/lib.rs @@ -40,6 +40,8 @@ #![feature(associated_type_bounds)] #![feature(exclusive_range_pattern)] #![feature(impl_trait_in_assoc_type)] +#![feature(maybe_uninit_uninit_array)] +#![feature(maybe_uninit_array_assume_init)] pub mod hummock; pub mod memory; diff --git a/src/storage/src/monitor/hitmap.rs b/src/storage/src/monitor/hitmap.rs new file mode 100644 index 0000000000000..6afb79b53b332 --- /dev/null +++ b/src/storage/src/monitor/hitmap.rs @@ -0,0 +1,238 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; + +use risingwave_common::util::iter_util::ZipEqFast; + +#[derive(Debug, Clone)] +pub struct Hitmap { + /// Since [`Hitmap`] is rarely accessed from multiple threads, + /// the cost of false sharing can be ignored.
+ data: Arc<[AtomicU64; N]>, +} + +impl Default for Hitmap { + fn default() -> Self { + let data = [(); N].map(|_| AtomicU64::default()); + let data = Arc::new(data); + Self { data } + } +} + +impl Hitmap { + pub const fn bits() -> usize { + N * u64::BITS as usize + } + + pub const fn bytes() -> usize { + N * 8 + } + + pub fn local(&self) -> LocalHitmap { + LocalHitmap::new(self) + } + + pub fn apply(&self, local: &LocalHitmap) { + for (global, local) in self.data.iter().zip_eq_fast(local.data.iter()) { + global.fetch_or(*local, Ordering::Relaxed); + } + } + + pub fn ones(&self) -> usize { + let mut res = 0; + for elem in &*self.data { + res += elem.load(Ordering::Relaxed).count_ones() as usize; + } + res + } + + pub fn zeros(&self) -> usize { + Self::bits() - self.ones() + } + + pub fn ratio(&self) -> f64 { + self.ones() as f64 / Self::bits() as f64 + } + + #[cfg(test)] + pub fn to_hex_vec(&self) -> Vec { + use itertools::Itertools; + self.data + .iter() + .map(|elem| elem.load(Ordering::Relaxed)) + .map(|v| format!("{v:016x}")) + .collect_vec() + } +} + +#[derive(Debug)] +pub struct LocalHitmap { + owner: Hitmap, + data: Box<[u64; N]>, +} + +impl LocalHitmap { + pub const fn bits() -> usize { + N * u64::BITS as usize + } + + pub const fn bytes() -> usize { + N * 8 + } + + pub fn new(owner: &Hitmap) -> Self { + Self { + owner: owner.clone(), + data: Box::new([0; N]), + } + } + + pub fn fill(&mut self, start_bit: usize, end_bit: usize) { + const MASK: usize = (1 << 6) - 1; + + let end_bit = std::cmp::min(end_bit, Self::bits()); + + let head_bits = start_bit & MASK; + let tail_bits_rev = end_bit & MASK; + + let head_elem = start_bit >> 6; + let tail_elem = end_bit >> 6; + + for i in head_elem..=std::cmp::min(tail_elem, N - 1) { + let elem = &mut self.data[i]; + let mut umask = 0u64; + if i == head_elem { + umask |= (1u64 << head_bits) - 1; + } + if i == tail_elem { + umask |= !((1u64 << tail_bits_rev) - 1); + } + *elem |= !umask; + } + } + + pub fn 
fill_with_range(&mut self, start: usize, end: usize, len: usize) { + let start_bit = Self::bits() * start / len; + let end_bit = Self::bits() * end / len; + self.fill(start_bit, end_bit) + } + + pub fn ones(&self) -> usize { + let mut res = 0; + for elem in &*self.data { + res += elem.count_ones() as usize; + } + res + } + + pub fn zeros(&self) -> usize { + Self::bits() - self.ones() + } + + pub fn ratio(&self) -> f64 { + self.ones() as f64 / Self::bits() as f64 + } + + #[cfg(test)] + pub fn to_hex_vec(&self) -> Vec { + use itertools::Itertools; + self.data.iter().map(|v| format!("{v:016x}")).collect_vec() + } +} + +impl Drop for LocalHitmap { + fn drop(&mut self) { + self.owner.apply(self); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_hitmap() { + // hex: high <== low + let g = Hitmap::<4>::default(); + + let mut h = g.local(); + assert_eq!( + h.to_hex_vec(), + vec![ + "0000000000000000", + "0000000000000000", + "0000000000000000", + "0000000000000000", + ] + ); + assert_eq!(h.ones(), 0); + h.fill(16, 24); + assert_eq!( + h.to_hex_vec(), + vec![ + "0000000000ff0000", + "0000000000000000", + "0000000000000000", + "0000000000000000", + ] + ); + assert_eq!(h.ones(), 8); + h.fill(32, 64); + assert_eq!( + h.to_hex_vec(), + vec![ + "ffffffff00ff0000", + "0000000000000000", + "0000000000000000", + "0000000000000000", + ] + ); + assert_eq!(h.ones(), 40); + h.fill(96, 224); + assert_eq!( + h.to_hex_vec(), + vec![ + "ffffffff00ff0000", + "ffffffff00000000", + "ffffffffffffffff", + "00000000ffffffff", + ] + ); + assert_eq!(h.ones(), 168); + h.fill(0, 256); + assert_eq!( + h.to_hex_vec(), + vec![ + "ffffffffffffffff", + "ffffffffffffffff", + "ffffffffffffffff", + "ffffffffffffffff", + ] + ); + assert_eq!(h.ones(), 256); + drop(h); + assert_eq!( + g.to_hex_vec(), + vec![ + "ffffffffffffffff", + "ffffffffffffffff", + "ffffffffffffffff", + "ffffffffffffffff", + ] + ); + assert_eq!(g.ones(), 256); + } +} diff --git 
a/src/storage/src/monitor/hummock_state_store_metrics.rs b/src/storage/src/monitor/hummock_state_store_metrics.rs index 6072d2676f492..6954263010333 100644 --- a/src/storage/src/monitor/hummock_state_store_metrics.rs +++ b/src/storage/src/monitor/hummock_state_store_metrics.rs @@ -77,6 +77,9 @@ pub struct HummockStateStoreMetrics { // memory pub mem_table_spill_counts: RelabeledCounterVec, + + // block statistics + pub block_efficiency_histogram: RelabeledHistogramVec, } pub static GLOBAL_HUMMOCK_STATE_STORE_METRICS: OnceLock = OnceLock::new(); @@ -372,6 +375,19 @@ impl HummockStateStoreMetrics { metric_level, ); + let opts = histogram_opts!( + "block_efficiency_histogram", + "Access ratio of in-memory block.", + exponential_buckets(0.001, 2.0, 11).unwrap(), + ); + let block_efficiency_histogram = + register_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap(); + let block_efficiency_histogram = RelabeledHistogramVec::with_metric_level( + MetricLevel::Info, + block_efficiency_histogram, + metric_level, + ); + Self { bloom_filter_true_negative_counts, bloom_filter_check_counts, @@ -397,6 +413,8 @@ impl HummockStateStoreMetrics { spill_task_size_from_unsealed: spill_task_size.with_label_values(&["unsealed"]), uploader_uploading_task_size, mem_table_spill_counts, + + block_efficiency_histogram, } } diff --git a/src/storage/src/monitor/mod.rs b/src/storage/src/monitor/mod.rs index 053cc72cf8130..1849f272d8473 100644 --- a/src/storage/src/monitor/mod.rs +++ b/src/storage/src/monitor/mod.rs @@ -28,6 +28,9 @@ pub use compactor_metrics::*; mod local_metrics; pub use local_metrics::*; + +mod hitmap; +pub use hitmap::*; pub use risingwave_object_store::object::object_metrics::{ ObjectStoreMetrics, GLOBAL_OBJECT_STORE_METRICS, }; diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index 86adbe13b01bd..e4ee411d3ad21 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -28,7 +28,7 @@ use 
crate::hummock::file_cache::preclude::*; use crate::hummock::hummock_meta_client::MonitoredHummockMetaClient; use crate::hummock::{ set_foyer_metrics_registry, FileCache, FileCacheConfig, HummockError, HummockStorage, - RecentFilter, SstableStore, + RecentFilter, SstableStore, SstableStoreConfig, }; use crate::memory::sled::SledStateStore; use crate::memory::MemoryStateStore; @@ -610,18 +610,19 @@ impl StateStoreImpl { ) .await; - let sstable_store = Arc::new(SstableStore::new( - Arc::new(object_store), - opts.data_directory.to_string(), - opts.block_cache_capacity_mb * (1 << 20), - opts.meta_cache_capacity_mb * (1 << 20), - opts.high_priority_ratio, - opts.prefetch_buffer_capacity_mb * (1 << 20), - opts.max_prefetch_block_number, + let sstable_store = Arc::new(SstableStore::new(SstableStoreConfig { + store: Arc::new(object_store), + path: opts.data_directory.to_string(), + block_cache_capacity: opts.block_cache_capacity_mb * (1 << 20), + meta_cache_capacity: opts.meta_cache_capacity_mb * (1 << 20), + high_priority_ratio: opts.high_priority_ratio, + prefetch_buffer_capacity: opts.prefetch_buffer_capacity_mb * (1 << 20), + max_prefetch_block_number: opts.max_prefetch_block_number, data_file_cache, meta_file_cache, recent_filter, - )); + state_store_metrics: state_store_metrics.clone(), + })); let notification_client = RpcNotificationClient::new(hummock_meta_client.get_inner().clone()); let key_filter_manager = Arc::new(RpcFilterKeyExtractorManager::new(Box::new( diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs index ab65973dcf00e..26638dde1d8d6 100644 --- a/src/tests/compaction_test/src/delete_range_runner.rs +++ b/src/tests/compaction_test/src/delete_range_runner.rs @@ -55,6 +55,7 @@ use risingwave_storage::hummock::sstable_store::SstableStoreRef; use risingwave_storage::hummock::utils::cmp_delete_range_left_bounds; use risingwave_storage::hummock::{ CachePolicy, FileCache, HummockStorage, 
MemoryLimiter, SstableObjectIdManager, SstableStore, + SstableStoreConfig, }; use risingwave_storage::monitor::{CompactorMetrics, HummockStateStoreMetrics}; use risingwave_storage::opts::StorageOpts; @@ -212,18 +213,19 @@ async fn compaction_test( ObjectStoreConfig::default(), ) .await; - let sstable_store = Arc::new(SstableStore::new( - Arc::new(remote_object_store), - system_params.data_directory().to_string(), - storage_memory_config.block_cache_capacity_mb * (1 << 20), - storage_memory_config.meta_cache_capacity_mb * (1 << 20), - 0, - storage_memory_config.prefetch_buffer_capacity_mb * (1 << 20), - storage_opts.max_prefetch_block_number, - FileCache::none(), - FileCache::none(), - None, - )); + let sstable_store = Arc::new(SstableStore::new(SstableStoreConfig { + store: Arc::new(remote_object_store), + path: system_params.data_directory().to_string(), + block_cache_capacity: storage_memory_config.block_cache_capacity_mb * (1 << 20), + meta_cache_capacity: storage_memory_config.meta_cache_capacity_mb * (1 << 20), + high_priority_ratio: 0, + prefetch_buffer_capacity: storage_memory_config.prefetch_buffer_capacity_mb * (1 << 20), + max_prefetch_block_number: storage_opts.max_prefetch_block_number, + data_file_cache: FileCache::none(), + meta_file_cache: FileCache::none(), + recent_filter: None, + state_store_metrics: state_store_metrics.clone(), + })); let store = HummockStorage::new( storage_opts.clone(),