From 16b5363052449882661e97466ff8797f2e40f16f Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 29 May 2024 15:59:10 +0000 Subject: [PATCH 1/3] fix: avoid acquiring lock during reading stats --- src/datanode/src/region_server.rs | 2 +- src/datanode/src/tests.rs | 2 +- src/file-engine/src/engine.rs | 2 +- src/metric-engine/src/engine.rs | 16 +++------- src/mito2/src/engine.rs | 7 ++-- src/mito2/src/engine/basic_test.rs | 8 ++--- src/mito2/src/manifest/manager.rs | 14 ++++++-- src/mito2/src/manifest/storage.rs | 39 ++++++++++++++++++++--- src/mito2/src/region.rs | 26 +++++++++------ src/mito2/src/region/opener.rs | 20 +++++++++--- src/mito2/src/test_util.rs | 4 +-- src/mito2/src/test_util/scheduler_util.rs | 1 + src/query/src/optimizer/test_util.rs | 2 +- src/store-api/src/region_engine.rs | 2 +- 14 files changed, 98 insertions(+), 47 deletions(-) diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 469ed0a6ccf1..5a8236a6d15c 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -189,7 +189,7 @@ impl RegionServer { pub async fn region_disk_usage(&self, region_id: RegionId) -> Option { match self.inner.region_map.get(®ion_id) { - Some(e) => e.region_disk_usage(region_id).await, + Some(e) => e.region_disk_usage(region_id), None => None, } } diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index 327e1be46256..b115b366c4af 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -200,7 +200,7 @@ impl RegionEngine for MockRegionEngine { unimplemented!() } - async fn region_disk_usage(&self, _region_id: RegionId) -> Option { + fn region_disk_usage(&self, _region_id: RegionId) -> Option { unimplemented!() } diff --git a/src/file-engine/src/engine.rs b/src/file-engine/src/engine.rs index e0a3a6ebdc42..f71622178dfe 100644 --- a/src/file-engine/src/engine.rs +++ b/src/file-engine/src/engine.rs @@ -107,7 +107,7 @@ impl RegionEngine for FileRegionEngine { self.inner.stop().await.map_err(BoxedError::new) } - async fn region_disk_usage(&self, _: RegionId) -> Option { + fn region_disk_usage(&self, _: RegionId) -> Option { None } diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index e5b4bf2faca3..3d3ab8c77dbf 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -202,9 +202,9 @@ impl RegionEngine for MetricEngine { /// Retrieves region's disk usage. /// /// Note: Returns `None` if it's a logical region. - async fn region_disk_usage(&self, region_id: RegionId) -> Option { + fn region_disk_usage(&self, region_id: RegionId) -> Option { if self.inner.is_physical_region(region_id) { - self.inner.mito.region_disk_usage(region_id).await + self.inner.mito.region_disk_usage(region_id) } else { None } @@ -383,15 +383,7 @@ mod test { let logical_region_id = env.default_logical_region_id(); let physical_region_id = env.default_physical_region_id(); - assert!(env - .metric() - .region_disk_usage(logical_region_id) - .await - .is_none()); - assert!(env - .metric() - .region_disk_usage(physical_region_id) - .await - .is_some()); + assert!(env.metric().region_disk_usage(logical_region_id).is_none()); + assert!(env.metric().region_disk_usage(physical_region_id).is_some()); } } diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index bd8d70a6acd7..5c04d75b2292 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -110,14 +110,14 @@ impl MitoEngine { } /// Returns the region disk/memory usage information. - pub async fn get_region_usage(&self, region_id: RegionId) -> Result { + pub fn get_region_usage(&self, region_id: RegionId) -> Result { let region = self .inner .workers .get_region(region_id) .context(RegionNotFoundSnafu { region_id })?; - Ok(region.region_usage().await) + Ok(region.region_usage()) } /// Handle substrait query and return a stream of record batches @@ -368,10 +368,9 @@ impl RegionEngine for MitoEngine { self.inner.stop().await.map_err(BoxedError::new) } - async fn region_disk_usage(&self, region_id: RegionId) -> Option { + fn region_disk_usage(&self, region_id: RegionId) -> Option { let size = self .get_region_usage(region_id) - .await .map(|usage| usage.disk_usage()) .ok()?; size.try_into().ok() diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index 6d3fac897eda..9a5cca209b7a 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -524,7 +524,7 @@ async fn test_region_usage() { .unwrap(); // region is empty now, check manifest size let region = engine.get_region(region_id).unwrap(); - let region_stat = region.region_usage().await; + let region_stat = region.region_usage(); assert_eq!(region_stat.manifest_usage, 686); // put some rows @@ -535,7 +535,7 @@ async fn test_region_usage() { put_rows(&engine, region_id, rows).await; - let region_stat = region.region_usage().await; + let region_stat = region.region_usage(); assert!(region_stat.wal_usage > 0); // delete some rows @@ -545,13 +545,13 @@ async fn test_region_usage() { }; delete_rows(&engine, region_id, rows).await; - let region_stat = region.region_usage().await; + let region_stat = region.region_usage(); assert!(region_stat.wal_usage > 0); // flush region flush_region(&engine, region_id, None).await; - let region_stat = region.region_usage().await; + let region_stat = region.region_usage(); assert_eq!(region_stat.sst_usage, 3010); // region total usage diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs index b121db9c48e2..777f9a47e49a 100644 --- a/src/mito2/src/manifest/manager.rs +++ b/src/mito2/src/manifest/manager.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::atomic::AtomicU64; use std::sync::Arc; use common_datasource::compression::CompressionType; @@ -121,12 +122,17 @@ pub struct RegionManifestManager { impl RegionManifestManager { /// Constructs a region's manifest and persist it. - pub async fn new(metadata: RegionMetadataRef, options: RegionManifestOptions) -> Result { + pub async fn new( + metadata: RegionMetadataRef, + options: RegionManifestOptions, + total_manifest_size: Arc, + ) -> Result { // construct storage let mut store = ManifestObjectStore::new( &options.manifest_dir, options.object_store.clone(), options.compress_type, + total_manifest_size, ); info!( @@ -168,7 +174,10 @@ impl RegionManifestManager { /// Opens an existing manifest. /// /// Returns `Ok(None)` if no such manifest. - pub async fn open(options: RegionManifestOptions) -> Result> { + pub async fn open( + options: RegionManifestOptions, + total_manifest_size: Arc, + ) -> Result> { let _t = MANIFEST_OP_ELAPSED .with_label_values(&["open"]) .start_timer(); @@ -178,6 +187,7 @@ impl RegionManifestManager { &options.manifest_dir, options.object_store.clone(), options.compress_type, + total_manifest_size, ); // recover from storage diff --git a/src/mito2/src/manifest/storage.rs b/src/mito2/src/manifest/storage.rs index 815afd9f4c6c..88450a21bdbb 100644 --- a/src/mito2/src/manifest/storage.rs +++ b/src/mito2/src/manifest/storage.rs @@ -15,6 +15,8 @@ use std::collections::HashMap; use std::iter::Iterator; use std::str::FromStr; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; use common_datasource::compression::CompressionType; use common_telemetry::debug; @@ -133,15 +135,22 @@ pub struct ManifestObjectStore { path: String, /// Stores the size of each manifest file. manifest_size_map: HashMap, + total_manifest_size: Arc, } impl ManifestObjectStore { - pub fn new(path: &str, object_store: ObjectStore, compress_type: CompressionType) -> Self { + pub fn new( + path: &str, + object_store: ObjectStore, + compress_type: CompressionType, + total_manifest_size: Arc, + ) -> Self { Self { object_store, compress_type, path: util::normalize_dir(path), manifest_size_map: HashMap::new(), + total_manifest_size, } } @@ -338,10 +347,9 @@ impl ManifestObjectStore { // delete manifest sizes for (_, is_checkpoint, version) in &del_entries { if *is_checkpoint { - self.manifest_size_map - .remove(&FileKey::Checkpoint(*version)); + self.unset_file_size(&FileKey::Checkpoint(*version)); } else { - self.manifest_size_map.remove(&FileKey::Delta(*version)); + self.unset_file_size(&FileKey::Delta(*version)); } } @@ -564,12 +572,28 @@ impl ManifestObjectStore { /// Set the size of the delta file by delta version. pub(crate) fn set_delta_file_size(&mut self, version: ManifestVersion, size: u64) { self.manifest_size_map.insert(FileKey::Delta(version), size); + self.inc_total_manifest_size(size); } /// Set the size of the checkpoint file by checkpoint version. pub(crate) fn set_checkpoint_file_size(&mut self, version: ManifestVersion, size: u64) { self.manifest_size_map .insert(FileKey::Checkpoint(version), size); + self.inc_total_manifest_size(size); + } + + fn unset_file_size(&mut self, key: &FileKey) { + if let Some(val) = self.manifest_size_map.remove(key) { + self.dec_total_manifest_size(val); + } + } + + fn inc_total_manifest_size(&self, val: u64) { + self.total_manifest_size.fetch_add(val, Ordering::Relaxed); + } + + fn dec_total_manifest_size(&self, val: u64) { + self.total_manifest_size.fetch_sub(val, Ordering::Relaxed); } } @@ -610,7 +634,12 @@ mod tests { let mut builder = Fs::default(); let _ = builder.root(&tmp_dir.path().to_string_lossy()); let object_store = ObjectStore::new(builder).unwrap().finish(); - ManifestObjectStore::new("/", object_store, CompressionType::Uncompressed) + ManifestObjectStore::new( + "/", + object_store, + CompressionType::Uncompressed, + Default::default(), + ) } fn new_checkpoint_metadata_with_version(version: ManifestVersion) -> CheckpointMetadata { diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 8d776bd36b9f..1f484b08063a 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -20,7 +20,7 @@ pub(crate) mod version; use std::collections::hash_map::Entry; use std::collections::HashMap; -use std::sync::atomic::{AtomicI64, Ordering}; +use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; use std::sync::{Arc, RwLock}; use common_telemetry::{error, info, warn}; @@ -106,6 +106,8 @@ pub(crate) struct MitoRegion { time_provider: TimeProviderRef, /// Memtable builder for the region. pub(crate) memtable_builder: MemtableBuilderRef, + /// Region stats + stats: Stats, } pub(crate) type MitoRegionRef = Arc; @@ -233,7 +235,7 @@ impl MitoRegion { } /// Returns the region usage in bytes. - pub(crate) async fn region_usage(&self) -> RegionUsage { + pub(crate) fn region_usage(&self) -> RegionUsage { let region_id = self.region_id; let version = self.version(); @@ -243,13 +245,7 @@ impl MitoRegion { let sst_usage = version.ssts.sst_usage(); let wal_usage = self.estimated_wal_usage(memtable_usage); - - let manifest_usage = self - .manifest_ctx - .manifest_manager - .read() - .await - .manifest_usage(); + let manifest_usage = self.stats.total_manifest_size(); RegionUsage { region_id, @@ -526,6 +522,18 @@ impl OpeningRegions { pub(crate) type OpeningRegionsRef = Arc; +/// Single region stats. +#[derive(Default, Debug, Clone)] +pub(crate) struct Stats { + total_manifest_size: Arc, +} + +impl Stats { + fn total_manifest_size(&self) -> u64 { + self.total_manifest_size.load(Ordering::Relaxed) + } +} + #[cfg(test)] mod tests { use crossbeam_utils::atomic::AtomicCell; diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 8d05063cc3be..795737337996 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -41,7 +41,7 @@ use crate::memtable::time_partition::TimePartitions; use crate::memtable::MemtableBuilderProvider; use crate::region::options::RegionOptions; use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef}; -use crate::region::{ManifestContext, MitoRegion, RegionState}; +use crate::region::{ManifestContext, MitoRegion, RegionState, Stats}; use crate::region_write_ctx::RegionWriteCtx; use crate::request::OptionOutputTx; use crate::schedule::scheduler::SchedulerRef; @@ -63,6 +63,7 @@ pub(crate) struct RegionOpener { skip_wal_replay: bool, intermediate_manager: IntermediateManager, time_provider: Option, + stats: Stats, } impl RegionOpener { @@ -87,6 +88,7 @@ impl RegionOpener { skip_wal_replay: false, intermediate_manager, time_provider: None, + stats: Default::default(), } } @@ -169,8 +171,12 @@ impl RegionOpener { // Create a manifest manager for this region and writes regions to the manifest file. let region_manifest_options = self.manifest_options(config, &options)?; let metadata = Arc::new(self.metadata.unwrap()); - let manifest_manager = - RegionManifestManager::new(metadata.clone(), region_manifest_options).await?; + let manifest_manager = RegionManifestManager::new( + metadata.clone(), + region_manifest_options, + self.stats.total_manifest_size.clone(), + ) + .await?; let memtable_builder = self .memtable_builder_provider @@ -217,6 +223,7 @@ impl RegionOpener { last_flush_millis: AtomicI64::new(time_provider.current_time_millis()), time_provider, memtable_builder, + stats: self.stats, }) } @@ -267,7 +274,11 @@ impl RegionOpener { let region_options = self.options.as_ref().unwrap().clone(); let region_manifest_options = self.manifest_options(config, ®ion_options)?; - let Some(manifest_manager) = RegionManifestManager::open(region_manifest_options).await? + let Some(manifest_manager) = RegionManifestManager::open( + region_manifest_options, + self.stats.total_manifest_size.clone(), + ) + .await? else { return Ok(None); }; @@ -350,6 +361,7 @@ impl RegionOpener { last_flush_millis: AtomicI64::new(time_provider.current_time_millis()), time_provider, memtable_builder, + stats: self.stats.clone(), }; Ok(Some(region)) } diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 78dbd1c3362b..4c80782f4d59 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -356,11 +356,11 @@ impl TestEnv { }; if let Some(metadata) = initial_metadata { - RegionManifestManager::new(metadata, manifest_opts) + RegionManifestManager::new(metadata, manifest_opts, Default::default()) .await .map(Some) } else { - RegionManifestManager::open(manifest_opts).await + RegionManifestManager::open(manifest_opts, Default::default()).await } } diff --git a/src/mito2/src/test_util/scheduler_util.rs b/src/mito2/src/test_util/scheduler_util.rs index bfaf569123ed..a47d9d4e7c63 100644 --- a/src/mito2/src/test_util/scheduler_util.rs +++ b/src/mito2/src/test_util/scheduler_util.rs @@ -109,6 +109,7 @@ impl SchedulerEnv { compress_type: CompressionType::Uncompressed, checkpoint_distance: 10, }, + Default::default(), ) .await .unwrap(), diff --git a/src/query/src/optimizer/test_util.rs b/src/query/src/optimizer/test_util.rs index 4ffc3e28e08f..773270351fdf 100644 --- a/src/query/src/optimizer/test_util.rs +++ b/src/query/src/optimizer/test_util.rs @@ -79,7 +79,7 @@ impl RegionEngine for MetaRegionEngine { }) } - async fn region_disk_usage(&self, _region_id: RegionId) -> Option { + fn region_disk_usage(&self, _region_id: RegionId) -> Option { None } diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index 91813c91295b..3f9d58a95588 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -200,7 +200,7 @@ pub trait RegionEngine: Send + Sync { async fn get_metadata(&self, region_id: RegionId) -> Result; /// Retrieves region's disk usage. - async fn region_disk_usage(&self, region_id: RegionId) -> Option; + fn region_disk_usage(&self, region_id: RegionId) -> Option; /// Stops the engine async fn stop(&self) -> Result<(), BoxedError>; From 7f0483704973961bd0a535c5291bfcbe6c73e259 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 30 May 2024 06:40:44 +0000 Subject: [PATCH 2/3] chore: apply suggestions from CR --- src/mito2/src/region.rs | 8 ++++---- src/mito2/src/region/opener.rs | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 1f484b08063a..ca96ea83b20f 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -107,7 +107,7 @@ pub(crate) struct MitoRegion { /// Memtable builder for the region. pub(crate) memtable_builder: MemtableBuilderRef, /// Region stats - stats: Stats, + stats: ManifestStats, } pub(crate) type MitoRegionRef = Arc; @@ -522,13 +522,13 @@ impl OpeningRegions { pub(crate) type OpeningRegionsRef = Arc; -/// Single region stats. +/// Manifest stats. #[derive(Default, Debug, Clone)] -pub(crate) struct Stats { +pub(crate) struct ManifestStats { total_manifest_size: Arc, } -impl Stats { +impl ManifestStats { fn total_manifest_size(&self) -> u64 { self.total_manifest_size.load(Ordering::Relaxed) } diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 795737337996..ed9cbf037b30 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -41,7 +41,7 @@ use crate::memtable::time_partition::TimePartitions; use crate::memtable::MemtableBuilderProvider; use crate::region::options::RegionOptions; use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef}; -use crate::region::{ManifestContext, MitoRegion, RegionState, Stats}; +use crate::region::{ManifestContext, ManifestStats, MitoRegion, RegionState}; use crate::region_write_ctx::RegionWriteCtx; use crate::request::OptionOutputTx; use crate::schedule::scheduler::SchedulerRef; @@ -63,7 +63,7 @@ pub(crate) struct RegionOpener { skip_wal_replay: bool, intermediate_manager: IntermediateManager, time_provider: Option, - stats: Stats, + stats: ManifestStats, } impl RegionOpener { From ee1b388854b4270c8c29e98791a74ae2537e4e1b Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 30 May 2024 06:44:21 +0000 Subject: [PATCH 3/3] chore: apply suggestions from CR --- src/mito2/src/region.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index ca96ea83b20f..fc000f3e8197 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -106,7 +106,7 @@ pub(crate) struct MitoRegion { time_provider: TimeProviderRef, /// Memtable builder for the region. pub(crate) memtable_builder: MemtableBuilderRef, - /// Region stats + /// manifest stats stats: ManifestStats, }