From 6a1f5751c657f3bb20e138f6e0f0e8541766137f Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 21 Dec 2023 15:33:22 +0800 Subject: [PATCH] feat(metric-engine): open and close metric regions (#2961) * implementation Signed-off-by: Ruihang Xia * tests Signed-off-by: Ruihang Xia * fix typo Signed-off-by: Ruihang Xia * add metrics Signed-off-by: Ruihang Xia * define consts Signed-off-by: Ruihang Xia * fix compile error Signed-off-by: Ruihang Xia * print region id with display Signed-off-by: Ruihang Xia * only ignore region not found Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- src/metric-engine/src/engine.rs | 88 ++++++++++++- src/metric-engine/src/engine/close.rs | 93 ++++++++++++++ src/metric-engine/src/engine/create.rs | 2 +- src/metric-engine/src/engine/open.rs | 156 +++++++++++++++++++++++ src/metric-engine/src/engine/state.rs | 23 ++++ src/metric-engine/src/error.rs | 16 +++ src/metric-engine/src/metadata_region.rs | 34 ++++- src/metric-engine/src/test_util.rs | 7 +- src/store-api/src/region_request.rs | 4 +- 9 files changed, 409 insertions(+), 14 deletions(-) create mode 100644 src/metric-engine/src/engine/close.rs create mode 100644 src/metric-engine/src/engine/open.rs diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 35dd0ee65571..72f169648f07 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -13,7 +13,9 @@ // limitations under the License. mod alter; +mod close; mod create; +mod open; mod put; mod read; mod region_metadata; @@ -22,7 +24,8 @@ mod state; use std::sync::Arc; use async_trait::async_trait; -use common_error::ext::BoxedError; +use common_error::ext::{BoxedError, ErrorExt}; +use common_error::status_code::StatusCode; use common_query::Output; use common_recordbatch::SendableRecordBatchStream; use mito2::engine::MitoEngine; @@ -122,8 +125,8 @@ impl RegionEngine for MetricEngine { RegionRequest::Delete(_) => todo!(), RegionRequest::Create(create) => self.inner.create_region(region_id, create).await, RegionRequest::Drop(_) => todo!(), - RegionRequest::Open(_) => todo!(), - RegionRequest::Close(_) => todo!(), + RegionRequest::Open(open) => self.inner.open_region(region_id, open).await, + RegionRequest::Close(close) => self.inner.close_region(region_id, close).await, RegionRequest::Alter(alter) => self.inner.alter_region(region_id, alter).await, RegionRequest::Flush(_) => todo!(), RegionRequest::Compact(_) => todo!(), @@ -157,11 +160,18 @@ impl RegionEngine for MetricEngine { /// Stops the engine async fn stop(&self) -> Result<(), BoxedError> { - todo!() + // don't need to stop the underlying mito engine + Ok(()) } fn set_writable(&self, region_id: RegionId, writable: bool) -> Result<(), BoxedError> { - todo!() + // ignore the region not found error + let result = self.inner.mito.set_writable(region_id, writable); + + match result { + Err(e) if e.status_code() == StatusCode::RegionNotFound => Ok(()), + _ => result, + } } async fn set_readonly_gracefully( @@ -197,3 +207,71 @@ struct MetricEngineInner { data_region: DataRegion, state: RwLock, } + +#[cfg(test)] +mod test { + use std::collections::HashMap; + + use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY; + use store_api::region_request::{RegionCloseRequest, RegionOpenRequest}; + + use super::*; + use crate::test_util::TestEnv; + + #[tokio::test] + async fn close_open_regions() { + let env = TestEnv::new().await; + env.init_metric_region().await; + let engine = env.metric(); + + // close physical region + let physical_region_id = env.default_physical_region_id(); + engine + .handle_request( + physical_region_id, + RegionRequest::Close(RegionCloseRequest {}), + ) + .await + .unwrap(); + + // reopen physical region + let physical_region_option = [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())] + .into_iter() + .collect(); + let open_request = RegionOpenRequest { + engine: METRIC_ENGINE_NAME.to_string(), + region_dir: env.default_region_dir(), + options: physical_region_option, + skip_wal_replay: false, + }; + engine + .handle_request(physical_region_id, RegionRequest::Open(open_request)) + .await + .unwrap(); + + // close nonexistent region + let nonexistent_region_id = RegionId::new(12313, 12); + engine + .handle_request( + nonexistent_region_id, + RegionRequest::Close(RegionCloseRequest {}), + ) + .await + .unwrap_err(); + + // open nonexistent region + let invalid_open_request = RegionOpenRequest { + engine: METRIC_ENGINE_NAME.to_string(), + region_dir: env.default_region_dir(), + options: HashMap::new(), + skip_wal_replay: false, + }; + engine + .handle_request( + nonexistent_region_id, + RegionRequest::Open(invalid_open_request), + ) + .await + .unwrap_err(); + } +} diff --git a/src/metric-engine/src/engine/close.rs b/src/metric-engine/src/engine/close.rs new file mode 100644 index 000000000000..6510c04a7ddc --- /dev/null +++ b/src/metric-engine/src/engine/close.rs @@ -0,0 +1,93 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Close a metric region + +use mito2::engine::MITO_ENGINE_NAME; +use object_store::util::join_dir; +use snafu::{OptionExt, ResultExt}; +use store_api::metric_engine_consts::{ + DATA_REGION_SUBDIR, METADATA_REGION_SUBDIR, PHYSICAL_TABLE_METADATA_KEY, +}; +use store_api::region_engine::RegionEngine; +use store_api::region_request::{ + AffectedRows, RegionCloseRequest, RegionOpenRequest, RegionRequest, +}; +use store_api::storage::RegionId; + +use super::MetricEngineInner; +use crate::error::{ + CloseMitoRegionSnafu, Error, LogicalRegionNotFoundSnafu, OpenMitoRegionSnafu, + PhysicalRegionNotFoundSnafu, Result, +}; +use crate::metrics::PHYSICAL_REGION_COUNT; +use crate::{metadata_region, utils}; + +impl MetricEngineInner { + pub async fn close_region( + &self, + region_id: RegionId, + _req: RegionCloseRequest, + ) -> Result { + let data_region_id = utils::to_data_region_id(region_id); + if self + .state + .read() + .await + .physical_regions() + .contains_key(&data_region_id) + { + self.close_physical_region(data_region_id).await?; + self.state + .write() + .await + .remove_physical_region(data_region_id)?; + + Ok(0) + } else if self + .state + .read() + .await + .logical_regions() + .contains_key(®ion_id) + { + Ok(0) + } else { + Err(LogicalRegionNotFoundSnafu { region_id }.build()) + } + } + + async fn close_physical_region(&self, region_id: RegionId) -> Result { + let data_region_id = utils::to_data_region_id(region_id); + let metadata_region_id = utils::to_metadata_region_id(region_id); + + self.mito + .handle_request(data_region_id, RegionRequest::Close(RegionCloseRequest {})) + .await + .with_context(|_| CloseMitoRegionSnafu { region_id })?; + self.mito + .handle_request( + metadata_region_id, + RegionRequest::Close(RegionCloseRequest {}), + ) + .await + .with_context(|_| CloseMitoRegionSnafu { region_id })?; + + PHYSICAL_REGION_COUNT.dec(); + + Ok(0) + } +} + +// Unit tests in engine.rs diff --git a/src/metric-engine/src/engine/create.rs b/src/metric-engine/src/engine/create.rs index 7313106d17a8..e9c5bdd11127 100644 --- a/src/metric-engine/src/engine/create.rs +++ b/src/metric-engine/src/engine/create.rs @@ -102,7 +102,7 @@ impl MetricEngineInner { region_type: DATA_REGION_SUBDIR, })?; - info!("Created physical metric region {region_id:?}"); + info!("Created physical metric region {region_id}"); PHYSICAL_REGION_COUNT.inc(); // remember this table diff --git a/src/metric-engine/src/engine/open.rs b/src/metric-engine/src/engine/open.rs new file mode 100644 index 000000000000..1080e9302949 --- /dev/null +++ b/src/metric-engine/src/engine/open.rs @@ -0,0 +1,156 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Open a metric region. + +use common_telemetry::info; +use mito2::engine::MITO_ENGINE_NAME; +use object_store::util::join_dir; +use snafu::ResultExt; +use store_api::metric_engine_consts::{ + DATA_REGION_SUBDIR, METADATA_REGION_SUBDIR, PHYSICAL_TABLE_METADATA_KEY, +}; +use store_api::region_engine::RegionEngine; +use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest}; +use store_api::storage::RegionId; + +use super::MetricEngineInner; +use crate::error::{Error, LogicalRegionNotFoundSnafu, OpenMitoRegionSnafu, Result}; +use crate::metrics::{LOGICAL_REGION_COUNT, PHYSICAL_REGION_COUNT}; +use crate::utils; + +impl MetricEngineInner { + /// Open a metric region. + /// + /// Only open requests to a physical region matter. Those to logical regions are + /// actually an empty operation -- it only check if the request is valid. Since + /// logical regions are multiplexed over physical regions, they are always "open". + /// + /// If trying to open a logical region whose physical region is not open, metric + /// engine will throw a [RegionNotFound](common_error::status_code::StatusCode::RegionNotFound) + /// error. + pub async fn open_region( + &self, + region_id: RegionId, + request: RegionOpenRequest, + ) -> Result { + let is_opening_physical_region = request.options.contains_key(PHYSICAL_TABLE_METADATA_KEY); + + if is_opening_physical_region { + // open physical region and recover states + self.open_physical_region(region_id, request).await?; + self.recover_states(region_id).await?; + + Ok(0) + } else if self + .state + .read() + .await + .logical_regions() + .contains_key(®ion_id) + { + // if the logical region is already open, do nothing + Ok(0) + } else { + // throw RegionNotFound error + Err(LogicalRegionNotFoundSnafu { region_id }.build()) + } + } + + /// Invokes mito engine to open physical regions (data and metadata). + async fn open_physical_region( + &self, + region_id: RegionId, + request: RegionOpenRequest, + ) -> Result { + let metadata_region_dir = join_dir(&request.region_dir, METADATA_REGION_SUBDIR); + let data_region_dir = join_dir(&request.region_dir, DATA_REGION_SUBDIR); + + let open_metadata_region_request = RegionOpenRequest { + region_dir: metadata_region_dir, + options: request.options.clone(), + engine: MITO_ENGINE_NAME.to_string(), + skip_wal_replay: request.skip_wal_replay, + }; + let open_data_region_request = RegionOpenRequest { + region_dir: data_region_dir, + options: request.options.clone(), + engine: MITO_ENGINE_NAME.to_string(), + skip_wal_replay: request.skip_wal_replay, + }; + + let metadata_region_id = utils::to_metadata_region_id(region_id); + let data_region_id = utils::to_data_region_id(region_id); + + self.mito + .handle_request( + metadata_region_id, + RegionRequest::Open(open_metadata_region_request), + ) + .await + .with_context(|_| OpenMitoRegionSnafu { + region_type: "metadata", + })?; + self.mito + .handle_request( + data_region_id, + RegionRequest::Open(open_data_region_request), + ) + .await + .with_context(|_| OpenMitoRegionSnafu { + region_type: "data", + })?; + + info!("Opened physical metric region {region_id}"); + PHYSICAL_REGION_COUNT.inc(); + + Ok(0) + } + + /// Recovers [MetricEngineState](crate::engine::state::MetricEngineState) from + /// physical region (idnefied by the given region id). + /// + /// Includes: + /// - Record physical region's column names + /// - Record the mapping between logical region id and physical region id + async fn recover_states(&self, physical_region_id: RegionId) -> Result<()> { + // load logical regions and physical column names + let logical_regions = self + .metadata_region + .logical_regions(physical_region_id) + .await?; + let physical_columns = self + .data_region + .physical_columns(physical_region_id) + .await?; + let logical_region_num = logical_regions.len(); + + let mut state = self.state.write().await; + // recover physical column names + let physical_column_names = physical_columns + .into_iter() + .map(|col| col.column_schema.name) + .collect(); + state.add_physical_region(physical_region_id, physical_column_names); + // recover logical regions + for logical_region_id in logical_regions { + state.add_logical_region(physical_region_id, logical_region_id); + } + LOGICAL_REGION_COUNT.add(logical_region_num as i64); + + Ok(()) + } +} + +// Unit tests in engine.rs diff --git a/src/metric-engine/src/engine/state.rs b/src/metric-engine/src/engine/state.rs index 0156cc0e57f1..87023bcfa5bf 100644 --- a/src/metric-engine/src/engine/state.rs +++ b/src/metric-engine/src/engine/state.rs @@ -16,8 +16,11 @@ use std::collections::{HashMap, HashSet}; +use snafu::OptionExt; use store_api::storage::RegionId; +use crate::error::{PhysicalRegionNotFoundSnafu, Result}; +use crate::metrics::LOGICAL_REGION_COUNT; use crate::utils::to_data_region_id; /// Internal states of metric engine @@ -92,4 +95,24 @@ impl MetricEngineState { pub fn logical_regions(&self) -> &HashMap { &self.logical_regions } + + /// Remove all data that are related to the physical region id. + pub fn remove_physical_region(&mut self, physical_region_id: RegionId) -> Result<()> { + let physical_region_id = to_data_region_id(physical_region_id); + + let logical_regions = self.physical_regions.get(&physical_region_id).context( + PhysicalRegionNotFoundSnafu { + region_id: physical_region_id, + }, + )?; + + LOGICAL_REGION_COUNT.sub(logical_regions.len() as i64); + + for logical_region in logical_regions { + self.logical_regions.remove(logical_region); + } + self.physical_regions.remove(&physical_region_id); + self.physical_columns.remove(&physical_region_id); + Ok(()) + } } diff --git a/src/metric-engine/src/error.rs b/src/metric-engine/src/error.rs index a1e7a3c6f9d1..9022dea7e26e 100644 --- a/src/metric-engine/src/error.rs +++ b/src/metric-engine/src/error.rs @@ -35,6 +35,20 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to open mito region, region type: {}", region_type))] + OpenMitoRegion { + region_type: String, + source: BoxedError, + location: Location, + }, + + #[snafu(display("Failed to close mito region, region id: {}", region_id))] + CloseMitoRegion { + region_id: RegionId, + source: BoxedError, + location: Location, + }, + #[snafu(display("Region `{}` already exists", region_id))] RegionAlreadyExists { region_id: RegionId, @@ -146,6 +160,8 @@ impl ErrorExt for Error { ColumnNotFound { .. } => StatusCode::TableColumnNotFound, CreateMitoRegion { source, .. } + | OpenMitoRegion { source, .. } + | CloseMitoRegion { source, .. } | MitoReadOperation { source, .. } | MitoWriteOperation { source, .. } => source.status_code(), diff --git a/src/metric-engine/src/metadata_region.rs b/src/metric-engine/src/metadata_region.rs index c6e48b718f87..2add36a5f56e 100644 --- a/src/metric-engine/src/metadata_region.rs +++ b/src/metric-engine/src/metadata_region.rs @@ -38,6 +38,9 @@ use crate::error::{ }; use crate::utils; +const REGION_PREFIX: &str = "__region_"; +const COLUMN_PREFIX: &str = "__column_"; + /// The other two fields key and value will be used as a k-v storage. /// It contains two group of key: /// - `__region_` is used for marking table existence. It doesn't have value. @@ -157,34 +160,55 @@ impl MetadataRegion { Ok(columns) } + + pub async fn logical_regions(&self, physical_region_id: RegionId) -> Result> { + let metadata_region_id = utils::to_metadata_region_id(physical_region_id); + + let mut regions = vec![]; + for (k, _) in self.get_all(metadata_region_id).await? { + if !k.starts_with(REGION_PREFIX) { + continue; + } + // Safety: we have checked the prefix + let region_id = Self::parse_region_key(&k).unwrap(); + let region_id = region_id.parse::().unwrap().into(); + regions.push(region_id); + } + + Ok(regions) + } } // utils to concat and parse key/value impl MetadataRegion { pub fn concat_region_key(region_id: RegionId) -> String { - format!("__region_{}", region_id.as_u64()) + format!("{REGION_PREFIX}{}", region_id.as_u64()) } /// Column name will be encoded by base64([STANDARD_NO_PAD]) pub fn concat_column_key(region_id: RegionId, column_name: &str) -> String { let encoded_column_name = STANDARD_NO_PAD.encode(column_name); - format!("__column_{}_{}", region_id.as_u64(), encoded_column_name) + format!( + "{COLUMN_PREFIX}{}_{}", + region_id.as_u64(), + encoded_column_name + ) } /// Concat a column key prefix without column name pub fn concat_column_key_prefix(region_id: RegionId) -> String { - format!("__column_{}_", region_id.as_u64()) + format!("{COLUMN_PREFIX}{}_", region_id.as_u64()) } #[allow(dead_code)] pub fn parse_region_key(key: &str) -> Option<&str> { - key.strip_prefix("__region_") + key.strip_prefix(REGION_PREFIX) } /// Parse column key to (logical_region_id, column_name) #[allow(dead_code)] pub fn parse_column_key(key: &str) -> Result> { - if let Some(stripped) = key.strip_prefix("__column_") { + if let Some(stripped) = key.strip_prefix(COLUMN_PREFIX) { let mut iter = stripped.split('_'); let region_id_raw = iter.next().unwrap(); diff --git a/src/metric-engine/src/test_util.rs b/src/metric-engine/src/test_util.rs index 4d325753f795..0de0f4beeac0 100644 --- a/src/metric-engine/src/test_util.rs +++ b/src/metric-engine/src/test_util.rs @@ -109,7 +109,7 @@ impl TestEnv { options: [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())] .into_iter() .collect(), - region_dir: "test_metric_region".to_string(), + region_dir: self.default_region_dir(), }; // create physical region @@ -148,6 +148,11 @@ impl TestEnv { pub fn default_logical_region_id(&self) -> RegionId { RegionId::new(3, 2) } + + /// Default region dir `test_metric_region` + pub fn default_region_dir(&self) -> String { + "test_metric_region".to_string() + } } /// Generate a [RegionAlterRequest] for adding tag columns. diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index daa2ae3753a5..cffd6cf61d18 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -186,11 +186,11 @@ pub struct RegionCreateRequest { pub region_dir: String, } -#[derive(Debug)] +#[derive(Debug, Clone, Default)] pub struct RegionDropRequest {} /// Open region request. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct RegionOpenRequest { /// Region engine name pub engine: String,