diff --git a/src/metric-engine/src/data_region.rs b/src/metric-engine/src/data_region.rs index dce8a1151cc7..362d9205ab0a 100644 --- a/src/metric-engine/src/data_region.rs +++ b/src/metric-engine/src/data_region.rs @@ -134,18 +134,17 @@ mod test { #[tokio::test] async fn test_add_columns() { - common_telemetry::init_default_ut_logging(); - let env = TestEnv::new().await; env.init_metric_region().await; let current_version = env .mito() - .get_metadata(utils::to_data_region_id(env.default_region_id())) + .get_metadata(utils::to_data_region_id(env.default_physical_region_id())) .await .unwrap() .schema_version; - assert_eq!(current_version, 0); + // TestEnv will create a logical region which changes the version to 1. + assert_eq!(current_version, 1); let new_columns = vec![ ColumnMetadata { @@ -168,13 +167,13 @@ mod test { }, ]; env.data_region() - .add_columns(env.default_region_id(), new_columns) + .add_columns(env.default_physical_region_id(), new_columns) .await .unwrap(); let new_metadata = env .mito() - .get_metadata(utils::to_data_region_id(env.default_region_id())) + .get_metadata(utils::to_data_region_id(env.default_physical_region_id())) .await .unwrap(); let column_names = new_metadata @@ -182,15 +181,21 @@ mod test { .iter() .map(|c| &c.column_schema.name) .collect::>(); - let expected = vec!["greptime_timestamp", "__metric", "__tsid", "tag2", "tag3"]; + let expected = vec![ + "greptime_timestamp", + "greptime_value", + "__metric", + "__tsid", + "job", + "tag2", + "tag3", + ]; assert_eq!(column_names, expected); } // Only string is allowed for tag column #[tokio::test] async fn test_add_invalid_column() { - common_telemetry::init_default_ut_logging(); - let env = TestEnv::new().await; env.init_metric_region().await; @@ -201,7 +206,7 @@ mod test { }]; let result = env .data_region() - .add_columns(env.default_region_id(), new_columns) + .add_columns(env.default_physical_region_id(), new_columns) .await; assert!(result.is_err()); } diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index fd080d6ed681..c2d84a11814a 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -34,18 +34,20 @@ use store_api::region_request::{ AlterKind, RegionAlterRequest, RegionCreateRequest, RegionRequest, }; use store_api::storage::consts::ReservedColumnId; -use store_api::storage::{RegionGroup, RegionId, ScanRequest, TableId}; +use store_api::storage::{RegionGroup, RegionId, ScanRequest}; use tokio::sync::RwLock; use crate::data_region::DataRegion; use crate::error::{ - ConflictRegionOptionSnafu, CreateMitoRegionSnafu, InternalColumnOccupiedSnafu, - LogicalTableNotFoundSnafu, MissingRegionOptionSnafu, PhysicalRegionNotFoundSnafu, - PhysicalTableNotFoundSnafu, Result, + ConflictRegionOptionSnafu, CreateMitoRegionSnafu, ForbiddenPhysicalAlterSnafu, + InternalColumnOccupiedSnafu, LogicalRegionNotFoundSnafu, MissingRegionOptionSnafu, + ParseRegionIdSnafu, PhysicalRegionNotFoundSnafu, Result, }; use crate::metadata_region::MetadataRegion; -use crate::metrics::{LOGICAL_REGION_COUNT, PHYSICAL_COLUMN_COUNT, PHYSICAL_REGION_COUNT}; -use crate::utils; +use crate::metrics::{ + FORBIDDEN_OPERATION_COUNT, LOGICAL_REGION_COUNT, PHYSICAL_COLUMN_COUNT, PHYSICAL_REGION_COUNT, +}; +use crate::utils::{self, to_data_region_id}; /// region group value for data region inside a metric region pub const METRIC_DATA_REGION_GROUP: RegionGroup = 0; @@ -95,8 +97,10 @@ pub const PHYSICAL_TABLE_METADATA_KEY: &str = "physical_metric_table"; /// on_physical_table = 
"physical_table", /// ); /// ``` +/// And this key will be translated to corresponding physical **REGION** id in metasrv. pub const LOGICAL_TABLE_METADATA_KEY: &str = "on_physical_table"; +#[derive(Clone)] pub struct MetricEngine { inner: Arc, } @@ -127,7 +131,11 @@ impl RegionEngine for MetricEngine { RegionRequest::Drop(_) => todo!(), RegionRequest::Open(_) => todo!(), RegionRequest::Close(_) => todo!(), - RegionRequest::Alter(_) => todo!(), + RegionRequest::Alter(alter) => self + .inner + .alter_region(region_id, alter) + .await + .map(|_| Output::AffectedRows(0)), RegionRequest::Flush(_) => todo!(), RegionRequest::Compact(_) => todo!(), RegionRequest::Truncate(_) => todo!(), @@ -185,23 +193,75 @@ impl MetricEngine { mito, metadata_region, data_region, - physical_tables: RwLock::default(), - physical_columns: RwLock::default(), + state: RwLock::default(), }), } } } +/// Internal states of metric engine +#[derive(Default)] +struct MetricEngineState { + /// Mapping from physical region id to its logical region ids + /// `logical_regions` records a reverse mapping from logical region id to + /// physical region id + physical_regions: HashMap>, + /// Mapping from logical region id to physical region id. + logical_regions: HashMap, + /// Cache for the columns of physical regions. + /// The region id in key is the data region id. + physical_columns: HashMap>, +} + +impl MetricEngineState { + pub fn add_physical_region( + &mut self, + physical_region_id: RegionId, + physical_columns: HashSet, + ) { + let physical_region_id = to_data_region_id(physical_region_id); + self.physical_regions + .insert(physical_region_id, HashSet::new()); + self.physical_columns + .insert(physical_region_id, physical_columns); + } + + /// # Panic + /// if the physical region does not exist + pub fn add_physical_columns( + &mut self, + physical_region_id: RegionId, + physical_columns: impl IntoIterator, + ) { + let physical_region_id = to_data_region_id(physical_region_id); + let columns = self.physical_columns.get_mut(&physical_region_id).unwrap(); + for col in physical_columns { + columns.insert(col); + } + } + + /// # Panic + /// if the physical region does not exist + pub fn add_logical_region( + &mut self, + physical_region_id: RegionId, + logical_region_id: RegionId, + ) { + let physical_region_id = to_data_region_id(physical_region_id); + self.physical_regions + .get_mut(&physical_region_id) + .unwrap() + .insert(logical_region_id); + self.logical_regions + .insert(logical_region_id, physical_region_id); + } +} + struct MetricEngineInner { mito: MitoEngine, metadata_region: MetadataRegion, data_region: DataRegion, - // TODO(ruihang): handle different catalog/schema - /// Map from physical table name to table id. - physical_tables: RwLock>, - /// Cache for the columns of physical regions. - /// The region id in key is the data region id. - physical_columns: RwLock>>, + state: RwLock, } impl MetricEngineInner { @@ -230,14 +290,6 @@ impl MetricEngineInner { ) -> Result<()> { let (data_region_id, metadata_region_id) = Self::transform_region_id(region_id); - // TODO: workaround for now, should find another way to retrieve the - // table name. - let physical_table_name = request - .options - .get(PHYSICAL_TABLE_METADATA_KEY) - .ok_or(MissingRegionOptionSnafu {}.build())? 
- .to_string(); - // create metadata region let create_metadata_region_request = self.create_request_for_metadata_region(&request.region_dir); @@ -253,6 +305,11 @@ impl MetricEngineInner { // create data region let create_data_region_request = self.create_request_for_data_region(&request); + let physical_column_set = create_data_region_request + .column_metadatas + .iter() + .map(|metadata| metadata.column_schema.name.clone()) + .collect::<HashSet<_>>(); self.mito .handle_request( data_region_id, @@ -263,14 +320,14 @@ impl MetricEngineInner { region_type: DATA_REGION_SUBDIR, })?; - info!("Created physical metric region {region_id:?} with table name {physical_table_name}"); + info!("Created physical metric region {region_id:?}"); PHYSICAL_REGION_COUNT.inc(); // remember this table - self.physical_tables + self.state .write() .await - .insert(physical_table_name, region_id.table_id()); + .add_physical_region(data_region_id, physical_column_set); Ok(()) } @@ -287,59 +344,71 @@ impl MetricEngineInner { /// If the logical region to create already exists, this method will do nothing. async fn create_logical_region( &self, - region_id: RegionId, + logical_region_id: RegionId, request: RegionCreateRequest, ) -> Result<()> { // transform IDs - let physical_table_name = request + let physical_region_id_raw = request .options .get(LOGICAL_TABLE_METADATA_KEY) .ok_or(MissingRegionOptionSnafu {}.build())?; - let physical_table_id = *self - .physical_tables - .read() - .await - .get(physical_table_name) - .with_context(|| PhysicalTableNotFoundSnafu { - physical_table: physical_table_name, - })?; - let logical_table_id = region_id.table_id(); - let physical_region_id = RegionId::new(physical_table_id, region_id.region_number()); + let physical_region_id: RegionId = physical_region_id_raw + .parse::<u64>() + .with_context(|_| ParseRegionIdSnafu { + raw: physical_region_id_raw, + })? + .into(); let (data_region_id, metadata_region_id) = Self::transform_region_id(physical_region_id); - // check if the logical table already exist + // check if the logical region already exists if self .metadata_region - .is_table_exist(metadata_region_id, logical_table_id) + .is_logical_region_exists(metadata_region_id, logical_region_id) .await? { - info!("Create a existing logical region {region_id}. Skipped"); + info!("Create an existing logical region {logical_region_id}. 
Skipped"); return Ok(()); } // find new columns to add - let physical_columns = self.physical_columns.read().await; - let physical_columns = - physical_columns - .get(&data_region_id) - .with_context(|| PhysicalRegionNotFoundSnafu { - region_id: data_region_id, - })?; let mut new_columns = vec![]; - for col in &request.column_metadatas { - if !physical_columns.contains(&col.column_schema.name) { - new_columns.push(col.clone()); + { + let physical_columns = &self.state.read().await.physical_columns; + let physical_columns = physical_columns.get(&data_region_id).with_context(|| { + PhysicalRegionNotFoundSnafu { + region_id: data_region_id, + } + })?; + for col in &request.column_metadatas { + if !physical_columns.contains(&col.column_schema.name) { + new_columns.push(col.clone()); + } } } + info!("Found new columns {new_columns:?} to add to physical region {data_region_id}"); self.add_columns_to_physical_data_region( data_region_id, metadata_region_id, - logical_table_id, + logical_region_id, new_columns, ) .await?; + // register logical region to metadata region + self.metadata_region + .add_logical_region(metadata_region_id, logical_region_id) + .await?; + + // update the mapping + // Safety: previous steps ensure the physical region exist + self.state + .write() + .await + .add_logical_region(physical_region_id, logical_region_id); + info!("Created new logical region {logical_region_id} on physical region {data_region_id}"); + LOGICAL_REGION_COUNT.inc(); + Ok(()) } @@ -347,7 +416,7 @@ impl MetricEngineInner { &self, data_region_id: RegionId, metadata_region_id: RegionId, - logical_table_id: TableId, + logical_region_id: RegionId, new_columns: Vec, ) -> Result<()> { // alter data region @@ -360,29 +429,23 @@ impl MetricEngineInner { self.metadata_region .add_column( metadata_region_id, - logical_table_id, + logical_region_id, &col.column_schema.name, col.semantic_type, ) .await?; } - let mut physical_columns = self.physical_columns.write().await; // safety: previous step has checked this - let mut column_set = physical_columns.get_mut(&data_region_id).unwrap(); - for col in &new_columns { - column_set.insert(col.column_schema.name.clone()); - } - info!("Create table {logical_table_id} leads to adding columns {new_columns:?} to physical region {data_region_id}"); + self.state.write().await.add_physical_columns( + data_region_id, + new_columns + .iter() + .map(|meta| meta.column_schema.name.clone()), + ); + info!("Create region {logical_region_id} leads to adding columns {new_columns:?} to physical region {data_region_id}"); PHYSICAL_COLUMN_COUNT.add(new_columns.len() as _); - // register table to metadata region - self.metadata_region - .add_table(metadata_region_id, logical_table_id) - .await?; - info!("Created new logical table {logical_table_id} on physical region {data_region_id}"); - LOGICAL_REGION_COUNT.inc(); - Ok(()) } @@ -546,38 +609,51 @@ impl MetricEngineInner { } impl MetricEngineInner { - pub async fn alter_logic_region( + /// Dispatch region alter request + pub async fn alter_region( + &self, + region_id: RegionId, + request: RegionAlterRequest, + ) -> Result<()> { + let is_altering_logical_region = self + .state + .read() + .await + .physical_regions + .contains_key(®ion_id); + if is_altering_logical_region { + self.alter_physical_region(region_id, request).await + } else { + self.alter_logical_region(region_id, request).await + } + } + + async fn alter_logical_region( &self, region_id: RegionId, request: RegionAlterRequest, ) -> Result<()> { + let physical_region_id = { + 
let logical_regions = &self.state.read().await.logical_regions; + *logical_regions.get(®ion_id).with_context(|| { + error!("Trying to alter an nonexistent region {region_id}"); + LogicalRegionNotFoundSnafu { region_id } + })? + }; + // only handle adding column let AlterKind::AddColumns { columns } = request.kind else { return Ok(()); }; - let logical_table_id = region_id.table_id(); - - // check if the table exists - let metadata_region_id = utils::to_metadata_region_id(region_id); - if !self - .metadata_region - .is_table_exist(metadata_region_id, logical_table_id) - .await? - { - error!("Trying to alter an nonexistent table {logical_table_id}"); - return LogicalTableNotFoundSnafu { - table_id: logical_table_id, - } - .fail(); - } + let metadata_region_id = utils::to_metadata_region_id(physical_region_id); let mut columns_to_add = vec![]; for col in columns { if self .metadata_region .column_semantic_type( metadata_region_id, - logical_table_id, + region_id, &col.column_metadata.column_schema.name, ) .await? @@ -587,23 +663,36 @@ impl MetricEngineInner { } } - let data_region_id = utils::to_data_region_id(region_id); + let data_region_id = utils::to_data_region_id(physical_region_id); self.add_columns_to_physical_data_region( data_region_id, metadata_region_id, - logical_table_id, + region_id, columns_to_add, ) .await?; Ok(()) } + + async fn alter_physical_region( + &self, + region_id: RegionId, + request: RegionAlterRequest, + ) -> Result<()> { + info!("Metric region received alter request {request:?} on physical region {region_id:?}"); + FORBIDDEN_OPERATION_COUNT.inc(); + + ForbiddenPhysicalAlterSnafu.fail() + } } #[cfg(test)] mod tests { use std::hash::Hash; + use store_api::region_request::AddColumn; + use super::*; use crate::test_util::TestEnv; @@ -749,4 +838,63 @@ mod tests { vec![ReservedColumnId::metric_name(), ReservedColumnId::tsid()] ); } + + #[tokio::test] + async fn test_alter_region() { + let env = TestEnv::new().await; + env.init_metric_region().await; + let engine = env.metric(); + let engine_inner = engine.inner; + + // alter physical region + let physical_region_id = env.default_physical_region_id(); + let request = RegionAlterRequest { + schema_version: 0, + kind: AlterKind::AddColumns { + columns: vec![AddColumn { + column_metadata: ColumnMetadata { + column_id: 0, + semantic_type: SemanticType::Tag, + column_schema: ColumnSchema::new( + "tag1", + ConcreteDataType::string_datatype(), + false, + ), + }, + location: None, + }], + }, + }; + + let result = engine_inner + .alter_physical_region(physical_region_id, request.clone()) + .await; + assert!(result.is_err()); + assert_eq!( + result.unwrap_err().to_string(), + "Alter request to physical region is forbidden".to_string() + ); + + // alter logical region + let metadata_region = env.metadata_region(); + let logical_region_id = env.default_logical_region_id(); + let is_column_exist = metadata_region + .column_semantic_type(physical_region_id, logical_region_id, "tag1") + .await + .unwrap() + .is_some(); + assert!(!is_column_exist); + + let region_id = env.default_logical_region_id(); + engine_inner + .alter_logical_region(region_id, request) + .await + .unwrap(); + let semantic_type = metadata_region + .column_semantic_type(physical_region_id, logical_region_id, "tag1") + .await + .unwrap() + .unwrap(); + assert_eq!(semantic_type, SemanticType::Tag); + } } diff --git a/src/metric-engine/src/error.rs b/src/metric-engine/src/error.rs index 3a6c0ff29f74..3038ebfb1de5 100644 --- a/src/metric-engine/src/error.rs +++ 
b/src/metric-engine/src/error.rs @@ -19,7 +19,7 @@ use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use datatypes::prelude::ConcreteDataType; use snafu::{Location, Snafu}; -use store_api::storage::{RegionId, TableId}; +use store_api::storage::RegionId; #[derive(Snafu)] #[snafu(visibility(pub))] @@ -35,9 +35,9 @@ pub enum Error { location: Location, }, - #[snafu(display("Table `{}` already exists", table_id))] - TableAlreadyExists { - table_id: TableId, + #[snafu(display("Region `{}` already exists", region_id))] + RegionAlreadyExists { + region_id: RegionId, location: Location, }, @@ -56,11 +56,11 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to parse table id from {}", raw))] - ParseTableId { + #[snafu(display("Failed to parse region id from {}", raw))] + ParseRegionId { raw: String, #[snafu(source)] - error: <TableId as std::str::FromStr>::Err, + error: <u64 as std::str::FromStr>::Err, location: Location, }, @@ -91,22 +91,15 @@ pub enum Error { #[snafu(display("Region options are conflicted"))] ConflictRegionOption { location: Location }, - // TODO: remove this - #[snafu(display("Physical table {} not found", physical_table))] - PhysicalTableNotFound { - physical_table: String, - location: Location, - }, - #[snafu(display("Physical region {} not found", region_id))] PhysicalRegionNotFound { region_id: RegionId, location: Location, }, - #[snafu(display("Logical table {} not found", table_id))] - LogicalTableNotFound { - table_id: TableId, + #[snafu(display("Logical region {} not found", region_id))] + LogicalRegionNotFound { + region_id: RegionId, location: Location, }, @@ -115,6 +108,9 @@ pub enum Error { column_type: ConcreteDataType, location: Location, }, + + #[snafu(display("Alter request to physical region is forbidden"))] + ForbiddenPhysicalAlter { location: Location }, } pub type Result<T> = std::result::Result<T, Error>; @@ -129,14 +125,16 @@ impl ErrorExt for Error { | ConflictRegionOption { .. } | ColumnTypeMismatch { .. } => StatusCode::InvalidArguments, + ForbiddenPhysicalAlter { .. } => StatusCode::Unsupported, + MissingInternalColumn { .. } | DeserializeSemanticType { .. } | DecodeColumnValue { .. } - | ParseTableId { .. } => StatusCode::Unexpected, - - PhysicalTableNotFound { .. } | LogicalTableNotFound { .. } => StatusCode::TableNotFound, + | ParseRegionId { .. } => StatusCode::Unexpected, - PhysicalRegionNotFound { .. } => StatusCode::RegionNotFound, + PhysicalRegionNotFound { .. } | LogicalRegionNotFound { .. } => { + StatusCode::RegionNotFound + } CreateMitoRegion { source, .. } | MitoReadOperation { source, .. } CollectRecordBatchStream { source, .. } => source.status_code(), - TableAlreadyExists { .. } => StatusCode::TableAlreadyExists, + RegionAlreadyExists { .. } => StatusCode::RegionAlreadyExists, } } diff --git a/src/metric-engine/src/metadata_region.rs b/src/metric-engine/src/metadata_region.rs index 2f3fa5061a0b..68f7d4248fe7 100644 --- a/src/metric-engine/src/metadata_region.rs +++ b/src/metric-engine/src/metadata_region.rs @@ -31,15 +31,15 @@ use crate::engine::{ }; use crate::error::{ CollectRecordBatchStreamSnafu, DecodeColumnValueSnafu, DeserializeSemanticTypeSnafu, - MitoReadOperationSnafu, MitoWriteOperationSnafu, ParseTableIdSnafu, Result, - TableAlreadyExistsSnafu, + MitoReadOperationSnafu, MitoWriteOperationSnafu, ParseRegionIdSnafu, RegionAlreadyExistsSnafu, + Result, }; use crate::utils; /// The other two fields key and value will be used as a k-v storage. 
-/// It contains two group of key (TABLE_ID refers to the logical table's id): -/// - `__table_<TABLE_ID>` is used for marking table existence. It doesn't have value. -/// - `__column_<TABLE_ID>_<COLUMN_NAME>` is used for marking column existence, +/// It contains two groups of keys: +/// - `__region_<LOGICAL_REGION_ID>` is used for marking logical region existence. It doesn't have value. +/// - `__column_<LOGICAL_REGION_ID>_<COLUMN_NAME>` is used for marking column existence, /// the value is column's semantic type. To avoid the key conflict, this column key /// will be encoded by base64([STANDARD_NO_PAD]). /// @@ -48,10 +48,6 @@ use crate::utils; /// every operation should be associated to a [RegionId], which is the physical /// table id + region sequence. This handler will transform the region group by /// itself. -/// -/// Notice that all the `region_id` in the public interfaces refers to the -/// physical region id of metadata region. While the `table_id` refers to -/// the logical table id. pub struct MetadataRegion { mito: MitoEngine, } @@ -65,16 +61,23 @@ impl MetadataRegion { /// /// This method will check if the table key already exists, if so, it will return /// a [TableAlreadyExistsSnafu] error. - pub async fn add_table(&self, region_id: RegionId, table_id: TableId) -> Result<()> { - let region_id = utils::to_metadata_region_id(region_id); - let table_key = Self::concat_table_key(table_id); + pub async fn add_logical_region( + &self, + physical_region_id: RegionId, + logical_region_id: RegionId, + ) -> Result<()> { + let region_id = utils::to_metadata_region_id(physical_region_id); + let region_key = Self::concat_region_key(logical_region_id); let put_success = self - .put_conditionally(region_id, table_key, String::new()) + .put_if_absent(region_id, region_key, String::new()) .await?; if !put_success { - TableAlreadyExistsSnafu { table_id }.fail() + RegionAlreadyExistsSnafu { + region_id: logical_region_id, + } + .fail() } else { Ok(()) } @@ -86,15 +89,15 @@ impl MetadataRegion { /// will return if the column is successfully added. pub async fn add_column( &self, - region_id: RegionId, - table_id: TableId, + physical_region_id: RegionId, + logical_region_id: RegionId, column_name: &str, semantic_type: SemanticType, ) -> Result<bool> { - let region_id = utils::to_metadata_region_id(region_id); - let column_key = Self::concat_column_key(table_id, column_name); + let region_id = utils::to_metadata_region_id(physical_region_id); + let column_key = Self::concat_column_key(logical_region_id, column_name); - self.put_conditionally( + self.put_if_absent( region_id, column_key, Self::serialize_semantic_type(semantic_type), @@ -102,22 +105,26 @@ impl MetadataRegion { .await } - /// Check if the given table exists. - pub async fn is_table_exist(&self, region_id: RegionId, table_id: TableId) -> Result<bool> { - let region_id = utils::to_metadata_region_id(region_id); - let table_key = Self::concat_table_key(table_id); - self.exist(region_id, &table_key).await + /// Check if the given logical region exists. + pub async fn is_logical_region_exists( + &self, + physical_region_id: RegionId, + logical_region_id: RegionId, + ) -> Result<bool> { + let region_id = utils::to_metadata_region_id(physical_region_id); + let region_key = Self::concat_region_key(logical_region_id); + self.exists(region_id, &region_key).await } /// Check if the given column exists. Return the semantic type if exists. 
pub async fn column_semantic_type( &self, - region_id: RegionId, - table_id: TableId, + physical_region_id: RegionId, + logical_region_id: RegionId, column_name: &str, ) -> Result<Option<SemanticType>> { - let region_id = utils::to_metadata_region_id(region_id); - let column_key = Self::concat_column_key(table_id, column_name); + let region_id = utils::to_metadata_region_id(physical_region_id); + let column_key = Self::concat_column_key(logical_region_id, column_name); let semantic_type = self.get(region_id, &column_key).await?; semantic_type .map(|s| Self::deserialize_semantic_type(&s)) @@ -127,36 +134,37 @@ impl MetadataRegion { // utils to concat and parse key/value impl MetadataRegion { - pub fn concat_table_key(table_id: TableId) -> String { - format!("__table_{}", table_id) + pub fn concat_region_key(region_id: RegionId) -> String { + format!("__region_{}", region_id.as_u64()) } /// Column name will be encoded by base64([STANDARD_NO_PAD]) - pub fn concat_column_key(table_id: TableId, column_name: &str) -> String { + pub fn concat_column_key(region_id: RegionId, column_name: &str) -> String { let encoded_column_name = STANDARD_NO_PAD.encode(column_name); - format!("__column_{}_{}", table_id, encoded_column_name) + format!("__column_{}_{}", region_id.as_u64(), encoded_column_name) } - pub fn parse_table_key(key: &str) -> Option<&str> { - key.strip_prefix("__table_") + pub fn parse_region_key(key: &str) -> Option<&str> { + key.strip_prefix("__region_") } - /// Parse column key to (table_name, column_name) - pub fn parse_column_key(key: &str) -> Result<Option<(TableId, String)>> { + /// Parse column key to (logical_region_id, column_name) + pub fn parse_column_key(key: &str) -> Result<Option<(RegionId, String)>> { if let Some(stripped) = key.strip_prefix("__column_") { let mut iter = stripped.split('_'); - let table_id_raw = iter.next().unwrap(); - let table_id = table_id_raw - .parse() - .with_context(|_| ParseTableIdSnafu { raw: table_id_raw })?; + let region_id_raw = iter.next().unwrap(); + let region_id = region_id_raw + .parse::<u64>() + .with_context(|_| ParseRegionIdSnafu { raw: region_id_raw })? + .into(); let encoded_column_name = iter.next().unwrap(); let column_name = STANDARD_NO_PAD .decode(encoded_column_name) .context(DecodeColumnValueSnafu)?; - Ok(Some((table_id, String::from_utf8(column_name).unwrap()))) + Ok(Some((region_id, String::from_utf8(column_name).unwrap()))) } else { Ok(None) } @@ -179,13 +187,13 @@ impl MetadataRegion { impl MetadataRegion { /// Put if not exist, return if this put operation is successful (error other /// than "key already exist" will be wrapped in [Err]). - pub async fn put_conditionally( + pub async fn put_if_absent( &self, region_id: RegionId, key: String, value: String, ) -> Result<bool> { - if self.exist(region_id, &key).await? { + if self.exists(region_id, &key).await? { return Ok(false); } @@ -203,7 +211,7 @@ impl MetadataRegion { /// Check if the given key exists. /// /// Notice that due to mito doesn't support transaction, TOCTTOU is possible. 
- pub async fn exist(&self, region_id: RegionId, key: &str) -> Result { + pub async fn exists(&self, region_id: RegionId, key: &str) -> Result { let scan_req = Self::build_read_request(key); let record_batch_stream = self .mito @@ -310,40 +318,40 @@ mod test { #[test] fn test_concat_table_key() { - let table_id = 12934; - let expected = "__table_12934".to_string(); - assert_eq!(MetadataRegion::concat_table_key(table_id), expected); + let region_id = RegionId::new(1234, 7844); + let expected = "__region_5299989651108".to_string(); + assert_eq!(MetadataRegion::concat_region_key(region_id), expected); } #[test] fn test_concat_column_key() { - let table_id = 91959; + let region_id = RegionId::new(8489, 9184); let column_name = "my_column"; - let expected = "__column_91959_bXlfY29sdW1u".to_string(); + let expected = "__column_36459977384928_bXlfY29sdW1u".to_string(); assert_eq!( - MetadataRegion::concat_column_key(table_id, column_name), + MetadataRegion::concat_column_key(region_id, column_name), expected ); } #[test] fn test_parse_table_key() { - let table_id = 93585; - let encoded = MetadataRegion::concat_column_key(table_id, "my_column"); - assert_eq!(encoded, "__column_93585_bXlfY29sdW1u"); + let region_id = RegionId::new(87474, 10607); + let encoded = MetadataRegion::concat_column_key(region_id, "my_column"); + assert_eq!(encoded, "__column_375697969260911_bXlfY29sdW1u"); let decoded = MetadataRegion::parse_column_key(&encoded).unwrap(); - assert_eq!(decoded, Some((table_id, "my_column".to_string()))); + assert_eq!(decoded, Some((region_id, "my_column".to_string()))); } #[test] fn test_parse_valid_column_key() { - let table_id = 73952; - let encoded = MetadataRegion::concat_column_key(table_id, "my_column"); - assert_eq!(encoded, "__column_73952_bXlfY29sdW1u"); + let region_id = RegionId::new(176, 910); + let encoded = MetadataRegion::concat_column_key(region_id, "my_column"); + assert_eq!(encoded, "__column_755914245006_bXlfY29sdW1u"); let decoded = MetadataRegion::parse_column_key(&encoded).unwrap(); - assert_eq!(decoded, Some((table_id, "my_column".to_string()))); + assert_eq!(decoded, Some((region_id, "my_column".to_string()))); } #[test] @@ -396,13 +404,13 @@ mod test { let env = TestEnv::new().await; env.init_metric_region().await; let metadata_region = env.metadata_region(); - let region_id = to_metadata_region_id(env.default_region_id()); + let region_id = to_metadata_region_id(env.default_physical_region_id()); // Test inserting a new key-value pair let key = "test_key".to_string(); let value = "test_value".to_string(); let result = metadata_region - .put_conditionally(region_id, key.clone(), value.clone()) + .put_if_absent(region_id, key.clone(), value.clone()) .await; assert!(result.is_ok()); assert!(result.unwrap()); @@ -419,7 +427,7 @@ mod test { // Test inserting the same key-value pair again let result = metadata_region - .put_conditionally(region_id, key.clone(), value.clone()) + .put_if_absent(region_id, key.clone(), value.clone()) .await; assert!(result.is_ok()); assert!(!result.unwrap(),); @@ -430,11 +438,11 @@ mod test { let env = TestEnv::new().await; env.init_metric_region().await; let metadata_region = env.metadata_region(); - let region_id = to_metadata_region_id(env.default_region_id()); + let region_id = to_metadata_region_id(env.default_physical_region_id()); // Test checking for a non-existent key let key = "test_key".to_string(); - let result = metadata_region.exist(region_id, &key).await; + let result = metadata_region.exists(region_id, &key).await; 
assert!(result.is_ok()); assert!(!result.unwrap()); @@ -446,7 +454,7 @@ mod test { .handle_request(region_id, RegionRequest::Put(put_request)) .await .unwrap(); - let result = metadata_region.exist(region_id, &key).await; + let result = metadata_region.exists(region_id, &key).await; assert!(result.is_ok()); assert!(result.unwrap(),); } @@ -456,7 +464,7 @@ mod test { let env = TestEnv::new().await; env.init_metric_region().await; let metadata_region = env.metadata_region(); - let region_id = to_metadata_region_id(env.default_region_id()); + let region_id = to_metadata_region_id(env.default_physical_region_id()); // Test getting a non-existent key let key = "test_key".to_string(); @@ -478,26 +486,26 @@ mod test { } #[tokio::test] - async fn test_add_table() { + async fn test_add_logical_region() { let env = TestEnv::new().await; env.init_metric_region().await; let metadata_region = env.metadata_region(); - let region_id = to_metadata_region_id(env.default_region_id()); + let physical_region_id = to_metadata_region_id(env.default_physical_region_id()); // add one table - let table_id = 77889; + let logical_region_id = RegionId::new(196, 2333); metadata_region - .add_table(region_id, table_id) + .add_logical_region(physical_region_id, logical_region_id) .await .unwrap(); assert!(metadata_region - .is_table_exist(region_id, table_id) + .is_logical_region_exists(physical_region_id, logical_region_id) .await .unwrap()); // add it again assert!(metadata_region - .add_table(region_id, table_id) + .add_logical_region(physical_region_id, logical_region_id) .await .is_err()); } @@ -507,29 +515,39 @@ mod test { let env = TestEnv::new().await; env.init_metric_region().await; let metadata_region = env.metadata_region(); - let region_id = to_metadata_region_id(env.default_region_id()); + let physical_region_id = to_metadata_region_id(env.default_physical_region_id()); - let table_id = 23638; + let logical_region_id = RegionId::new(868, 8390); let column_name = "column1"; let semantic_type = SemanticType::Tag; metadata_region - .add_column(region_id, table_id, column_name, semantic_type) + .add_column( + physical_region_id, + logical_region_id, + column_name, + semantic_type, + ) .await .unwrap(); let actual_semantic_type = metadata_region - .column_semantic_type(region_id, table_id, column_name) + .column_semantic_type(physical_region_id, logical_region_id, column_name) .await .unwrap(); assert_eq!(actual_semantic_type, Some(semantic_type)); // duplicate column won't be updated let is_updated = metadata_region - .add_column(region_id, table_id, column_name, SemanticType::Field) + .add_column( + physical_region_id, + logical_region_id, + column_name, + SemanticType::Field, + ) .await .unwrap(); assert!(!is_updated); let actual_semantic_type = metadata_region - .column_semantic_type(region_id, table_id, column_name) + .column_semantic_type(physical_region_id, logical_region_id, column_name) .await .unwrap(); assert_eq!(actual_semantic_type, Some(semantic_type)); diff --git a/src/metric-engine/src/metrics.rs b/src/metric-engine/src/metrics.rs index df65604c765b..d026e57d0ac7 100644 --- a/src/metric-engine/src/metrics.rs +++ b/src/metric-engine/src/metrics.rs @@ -20,17 +20,21 @@ use prometheus::*; lazy_static! 
{ /// Gauge for opened regions pub static ref PHYSICAL_REGION_COUNT: IntGauge = - register_int_gauge!("metric_physical_region_count", "metric engine physical region count").unwrap(); + register_int_gauge!("metric_engine_physical_region_count", "metric engine physical region count").unwrap(); /// Gauge of columns across all opened regions pub static ref PHYSICAL_COLUMN_COUNT: IntGauge = - register_int_gauge!("metric_physical_column_count", "metric engine physical column count").unwrap(); + register_int_gauge!("metric_engine_physical_column_count", "metric engine physical column count").unwrap(); /// Gauge for opened logical regions pub static ref LOGICAL_REGION_COUNT: IntGauge = - register_int_gauge!("metric_logical_region_count", "metric engine logical region count").unwrap(); + register_int_gauge!("metric_engine_logical_region_count", "metric engine logical region count").unwrap(); - /// Gauge for opened logical regions + /// Histogram for opened logical regions pub static ref MITO_DDL_DURATION: Histogram = register_histogram!("metric_engine_mito_ddl", "metric engine mito ddl").unwrap(); + + /// Counter for forbidden operations + pub static ref FORBIDDEN_OPERATION_COUNT: IntCounter = + register_int_counter!("metric_engine_forbidden_request", "metric forbidden request").unwrap(); } diff --git a/src/metric-engine/src/test_util.rs b/src/metric-engine/src/test_util.rs index daec23a259f7..40e68f532299 100644 --- a/src/metric-engine/src/test_util.rs +++ b/src/metric-engine/src/test_util.rs @@ -27,18 +27,22 @@ use store_api::region_request::{RegionCreateRequest, RegionRequest}; use store_api::storage::RegionId; use crate::data_region::DataRegion; -use crate::engine::{MetricEngine, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY}; +use crate::engine::{ + MetricEngine, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY, +}; use crate::metadata_region::MetadataRegion; /// Env to test metric engine. pub struct TestEnv { mito_env: MitoTestEnv, mito: MitoEngine, + metric: MetricEngine, } impl TestEnv { /// Returns a new env with empty prefix for test. pub async fn new() -> Self { + common_telemetry::init_default_ut_logging(); Self::with_prefix("").await } @@ -46,7 +50,12 @@ impl TestEnv { pub async fn with_prefix(prefix: &str) -> Self { let mut mito_env = MitoTestEnv::with_prefix(prefix); let mito = mito_env.create_engine(MitoConfig::default()).await; - Self { mito_env, mito } + let metric = MetricEngine::new(mito.clone()); + Self { + mito_env, + mito, + metric, + } } pub fn data_home(&self) -> String { @@ -60,24 +69,38 @@ impl TestEnv { } pub fn metric(&self) -> MetricEngine { - MetricEngine::new(self.mito()) + self.metric.clone() } - /// Create regions in [MetricEngine] under [`default_region_id`](TestEnv::default_region_id) + /// Create regions in [MetricEngine] under [`default_region_id`] /// and region dir `"test_metric_region"`. + /// + /// This method will create one logical region with three columns `(ts, val, job)` + /// under [`default_logical_region_id`]. 
pub async fn init_metric_region(&self) { - let region_id = self.default_region_id(); + let region_id = self.default_physical_region_id(); let region_create_request = RegionCreateRequest { engine: METRIC_ENGINE_NAME.to_string(), - column_metadatas: vec![ColumnMetadata { - column_id: 0, - semantic_type: SemanticType::Timestamp, - column_schema: ColumnSchema::new( - "greptime_timestamp", - ConcreteDataType::timestamp_millisecond_datatype(), - false, - ), - }], + column_metadatas: vec![ + ColumnMetadata { + column_id: 0, + semantic_type: SemanticType::Timestamp, + column_schema: ColumnSchema::new( + "greptime_timestamp", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + }, + ColumnMetadata { + column_id: 1, + semantic_type: SemanticType::Field, + column_schema: ColumnSchema::new( + "greptime_value", + ConcreteDataType::float64_datatype(), + false, + ), + }, + ], primary_key: vec![], options: [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())] .into_iter() @@ -85,7 +108,54 @@ impl TestEnv { region_dir: "test_metric_region".to_string(), }; - // create regions + // create physical region + self.metric() + .handle_request(region_id, RegionRequest::Create(region_create_request)) + .await + .unwrap(); + + // create logical region + let region_id = self.default_logical_region_id(); + let region_create_request = RegionCreateRequest { + engine: METRIC_ENGINE_NAME.to_string(), + column_metadatas: vec![ + ColumnMetadata { + column_id: 0, + semantic_type: SemanticType::Timestamp, + column_schema: ColumnSchema::new( + "greptime_timestamp", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + }, + ColumnMetadata { + column_id: 1, + semantic_type: SemanticType::Field, + column_schema: ColumnSchema::new( + "greptime_value", + ConcreteDataType::float64_datatype(), + false, + ), + }, + ColumnMetadata { + column_id: 2, + semantic_type: SemanticType::Tag, + column_schema: ColumnSchema::new( + "job", + ConcreteDataType::string_datatype(), + false, + ), + }, + ], + primary_key: vec![2], + options: [( + LOGICAL_TABLE_METADATA_KEY.to_string(), + self.default_physical_region_id().as_u64().to_string(), + )] + .into_iter() + .collect(), + region_dir: "test_metric_region_logical".to_string(), + }; self.metric() .handle_request(region_id, RegionRequest::Create(region_create_request)) .await @@ -100,10 +170,15 @@ impl TestEnv { DataRegion::new(self.mito()) } - /// `RegionId::new(1, 2)` - pub fn default_region_id(&self) -> RegionId { + /// Default physical region id `RegionId::new(1, 2)` + pub fn default_physical_region_id(&self) -> RegionId { RegionId::new(1, 2) } + + /// Default logical region id `RegionId::new(3, 2)` + pub fn default_logical_region_id(&self) -> RegionId { + RegionId::new(3, 2) + } } #[cfg(test)] @@ -115,11 +190,9 @@ mod test { #[tokio::test] async fn create_metadata_region() { - common_telemetry::init_default_ut_logging(); - let env = TestEnv::new().await; env.init_metric_region().await; - let region_id = to_metadata_region_id(env.default_region_id()); + let region_id = to_metadata_region_id(env.default_physical_region_id()); let region_dir = join_dir(&env.data_home(), "test_metric_region"); // `join_dir` doesn't suit windows path diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index de07dc11ea0e..e7633a2964a0 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -202,7 +202,7 @@ pub struct RegionOpenRequest { pub struct RegionCloseRequest {} /// Alter metadata of a region. 
-#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq, Clone)] pub struct RegionAlterRequest { /// The version of the schema before applying the alteration. pub schema_version: u64, @@ -255,7 +255,7 @@ impl TryFrom for RegionAlterRequest { } /// Kind of the alteration. -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq, Clone)] pub enum AlterKind { /// Add columns to the region. AddColumns { @@ -342,7 +342,7 @@ impl TryFrom for AlterKind { } /// Adds a column. -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq, Clone)] pub struct AddColumn { /// Metadata of the column to add. pub column_metadata: ColumnMetadata, @@ -408,7 +408,7 @@ impl TryFrom for AddColumn { } /// Location to add a column. -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq, Clone)] pub enum AddColumnLocation { /// Add the column to the first position of columns. First,
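For context on how the new `on_physical_table` contract is exercised end to end, below is a minimal sketch of creating a logical region on top of an already-created physical region, modeled on `TestEnv::init_metric_region` in this patch. The import paths, the helper name `create_logical_region_on`, and the exact column layout are illustrative assumptions; the part taken directly from the patch is that the option value is the physical **region** id rendered as a decimal string, which `create_logical_region` parses back into a `RegionId`.

```rust
// Sketch only: import paths assume the usual GreptimeDB crate layout.
use api::v1::SemanticType;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use metric_engine::engine::{MetricEngine, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
use store_api::metadata::ColumnMetadata;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{RegionCreateRequest, RegionRequest};
use store_api::storage::RegionId;

/// Create a logical region that piggybacks on an existing physical metric region.
async fn create_logical_region_on(
    engine: &MetricEngine,
    physical_region_id: RegionId,
    logical_region_id: RegionId,
) {
    let request = RegionCreateRequest {
        engine: METRIC_ENGINE_NAME.to_string(),
        column_metadatas: vec![
            ColumnMetadata {
                column_id: 0,
                semantic_type: SemanticType::Timestamp,
                column_schema: ColumnSchema::new(
                    "greptime_timestamp",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
            },
            ColumnMetadata {
                column_id: 1,
                semantic_type: SemanticType::Tag,
                column_schema: ColumnSchema::new("job", ConcreteDataType::string_datatype(), false),
            },
        ],
        primary_key: vec![1],
        // The key change in this patch: the option carries the physical REGION id
        // (as a u64 string), not a physical table name.
        options: [(
            LOGICAL_TABLE_METADATA_KEY.to_string(),
            physical_region_id.as_u64().to_string(),
        )]
        .into_iter()
        .collect(),
        region_dir: "test_metric_region_logical".to_string(),
    };

    // MetricEngineInner::create_logical_region parses the option back into a RegionId,
    // registers the logical region in the metadata region, and adds any new columns
    // to the physical data region.
    engine
        .handle_request(logical_region_id, RegionRequest::Create(request))
        .await
        .expect("create logical region on physical region");
}
```

Subsequent `RegionRequest::Alter` calls on `logical_region_id` are then routed by `alter_region` to `alter_logical_region`, while the same request aimed at the physical region id is rejected with `ForbiddenPhysicalAlter` and counted in `FORBIDDEN_OPERATION_COUNT`.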