Skip to content

Commit

Permalink
feat(metric): implement role and region_disk_usage (#3095)
Browse files Browse the repository at this point in the history
* feat(metric): implement `role` and `region_disk_usage`

* Update src/datanode/src/region_server.rs

* Update src/datanode/src/heartbeat.rs

---------

Co-authored-by: LFC <[email protected]>
  • Loading branch information
WenyXu and MichaelScofield authored Jan 5, 2024
1 parent 702ea32 commit cdc9021
Show file tree
Hide file tree
Showing 9 changed files with 78 additions and 32 deletions.
2 changes: 1 addition & 1 deletion src/datanode/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ impl HeartbeatTask {
}

async fn load_region_stats(region_server: &RegionServer) -> Vec<RegionStat> {
let regions = region_server.opened_regions();
let regions = region_server.reportable_regions();

let mut region_stats = Vec::new();
for stat in regions {
Expand Down
5 changes: 4 additions & 1 deletion src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,10 @@ impl RegionServer {
self.inner.handle_read(request).await
}

pub fn opened_regions(&self) -> Vec<RegionStat> {
/// Returns all opened and reportable regions.
///
/// Notes: except all metrics regions.
pub fn reportable_regions(&self) -> Vec<RegionStat> {
self.inner
.region_map
.iter()
Expand Down
52 changes: 48 additions & 4 deletions src/metric-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ mod region_metadata;
mod state;

use std::any::Any;
use std::sync::Arc;
use std::sync::{Arc, RwLock};

use async_trait::async_trait;
use common_error::ext::{BoxedError, ErrorExt};
Expand All @@ -35,7 +35,6 @@ use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
use store_api::region_request::{AffectedRows, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
use tokio::sync::RwLock;

use self::state::MetricEngineState;
use crate::data_region::DataRegion;
Expand Down Expand Up @@ -159,8 +158,14 @@ impl RegionEngine for MetricEngine {
}

/// Retrieves region's disk usage.
///
/// Note: Returns `None` if it's a logical region.
async fn region_disk_usage(&self, region_id: RegionId) -> Option<i64> {
todo!()
if self.inner.is_physical_region(region_id) {
self.inner.mito.region_disk_usage(region_id).await
} else {
None
}
}

/// Stops the engine
Expand Down Expand Up @@ -192,8 +197,15 @@ impl RegionEngine for MetricEngine {
self.inner.mito.set_readonly_gracefully(region_id).await
}

/// Returns the physical region role.
///
/// Note: Returns `None` if it's a logical region.
fn role(&self, region_id: RegionId) -> Option<RegionRole> {
todo!()
if self.inner.is_physical_region(region_id) {
self.inner.mito.role(region_id)
} else {
None
}
}

fn as_any(&self) -> &dyn Any {
Expand Down Expand Up @@ -296,4 +308,36 @@ mod test {
.await
.unwrap();
}

#[tokio::test]
async fn test_role() {
let env = TestEnv::new().await;
env.init_metric_region().await;

let logical_region_id = env.default_logical_region_id();
let physical_region_id = env.default_physical_region_id();

assert!(env.metric().role(logical_region_id).is_none());
assert!(env.metric().role(physical_region_id).is_some());
}

#[tokio::test]
async fn test_region_disk_usage() {
let env = TestEnv::new().await;
env.init_metric_region().await;

let logical_region_id = env.default_logical_region_id();
let physical_region_id = env.default_physical_region_id();

assert!(env
.metric()
.region_disk_usage(logical_region_id)
.await
.is_none());
assert!(env
.metric()
.region_disk_usage(physical_region_id)
.await
.is_some());
}
}
11 changes: 3 additions & 8 deletions src/metric-engine/src/engine/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,9 @@ impl MetricEngineInner {
region_id: RegionId,
request: RegionAlterRequest,
) -> Result<AffectedRows> {
let is_altering_logical_region = self
.state
.read()
.await
.physical_regions()
.contains_key(&region_id);
let is_altering_physical_region = self.is_physical_region(region_id);

let result = if is_altering_logical_region {
let result = if is_altering_physical_region {
self.alter_physical_region(region_id, request).await
} else {
self.alter_logical_region(region_id, request).await
Expand All @@ -51,7 +46,7 @@ impl MetricEngineInner {
request: RegionAlterRequest,
) -> Result<()> {
let physical_region_id = {
let state = &self.state.read().await;
let state = &self.state.read().unwrap();
state.get_physical_region_id(region_id).with_context(|| {
error!("Trying to alter an nonexistent region {region_id}");
LogicalRegionNotFoundSnafu { region_id }
Expand Down
6 changes: 3 additions & 3 deletions src/metric-engine/src/engine/close.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,21 @@ impl MetricEngineInner {
if self
.state
.read()
.await
.unwrap()
.physical_regions()
.contains_key(&data_region_id)
{
self.close_physical_region(data_region_id).await?;
self.state
.write()
.await
.unwrap()
.remove_physical_region(data_region_id)?;

Ok(0)
} else if self
.state
.read()
.await
.unwrap()
.logical_regions()
.contains_key(&region_id)
{
Expand Down
8 changes: 4 additions & 4 deletions src/metric-engine/src/engine/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl MetricEngineInner {
// remember this table
self.state
.write()
.await
.unwrap()
.add_physical_region(data_region_id, physical_column_set);

Ok(())
Expand Down Expand Up @@ -155,7 +155,7 @@ impl MetricEngineInner {
// find new columns to add
let mut new_columns = vec![];
{
let state = &self.state.read().await;
let state = &self.state.read().unwrap();
let physical_columns =
state
.physical_columns()
Expand Down Expand Up @@ -193,7 +193,7 @@ impl MetricEngineInner {
// Safety: previous steps ensure the physical region exist
self.state
.write()
.await
.unwrap()
.add_logical_region(physical_region_id, logical_region_id);
info!("Created new logical region {logical_region_id} on physical region {data_region_id}");
LOGICAL_REGION_COUNT.inc();
Expand Down Expand Up @@ -221,7 +221,7 @@ impl MetricEngineInner {
}

// safety: previous step has checked this
self.state.write().await.add_physical_columns(
self.state.write().unwrap().add_physical_columns(
data_region_id,
new_columns
.iter()
Expand Down
2 changes: 1 addition & 1 deletion src/metric-engine/src/engine/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl MetricEngineInner {
.await?;
let logical_region_num = logical_regions.len();

let mut state = self.state.write().await;
let mut state = self.state.write().unwrap();
// recover physical column names
let physical_column_names = physical_columns
.into_iter()
Expand Down
4 changes: 2 additions & 2 deletions src/metric-engine/src/engine/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl MetricEngineInner {
let is_putting_physical_region = self
.state
.read()
.await
.unwrap()
.physical_regions()
.contains_key(&region_id);

Expand All @@ -68,7 +68,7 @@ impl MetricEngineInner {
let physical_region_id = *self
.state
.read()
.await
.unwrap()
.logical_regions()
.get(&logical_region_id)
.with_context(|| LogicalRegionNotFoundSnafu {
Expand Down
20 changes: 12 additions & 8 deletions src/metric-engine/src/engine/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,7 @@ impl MetricEngineInner {
region_id: RegionId,
request: ScanRequest,
) -> Result<SendableRecordBatchStream> {
let is_reading_physical_region = self
.state
.read()
.await
.physical_regions()
.contains_key(&region_id);
let is_reading_physical_region = self.is_physical_region(region_id);

if is_reading_physical_region {
info!(
Expand Down Expand Up @@ -88,7 +83,7 @@ impl MetricEngineInner {
let is_reading_physical_region = self
.state
.read()
.await
.unwrap()
.physical_regions()
.contains_key(&region_id);

Expand All @@ -104,8 +99,17 @@ impl MetricEngineInner {
}
}

/// Returns true if it's a physical region.
pub fn is_physical_region(&self, region_id: RegionId) -> bool {
self.state
.read()
.unwrap()
.physical_regions()
.contains_key(&region_id)
}

async fn get_physical_region_id(&self, logical_region_id: RegionId) -> Result<RegionId> {
let state = &self.state.read().await;
let state = &self.state.read().unwrap();
state
.get_physical_region_id(logical_region_id)
.with_context(|| {
Expand Down

0 comments on commit cdc9021

Please sign in to comment.