Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(metric): implement role and region_disk_usage #3095

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/datanode/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ impl HeartbeatTask {
}

async fn load_region_stats(region_server: &RegionServer) -> Vec<RegionStat> {
let regions = region_server.opened_regions();
let regions = region_server.reportable_regions();

let mut region_stats = Vec::new();
for stat in regions {
Expand Down
5 changes: 4 additions & 1 deletion src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,10 @@ impl RegionServer {
self.inner.handle_read(request).await
}

pub fn opened_regions(&self) -> Vec<RegionStat> {
/// Returns all opened and reportable regions.
///
/// Notes: except all metrics regions.
pub fn reportable_regions(&self) -> Vec<RegionStat> {
self.inner
.region_map
.iter()
Expand Down
52 changes: 48 additions & 4 deletions src/metric-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ mod region_metadata;
mod state;

use std::any::Any;
use std::sync::Arc;
use std::sync::{Arc, RwLock};

use async_trait::async_trait;
use common_error::ext::{BoxedError, ErrorExt};
Expand All @@ -35,7 +35,6 @@ use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
use store_api::region_request::{AffectedRows, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
use tokio::sync::RwLock;

use self::state::MetricEngineState;
use crate::data_region::DataRegion;
Expand Down Expand Up @@ -159,8 +158,14 @@ impl RegionEngine for MetricEngine {
}

/// Retrieves region's disk usage.
///
/// Note: Returns `None` if it's a logical region.
async fn region_disk_usage(&self, region_id: RegionId) -> Option<i64> {
todo!()
if self.inner.is_physical_region(region_id) {
self.inner.mito.region_disk_usage(region_id).await
} else {
None
}
}

/// Stops the engine
Expand Down Expand Up @@ -192,8 +197,15 @@ impl RegionEngine for MetricEngine {
self.inner.mito.set_readonly_gracefully(region_id).await
}

/// Returns the physical region role.
///
/// Note: Returns `None` if it's a logical region.
fn role(&self, region_id: RegionId) -> Option<RegionRole> {
todo!()
if self.inner.is_physical_region(region_id) {
self.inner.mito.role(region_id)
} else {
None
}
}

fn as_any(&self) -> &dyn Any {
Expand Down Expand Up @@ -296,4 +308,36 @@ mod test {
.await
.unwrap();
}

#[tokio::test]
async fn test_role() {
let env = TestEnv::new().await;
env.init_metric_region().await;

let logical_region_id = env.default_logical_region_id();
let physical_region_id = env.default_physical_region_id();

assert!(env.metric().role(logical_region_id).is_none());
assert!(env.metric().role(physical_region_id).is_some());
}

#[tokio::test]
async fn test_region_disk_usage() {
let env = TestEnv::new().await;
env.init_metric_region().await;

let logical_region_id = env.default_logical_region_id();
let physical_region_id = env.default_physical_region_id();

assert!(env
.metric()
.region_disk_usage(logical_region_id)
.await
.is_none());
assert!(env
.metric()
.region_disk_usage(physical_region_id)
.await
.is_some());
}
}
11 changes: 3 additions & 8 deletions src/metric-engine/src/engine/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,9 @@ impl MetricEngineInner {
region_id: RegionId,
request: RegionAlterRequest,
) -> Result<AffectedRows> {
let is_altering_logical_region = self
.state
.read()
.await
.physical_regions()
.contains_key(&region_id);
let is_altering_physical_region = self.is_physical_region(region_id);

let result = if is_altering_logical_region {
let result = if is_altering_physical_region {
self.alter_physical_region(region_id, request).await
} else {
self.alter_logical_region(region_id, request).await
Expand All @@ -51,7 +46,7 @@ impl MetricEngineInner {
request: RegionAlterRequest,
) -> Result<()> {
let physical_region_id = {
let state = &self.state.read().await;
let state = &self.state.read().unwrap();
state.get_physical_region_id(region_id).with_context(|| {
error!("Trying to alter an nonexistent region {region_id}");
LogicalRegionNotFoundSnafu { region_id }
Expand Down
6 changes: 3 additions & 3 deletions src/metric-engine/src/engine/close.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,21 @@ impl MetricEngineInner {
if self
.state
.read()
.await
.unwrap()
.physical_regions()
.contains_key(&data_region_id)
{
self.close_physical_region(data_region_id).await?;
self.state
.write()
.await
.unwrap()
.remove_physical_region(data_region_id)?;

Ok(0)
} else if self
.state
.read()
.await
.unwrap()
.logical_regions()
.contains_key(&region_id)
{
Expand Down
8 changes: 4 additions & 4 deletions src/metric-engine/src/engine/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl MetricEngineInner {
// remember this table
self.state
.write()
.await
.unwrap()
.add_physical_region(data_region_id, physical_column_set);

Ok(())
Expand Down Expand Up @@ -155,7 +155,7 @@ impl MetricEngineInner {
// find new columns to add
let mut new_columns = vec![];
{
let state = &self.state.read().await;
let state = &self.state.read().unwrap();
let physical_columns =
state
.physical_columns()
Expand Down Expand Up @@ -193,7 +193,7 @@ impl MetricEngineInner {
// Safety: previous steps ensure the physical region exist
self.state
.write()
.await
.unwrap()
.add_logical_region(physical_region_id, logical_region_id);
info!("Created new logical region {logical_region_id} on physical region {data_region_id}");
LOGICAL_REGION_COUNT.inc();
Expand Down Expand Up @@ -221,7 +221,7 @@ impl MetricEngineInner {
}

// safety: previous step has checked this
self.state.write().await.add_physical_columns(
self.state.write().unwrap().add_physical_columns(
data_region_id,
new_columns
.iter()
Expand Down
2 changes: 1 addition & 1 deletion src/metric-engine/src/engine/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl MetricEngineInner {
.await?;
let logical_region_num = logical_regions.len();

let mut state = self.state.write().await;
let mut state = self.state.write().unwrap();
// recover physical column names
let physical_column_names = physical_columns
.into_iter()
Expand Down
4 changes: 2 additions & 2 deletions src/metric-engine/src/engine/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl MetricEngineInner {
let is_putting_physical_region = self
.state
.read()
.await
.unwrap()
.physical_regions()
.contains_key(&region_id);

Expand All @@ -68,7 +68,7 @@ impl MetricEngineInner {
let physical_region_id = *self
.state
.read()
.await
.unwrap()
.logical_regions()
.get(&logical_region_id)
.with_context(|| LogicalRegionNotFoundSnafu {
Expand Down
20 changes: 12 additions & 8 deletions src/metric-engine/src/engine/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,7 @@ impl MetricEngineInner {
region_id: RegionId,
request: ScanRequest,
) -> Result<SendableRecordBatchStream> {
let is_reading_physical_region = self
.state
.read()
.await
.physical_regions()
.contains_key(&region_id);
let is_reading_physical_region = self.is_physical_region(region_id);

if is_reading_physical_region {
info!(
Expand Down Expand Up @@ -88,7 +83,7 @@ impl MetricEngineInner {
let is_reading_physical_region = self
.state
.read()
.await
.unwrap()
.physical_regions()
.contains_key(&region_id);

Expand All @@ -104,8 +99,17 @@ impl MetricEngineInner {
}
}

/// Returns true if it's a physical region.
pub fn is_physical_region(&self, region_id: RegionId) -> bool {
self.state
WenyXu marked this conversation as resolved.
Show resolved Hide resolved
.read()
.unwrap()
.physical_regions()
.contains_key(&region_id)
}

async fn get_physical_region_id(&self, logical_region_id: RegionId) -> Result<RegionId> {
let state = &self.state.read().await;
let state = &self.state.read().unwrap();
state
.get_physical_region_id(logical_region_id)
.with_context(|| {
Expand Down
Loading