Skip to content

Commit

Permalink
feat(metric-engine): open and close metric regions (#2961)
Browse files Browse the repository at this point in the history
* implementation

Signed-off-by: Ruihang Xia <[email protected]>

* tests

Signed-off-by: Ruihang Xia <[email protected]>

* fix typo

Signed-off-by: Ruihang Xia <[email protected]>

* add metrics

Signed-off-by: Ruihang Xia <[email protected]>

* define consts

Signed-off-by: Ruihang Xia <[email protected]>

* fix compile error

Signed-off-by: Ruihang Xia <[email protected]>

* print region id with display

Signed-off-by: Ruihang Xia <[email protected]>

* only ignore region not found

Signed-off-by: Ruihang Xia <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia authored Dec 21, 2023
1 parent 8776b12 commit 6a1f575
Show file tree
Hide file tree
Showing 9 changed files with 409 additions and 14 deletions.
88 changes: 83 additions & 5 deletions src/metric-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
// limitations under the License.

mod alter;
mod close;
mod create;
mod open;
mod put;
mod read;
mod region_metadata;
Expand All @@ -22,7 +24,8 @@ mod state;
use std::sync::Arc;

use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_query::Output;
use common_recordbatch::SendableRecordBatchStream;
use mito2::engine::MitoEngine;
Expand Down Expand Up @@ -122,8 +125,8 @@ impl RegionEngine for MetricEngine {
RegionRequest::Delete(_) => todo!(),
RegionRequest::Create(create) => self.inner.create_region(region_id, create).await,
RegionRequest::Drop(_) => todo!(),
RegionRequest::Open(_) => todo!(),
RegionRequest::Close(_) => todo!(),
RegionRequest::Open(open) => self.inner.open_region(region_id, open).await,
RegionRequest::Close(close) => self.inner.close_region(region_id, close).await,
RegionRequest::Alter(alter) => self.inner.alter_region(region_id, alter).await,
RegionRequest::Flush(_) => todo!(),
RegionRequest::Compact(_) => todo!(),
Expand Down Expand Up @@ -157,11 +160,18 @@ impl RegionEngine for MetricEngine {

/// Stops the engine
async fn stop(&self) -> Result<(), BoxedError> {
todo!()
// don't need to stop the underlying mito engine
Ok(())
}

fn set_writable(&self, region_id: RegionId, writable: bool) -> Result<(), BoxedError> {
todo!()
// ignore the region not found error
let result = self.inner.mito.set_writable(region_id, writable);

match result {
Err(e) if e.status_code() == StatusCode::RegionNotFound => Ok(()),
_ => result,
}
}

async fn set_readonly_gracefully(
Expand Down Expand Up @@ -197,3 +207,71 @@ struct MetricEngineInner {
data_region: DataRegion,
state: RwLock<MetricEngineState>,
}

#[cfg(test)]
mod test {
use std::collections::HashMap;

use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
use store_api::region_request::{RegionCloseRequest, RegionOpenRequest};

use super::*;
use crate::test_util::TestEnv;

#[tokio::test]
async fn close_open_regions() {
let env = TestEnv::new().await;
env.init_metric_region().await;
let engine = env.metric();

// close physical region
let physical_region_id = env.default_physical_region_id();
engine
.handle_request(
physical_region_id,
RegionRequest::Close(RegionCloseRequest {}),
)
.await
.unwrap();

// reopen physical region
let physical_region_option = [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
.into_iter()
.collect();
let open_request = RegionOpenRequest {
engine: METRIC_ENGINE_NAME.to_string(),
region_dir: env.default_region_dir(),
options: physical_region_option,
skip_wal_replay: false,
};
engine
.handle_request(physical_region_id, RegionRequest::Open(open_request))
.await
.unwrap();

// close nonexistent region
let nonexistent_region_id = RegionId::new(12313, 12);
engine
.handle_request(
nonexistent_region_id,
RegionRequest::Close(RegionCloseRequest {}),
)
.await
.unwrap_err();

// open nonexistent region
let invalid_open_request = RegionOpenRequest {
engine: METRIC_ENGINE_NAME.to_string(),
region_dir: env.default_region_dir(),
options: HashMap::new(),
skip_wal_replay: false,
};
engine
.handle_request(
nonexistent_region_id,
RegionRequest::Open(invalid_open_request),
)
.await
.unwrap_err();
}
}
93 changes: 93 additions & 0 deletions src/metric-engine/src/engine/close.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Close a metric region
use mito2::engine::MITO_ENGINE_NAME;
use object_store::util::join_dir;
use snafu::{OptionExt, ResultExt};
use store_api::metric_engine_consts::{
DATA_REGION_SUBDIR, METADATA_REGION_SUBDIR, PHYSICAL_TABLE_METADATA_KEY,
};
use store_api::region_engine::RegionEngine;
use store_api::region_request::{
AffectedRows, RegionCloseRequest, RegionOpenRequest, RegionRequest,
};
use store_api::storage::RegionId;

use super::MetricEngineInner;
use crate::error::{
CloseMitoRegionSnafu, Error, LogicalRegionNotFoundSnafu, OpenMitoRegionSnafu,
PhysicalRegionNotFoundSnafu, Result,
};
use crate::metrics::PHYSICAL_REGION_COUNT;
use crate::{metadata_region, utils};

impl MetricEngineInner {
pub async fn close_region(
&self,
region_id: RegionId,
_req: RegionCloseRequest,
) -> Result<AffectedRows> {
let data_region_id = utils::to_data_region_id(region_id);
if self
.state
.read()
.await
.physical_regions()
.contains_key(&data_region_id)
{
self.close_physical_region(data_region_id).await?;
self.state
.write()
.await
.remove_physical_region(data_region_id)?;

Ok(0)
} else if self
.state
.read()
.await
.logical_regions()
.contains_key(&region_id)
{
Ok(0)
} else {
Err(LogicalRegionNotFoundSnafu { region_id }.build())
}
}

async fn close_physical_region(&self, region_id: RegionId) -> Result<AffectedRows> {
let data_region_id = utils::to_data_region_id(region_id);
let metadata_region_id = utils::to_metadata_region_id(region_id);

self.mito
.handle_request(data_region_id, RegionRequest::Close(RegionCloseRequest {}))
.await
.with_context(|_| CloseMitoRegionSnafu { region_id })?;
self.mito
.handle_request(
metadata_region_id,
RegionRequest::Close(RegionCloseRequest {}),
)
.await
.with_context(|_| CloseMitoRegionSnafu { region_id })?;

PHYSICAL_REGION_COUNT.dec();

Ok(0)
}
}

// Unit tests in engine.rs
2 changes: 1 addition & 1 deletion src/metric-engine/src/engine/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl MetricEngineInner {
region_type: DATA_REGION_SUBDIR,
})?;

info!("Created physical metric region {region_id:?}");
info!("Created physical metric region {region_id}");
PHYSICAL_REGION_COUNT.inc();

// remember this table
Expand Down
156 changes: 156 additions & 0 deletions src/metric-engine/src/engine/open.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Open a metric region.
use common_telemetry::info;
use mito2::engine::MITO_ENGINE_NAME;
use object_store::util::join_dir;
use snafu::ResultExt;
use store_api::metric_engine_consts::{
DATA_REGION_SUBDIR, METADATA_REGION_SUBDIR, PHYSICAL_TABLE_METADATA_KEY,
};
use store_api::region_engine::RegionEngine;
use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
use store_api::storage::RegionId;

use super::MetricEngineInner;
use crate::error::{Error, LogicalRegionNotFoundSnafu, OpenMitoRegionSnafu, Result};
use crate::metrics::{LOGICAL_REGION_COUNT, PHYSICAL_REGION_COUNT};
use crate::utils;

impl MetricEngineInner {
/// Open a metric region.
///
/// Only open requests to a physical region matter. Those to logical regions are
/// actually an empty operation -- it only check if the request is valid. Since
/// logical regions are multiplexed over physical regions, they are always "open".
///
/// If trying to open a logical region whose physical region is not open, metric
/// engine will throw a [RegionNotFound](common_error::status_code::StatusCode::RegionNotFound)
/// error.
pub async fn open_region(
&self,
region_id: RegionId,
request: RegionOpenRequest,
) -> Result<AffectedRows> {
let is_opening_physical_region = request.options.contains_key(PHYSICAL_TABLE_METADATA_KEY);

if is_opening_physical_region {
// open physical region and recover states
self.open_physical_region(region_id, request).await?;
self.recover_states(region_id).await?;

Ok(0)
} else if self
.state
.read()
.await
.logical_regions()
.contains_key(&region_id)
{
// if the logical region is already open, do nothing
Ok(0)
} else {
// throw RegionNotFound error
Err(LogicalRegionNotFoundSnafu { region_id }.build())
}
}

/// Invokes mito engine to open physical regions (data and metadata).
async fn open_physical_region(
&self,
region_id: RegionId,
request: RegionOpenRequest,
) -> Result<AffectedRows> {
let metadata_region_dir = join_dir(&request.region_dir, METADATA_REGION_SUBDIR);
let data_region_dir = join_dir(&request.region_dir, DATA_REGION_SUBDIR);

let open_metadata_region_request = RegionOpenRequest {
region_dir: metadata_region_dir,
options: request.options.clone(),
engine: MITO_ENGINE_NAME.to_string(),
skip_wal_replay: request.skip_wal_replay,
};
let open_data_region_request = RegionOpenRequest {
region_dir: data_region_dir,
options: request.options.clone(),
engine: MITO_ENGINE_NAME.to_string(),
skip_wal_replay: request.skip_wal_replay,
};

let metadata_region_id = utils::to_metadata_region_id(region_id);
let data_region_id = utils::to_data_region_id(region_id);

self.mito
.handle_request(
metadata_region_id,
RegionRequest::Open(open_metadata_region_request),
)
.await
.with_context(|_| OpenMitoRegionSnafu {
region_type: "metadata",
})?;
self.mito
.handle_request(
data_region_id,
RegionRequest::Open(open_data_region_request),
)
.await
.with_context(|_| OpenMitoRegionSnafu {
region_type: "data",
})?;

info!("Opened physical metric region {region_id}");
PHYSICAL_REGION_COUNT.inc();

Ok(0)
}

/// Recovers [MetricEngineState](crate::engine::state::MetricEngineState) from
/// physical region (idnefied by the given region id).
///
/// Includes:
/// - Record physical region's column names
/// - Record the mapping between logical region id and physical region id
async fn recover_states(&self, physical_region_id: RegionId) -> Result<()> {
// load logical regions and physical column names
let logical_regions = self
.metadata_region
.logical_regions(physical_region_id)
.await?;
let physical_columns = self
.data_region
.physical_columns(physical_region_id)
.await?;
let logical_region_num = logical_regions.len();

let mut state = self.state.write().await;
// recover physical column names
let physical_column_names = physical_columns
.into_iter()
.map(|col| col.column_schema.name)
.collect();
state.add_physical_region(physical_region_id, physical_column_names);
// recover logical regions
for logical_region_id in logical_regions {
state.add_logical_region(physical_region_id, logical_region_id);
}
LOGICAL_REGION_COUNT.add(logical_region_num as i64);

Ok(())
}
}

// Unit tests in engine.rs
Loading

0 comments on commit 6a1f575

Please sign in to comment.