From cdc90211608d83ab3482e610c5e5ef6d919cf099 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Fri, 5 Jan 2024 15:53:52 +0900 Subject: [PATCH] feat(metric): implement `role` and `region_disk_usage` (#3095) * feat(metric): implement `role` and `region_disk_usage` * Update src/datanode/src/region_server.rs * Update src/datanode/src/heartbeat.rs --------- Co-authored-by: LFC <990479+MichaelScofield@users.noreply.github.com> --- src/datanode/src/heartbeat.rs | 2 +- src/datanode/src/region_server.rs | 5 ++- src/metric-engine/src/engine.rs | 52 ++++++++++++++++++++++++-- src/metric-engine/src/engine/alter.rs | 11 ++---- src/metric-engine/src/engine/close.rs | 6 +-- src/metric-engine/src/engine/create.rs | 8 ++-- src/metric-engine/src/engine/open.rs | 2 +- src/metric-engine/src/engine/put.rs | 4 +- src/metric-engine/src/engine/read.rs | 20 ++++++---- 9 files changed, 78 insertions(+), 32 deletions(-) diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index 8845d0d5e850..28253f00e59c 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -305,7 +305,7 @@ impl HeartbeatTask { } async fn load_region_stats(region_server: &RegionServer) -> Vec { - let regions = region_server.opened_regions(); + let regions = region_server.reportable_regions(); let mut region_stats = Vec::new(); for stat in regions { diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 83f18891eb21..04e86df6fe3a 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -126,7 +126,10 @@ impl RegionServer { self.inner.handle_read(request).await } - pub fn opened_regions(&self) -> Vec { + /// Returns all opened and reportable regions. + /// + /// Notes: except all metrics regions. + pub fn reportable_regions(&self) -> Vec { self.inner .region_map .iter() diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 83d9444dd950..40040dc49aa6 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -22,7 +22,7 @@ mod region_metadata; mod state; use std::any::Any; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use async_trait::async_trait; use common_error::ext::{BoxedError, ErrorExt}; @@ -35,7 +35,6 @@ use store_api::metric_engine_consts::METRIC_ENGINE_NAME; use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse}; use store_api::region_request::{AffectedRows, RegionRequest}; use store_api::storage::{RegionId, ScanRequest}; -use tokio::sync::RwLock; use self::state::MetricEngineState; use crate::data_region::DataRegion; @@ -159,8 +158,14 @@ impl RegionEngine for MetricEngine { } /// Retrieves region's disk usage. + /// + /// Note: Returns `None` if it's a logical region. async fn region_disk_usage(&self, region_id: RegionId) -> Option { - todo!() + if self.inner.is_physical_region(region_id) { + self.inner.mito.region_disk_usage(region_id).await + } else { + None + } } /// Stops the engine @@ -192,8 +197,15 @@ impl RegionEngine for MetricEngine { self.inner.mito.set_readonly_gracefully(region_id).await } + /// Returns the physical region role. + /// + /// Note: Returns `None` if it's a logical region. fn role(&self, region_id: RegionId) -> Option { - todo!() + if self.inner.is_physical_region(region_id) { + self.inner.mito.role(region_id) + } else { + None + } } fn as_any(&self) -> &dyn Any { @@ -296,4 +308,36 @@ mod test { .await .unwrap(); } + + #[tokio::test] + async fn test_role() { + let env = TestEnv::new().await; + env.init_metric_region().await; + + let logical_region_id = env.default_logical_region_id(); + let physical_region_id = env.default_physical_region_id(); + + assert!(env.metric().role(logical_region_id).is_none()); + assert!(env.metric().role(physical_region_id).is_some()); + } + + #[tokio::test] + async fn test_region_disk_usage() { + let env = TestEnv::new().await; + env.init_metric_region().await; + + let logical_region_id = env.default_logical_region_id(); + let physical_region_id = env.default_physical_region_id(); + + assert!(env + .metric() + .region_disk_usage(logical_region_id) + .await + .is_none()); + assert!(env + .metric() + .region_disk_usage(physical_region_id) + .await + .is_some()); + } } diff --git a/src/metric-engine/src/engine/alter.rs b/src/metric-engine/src/engine/alter.rs index aa8e37d8a65c..60520407ec21 100644 --- a/src/metric-engine/src/engine/alter.rs +++ b/src/metric-engine/src/engine/alter.rs @@ -29,14 +29,9 @@ impl MetricEngineInner { region_id: RegionId, request: RegionAlterRequest, ) -> Result { - let is_altering_logical_region = self - .state - .read() - .await - .physical_regions() - .contains_key(®ion_id); + let is_altering_physical_region = self.is_physical_region(region_id); - let result = if is_altering_logical_region { + let result = if is_altering_physical_region { self.alter_physical_region(region_id, request).await } else { self.alter_logical_region(region_id, request).await @@ -51,7 +46,7 @@ impl MetricEngineInner { request: RegionAlterRequest, ) -> Result<()> { let physical_region_id = { - let state = &self.state.read().await; + let state = &self.state.read().unwrap(); state.get_physical_region_id(region_id).with_context(|| { error!("Trying to alter an nonexistent region {region_id}"); LogicalRegionNotFoundSnafu { region_id } diff --git a/src/metric-engine/src/engine/close.rs b/src/metric-engine/src/engine/close.rs index 6510c04a7ddc..c708453edb9e 100644 --- a/src/metric-engine/src/engine/close.rs +++ b/src/metric-engine/src/engine/close.rs @@ -44,21 +44,21 @@ impl MetricEngineInner { if self .state .read() - .await + .unwrap() .physical_regions() .contains_key(&data_region_id) { self.close_physical_region(data_region_id).await?; self.state .write() - .await + .unwrap() .remove_physical_region(data_region_id)?; Ok(0) } else if self .state .read() - .await + .unwrap() .logical_regions() .contains_key(®ion_id) { diff --git a/src/metric-engine/src/engine/create.rs b/src/metric-engine/src/engine/create.rs index f9488ac70e2a..bfd87b92e7eb 100644 --- a/src/metric-engine/src/engine/create.rs +++ b/src/metric-engine/src/engine/create.rs @@ -108,7 +108,7 @@ impl MetricEngineInner { // remember this table self.state .write() - .await + .unwrap() .add_physical_region(data_region_id, physical_column_set); Ok(()) @@ -155,7 +155,7 @@ impl MetricEngineInner { // find new columns to add let mut new_columns = vec![]; { - let state = &self.state.read().await; + let state = &self.state.read().unwrap(); let physical_columns = state .physical_columns() @@ -193,7 +193,7 @@ impl MetricEngineInner { // Safety: previous steps ensure the physical region exist self.state .write() - .await + .unwrap() .add_logical_region(physical_region_id, logical_region_id); info!("Created new logical region {logical_region_id} on physical region {data_region_id}"); LOGICAL_REGION_COUNT.inc(); @@ -221,7 +221,7 @@ impl MetricEngineInner { } // safety: previous step has checked this - self.state.write().await.add_physical_columns( + self.state.write().unwrap().add_physical_columns( data_region_id, new_columns .iter() diff --git a/src/metric-engine/src/engine/open.rs b/src/metric-engine/src/engine/open.rs index 46c70f74d125..fda8807a7b9c 100644 --- a/src/metric-engine/src/engine/open.rs +++ b/src/metric-engine/src/engine/open.rs @@ -130,7 +130,7 @@ impl MetricEngineInner { .await?; let logical_region_num = logical_regions.len(); - let mut state = self.state.write().await; + let mut state = self.state.write().unwrap(); // recover physical column names let physical_column_names = physical_columns .into_iter() diff --git a/src/metric-engine/src/engine/put.rs b/src/metric-engine/src/engine/put.rs index 19f40a509975..10ffd39fee4a 100644 --- a/src/metric-engine/src/engine/put.rs +++ b/src/metric-engine/src/engine/put.rs @@ -44,7 +44,7 @@ impl MetricEngineInner { let is_putting_physical_region = self .state .read() - .await + .unwrap() .physical_regions() .contains_key(®ion_id); @@ -68,7 +68,7 @@ impl MetricEngineInner { let physical_region_id = *self .state .read() - .await + .unwrap() .logical_regions() .get(&logical_region_id) .with_context(|| LogicalRegionNotFoundSnafu { diff --git a/src/metric-engine/src/engine/read.rs b/src/metric-engine/src/engine/read.rs index 365214c56a3d..3d8a9952612f 100644 --- a/src/metric-engine/src/engine/read.rs +++ b/src/metric-engine/src/engine/read.rs @@ -39,12 +39,7 @@ impl MetricEngineInner { region_id: RegionId, request: ScanRequest, ) -> Result { - let is_reading_physical_region = self - .state - .read() - .await - .physical_regions() - .contains_key(®ion_id); + let is_reading_physical_region = self.is_physical_region(region_id); if is_reading_physical_region { info!( @@ -88,7 +83,7 @@ impl MetricEngineInner { let is_reading_physical_region = self .state .read() - .await + .unwrap() .physical_regions() .contains_key(®ion_id); @@ -104,8 +99,17 @@ impl MetricEngineInner { } } + /// Returns true if it's a physical region. + pub fn is_physical_region(&self, region_id: RegionId) -> bool { + self.state + .read() + .unwrap() + .physical_regions() + .contains_key(®ion_id) + } + async fn get_physical_region_id(&self, logical_region_id: RegionId) -> Result { - let state = &self.state.read().await; + let state = &self.state.read().unwrap(); state .get_physical_region_id(logical_region_id) .with_context(|| {