From 37c8d25d1ac726b6d5483f39c3b202346d949263 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 21 Dec 2023 22:35:23 +0800 Subject: [PATCH 1/5] query one logical table Signed-off-by: Ruihang Xia --- src/metric-engine/src/engine.rs | 5 +- src/metric-engine/src/engine/read.rs | 76 +++++++++++++++++++++----- src/metric-engine/src/error.rs | 9 +++- src/store-api/src/metadata.rs | 80 ++++++++++++++++++++++++++++ 4 files changed, 156 insertions(+), 14 deletions(-) diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 6da215361f83..20b7fd9bda07 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -152,7 +152,10 @@ impl RegionEngine for MetricEngine { /// Retrieves region's metadata. async fn get_metadata(&self, region_id: RegionId) -> Result { - todo!() + self.inner + .load_region_metadata(region_id) + .await + .map_err(BoxedError::new) } /// Retrieves region's disk usage. diff --git a/src/metric-engine/src/engine/read.rs b/src/metric-engine/src/engine/read.rs index 6377e1cf1f1b..fb33ef69a5e4 100644 --- a/src/metric-engine/src/engine/read.rs +++ b/src/metric-engine/src/engine/read.rs @@ -12,18 +12,23 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use common_query::logical_plan::Expr; use common_recordbatch::SendableRecordBatchStream; use common_telemetry::{error, info, tracing}; use datafusion::logical_expr; use snafu::{OptionExt, ResultExt}; +use store_api::metadata::RegionMetadataRef; use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME; use store_api::region_engine::RegionEngine; use store_api::storage::consts::ReservedColumnId; use store_api::storage::{RegionId, ScanRequest}; use crate::engine::MetricEngineInner; -use crate::error::{LogicalRegionNotFoundSnafu, MitoReadOperationSnafu, Result}; +use crate::error::{ + InvalidMetadataSnafu, LogicalRegionNotFoundSnafu, MitoReadOperationSnafu, Result, +}; use crate::utils; impl MetricEngineInner { @@ -67,17 +72,7 @@ impl MetricEngineInner { logical_region_id: RegionId, request: ScanRequest, ) -> Result { - let physical_region_id = { - let state = &self.state.read().await; - state - .get_physical_region_id(logical_region_id) - .with_context(|| { - error!("Trying to read an nonexistent region {logical_region_id}"); - LogicalRegionNotFoundSnafu { - region_id: logical_region_id, - } - })? - }; + let physical_region_id = self.get_physical_region_id(logical_region_id).await?; let data_region_id = utils::to_data_region_id(physical_region_id); let request = self .transform_request(physical_region_id, logical_region_id, request) @@ -88,6 +83,38 @@ impl MetricEngineInner { .context(MitoReadOperationSnafu) } + pub async fn load_region_metadata(&self, region_id: RegionId) -> Result { + let is_reading_physical_region = self + .state + .read() + .await + .physical_regions() + .contains_key(®ion_id); + + if is_reading_physical_region { + self.mito + .get_metadata(region_id) + .await + .context(MitoReadOperationSnafu) + } else { + let physical_region_id = self.get_physical_region_id(region_id).await?; + self.logical_region_metadata(physical_region_id, region_id) + .await + } + } + + async fn get_physical_region_id(&self, logical_region_id: RegionId) -> Result { + let state = &self.state.read().await; + state + .get_physical_region_id(logical_region_id) + .with_context(|| { + error!("Trying to read an nonexistent region {logical_region_id}"); + LogicalRegionNotFoundSnafu { + region_id: logical_region_id, + } + }) + } + /// Transform the [ScanRequest] from logical region to physical data region. async fn transform_request( &self, @@ -172,8 +199,33 @@ impl MetricEngineInner { // Safety: logical columns is a strict subset of physical columns projection.push(physical_metadata.column_index_by_id(column_id).unwrap()); } + Ok(projection) } + + pub async fn logical_region_metadata( + &self, + physical_region_id: RegionId, + logical_region_id: RegionId, + ) -> Result { + let logical_columns = self + .load_logical_columns(physical_region_id, logical_region_id) + .await? + .into_iter() + .map(|col| col.column_id) + .collect::>(); + let physical_metadata = self + .mito + .get_metadata(physical_region_id) + .await + .context(MitoReadOperationSnafu)?; + + Ok(Arc::new( + physical_metadata + .project(&logical_columns) + .context(InvalidMetadataSnafu)?, + )) + } } #[cfg(test)] diff --git a/src/metric-engine/src/error.rs b/src/metric-engine/src/error.rs index 9022dea7e26e..9abeef8566a1 100644 --- a/src/metric-engine/src/error.rs +++ b/src/metric-engine/src/error.rs @@ -132,6 +132,12 @@ pub enum Error { #[snafu(display("Alter request to physical region is forbidden"))] ForbiddenPhysicalAlter { location: Location }, + + #[snafu(display("Invalid region metadata"))] + InvalidMetadata { + source: store_api::metadata::MetadataError, + location: Location, + }, } pub type Result = std::result::Result; @@ -151,7 +157,8 @@ impl ErrorExt for Error { MissingInternalColumn { .. } | DeserializeSemanticType { .. } | DecodeColumnValue { .. } - | ParseRegionId { .. } => StatusCode::Unexpected, + | ParseRegionId { .. } + | InvalidMetadata { .. } => StatusCode::Unexpected, PhysicalRegionNotFound { .. } | LogicalRegionNotFound { .. } => { StatusCode::RegionNotFound diff --git a/src/store-api/src/metadata.rs b/src/store-api/src/metadata.rs index 1ab1feaf2def..947be5ea588d 100644 --- a/src/store-api/src/metadata.rs +++ b/src/store-api/src/metadata.rs @@ -261,6 +261,75 @@ impl RegionMetadata { self.primary_key.iter().position(|id| *id == column_id) } + /// Project the metadata to a new one using specified column ids. + /// + /// [RegionId] and schema version are preserved. + pub fn project(&self, projection: &[ColumnId]) -> Result { + // check time index + ensure!( + projection.iter().any(|id| *id == self.time_index), + TimeIndexNotFoundSnafu + ); + + // prepare new indices + let mut indices_to_preserve = projection + .iter() + .map(|id| { + self.column_index_by_id(*id) + .with_context(|| InvalidRegionRequestSnafu { + region_id: self.region_id, + err: format!("column id {} not found", id), + }) + }) + .collect::>>()?; + indices_to_preserve.sort_unstable(); + indices_to_preserve.reverse(); + + // project schema + let projected_schema = + self.schema + .try_project(&indices_to_preserve) + .with_context(|_| SchemaProjectSnafu { + origin_schema: self.schema.clone(), + projection: projection.to_vec(), + })?; + + // project columns + let mut projected_column_metadatas = self.column_metadatas.clone(); + for index in indices_to_preserve { + projected_column_metadatas.remove(index); + } + + // generate projected primary key + let projected_primary_key = projected_column_metadatas + .iter() + .filter_map(|col| { + if col.semantic_type == SemanticType::Tag { + Some(col.column_id) + } else { + None + } + }) + .collect(); + + // generate new id_to_index + let projected_id_to_index = projected_column_metadatas + .iter() + .enumerate() + .map(|(idx, col)| (col.column_id, idx)) + .collect(); + + Ok(RegionMetadata { + schema: Arc::new(projected_schema), + time_index: self.time_index, + id_to_index: projected_id_to_index, + column_metadatas: projected_column_metadatas, + primary_key: projected_primary_key, + region_id: self.region_id, + schema_version: self.schema_version, + }) + } + /// Checks whether the metadata is valid. fn validate(&self) -> Result<()> { // Id to name. @@ -621,6 +690,17 @@ pub enum MetadataError { err: String, location: Location, }, + + #[snafu(display("Unexpected schema error during project"))] + SchemaProject { + origin_schema: SchemaRef, + projection: Vec, + location: Location, + source: datatypes::Error, + }, + + #[snafu(display("Time index column not found"))] + TimeIndexNotFound { location: Location }, } impl ErrorExt for MetadataError { From b8df79e142e4afaf15a8dcee15547fd9e5e42c0d Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Sun, 24 Dec 2023 23:38:16 +0800 Subject: [PATCH 2/5] map column id Signed-off-by: Ruihang Xia --- src/metric-engine/src/engine.rs | 4 +- src/metric-engine/src/engine/alter.rs | 2 +- src/metric-engine/src/engine/create.rs | 4 +- src/metric-engine/src/engine/open.rs | 16 ++-- src/metric-engine/src/engine/put.rs | 2 +- src/metric-engine/src/engine/read.rs | 77 +++++++++++++------ .../src/engine/region_metadata.rs | 46 +++++++---- src/metric-engine/src/error.rs | 9 +++ src/metric-engine/src/metadata_region.rs | 66 +++++++++------- src/metric-engine/src/test_util.rs | 18 +++-- src/store-api/src/metadata.rs | 43 +++++------ 11 files changed, 167 insertions(+), 120 deletions(-) diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 20b7fd9bda07..70038f528b60 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -264,7 +264,7 @@ mod test { .await .unwrap_err(); - // open nonexistent region + // open nonexistent region won't report error let invalid_open_request = RegionOpenRequest { engine: METRIC_ENGINE_NAME.to_string(), region_dir: env.default_region_dir(), @@ -277,6 +277,6 @@ mod test { RegionRequest::Open(invalid_open_request), ) .await - .unwrap_err(); + .unwrap(); } } diff --git a/src/metric-engine/src/engine/alter.rs b/src/metric-engine/src/engine/alter.rs index 94b89d6a7685..e2f2153d77ac 100644 --- a/src/metric-engine/src/engine/alter.rs +++ b/src/metric-engine/src/engine/alter.rs @@ -97,7 +97,7 @@ impl MetricEngineInner { metadata_region_id, region_id, &col.column_metadata.column_schema.name, - col.column_metadata.semantic_type, + &col.column_metadata, ) .await?; } diff --git a/src/metric-engine/src/engine/create.rs b/src/metric-engine/src/engine/create.rs index e9c5bdd11127..4281161081b9 100644 --- a/src/metric-engine/src/engine/create.rs +++ b/src/metric-engine/src/engine/create.rs @@ -189,7 +189,7 @@ impl MetricEngineInner { metadata_region_id, logical_region_id, &col.column_schema.name, - col.semantic_type, + col, ) .await?; } @@ -225,7 +225,7 @@ impl MetricEngineInner { metadata_region_id, logical_region_id, &col.column_schema.name, - col.semantic_type, + col, ) .await?; } diff --git a/src/metric-engine/src/engine/open.rs b/src/metric-engine/src/engine/open.rs index 1080e9302949..46c70f74d125 100644 --- a/src/metric-engine/src/engine/open.rs +++ b/src/metric-engine/src/engine/open.rs @@ -52,19 +52,13 @@ impl MetricEngineInner { self.open_physical_region(region_id, request).await?; self.recover_states(region_id).await?; - Ok(0) - } else if self - .state - .read() - .await - .logical_regions() - .contains_key(®ion_id) - { - // if the logical region is already open, do nothing Ok(0) } else { - // throw RegionNotFound error - Err(LogicalRegionNotFoundSnafu { region_id }.build()) + // Don't check if the logical region exist. Because a logical region cannot be opened + // individually, it is always "open" if its physical region is open. But the engine + // can't tell if the logical region is not exist or the physical region is not opened + // yet. Thus simply return `Ok` here to ignore all those errors. + Ok(0) } } diff --git a/src/metric-engine/src/engine/put.rs b/src/metric-engine/src/engine/put.rs index 1354b8635bde..a0f187faaa7a 100644 --- a/src/metric-engine/src/engine/put.rs +++ b/src/metric-engine/src/engine/put.rs @@ -288,7 +288,7 @@ mod tests { // add columns let logical_region_id = env.default_logical_region_id(); let columns = &["odd", "even", "Ev_En"]; - let alter_request = test_util::alter_logical_region_add_tag_columns(columns); + let alter_request = test_util::alter_logical_region_add_tag_columns(123456, columns); engine .handle_request(logical_region_id, RegionRequest::Alter(alter_request)) .await diff --git a/src/metric-engine/src/engine/read.rs b/src/metric-engine/src/engine/read.rs index fb33ef69a5e4..a0d89e4c102b 100644 --- a/src/metric-engine/src/engine/read.rs +++ b/src/metric-engine/src/engine/read.rs @@ -14,12 +14,13 @@ use std::sync::Arc; +use api::v1::SemanticType; use common_query::logical_plan::Expr; use common_recordbatch::SendableRecordBatchStream; use common_telemetry::{error, info, tracing}; use datafusion::logical_expr; use snafu::{OptionExt, ResultExt}; -use store_api::metadata::RegionMetadataRef; +use store_api::metadata::{RegionMetadata, RegionMetadataBuilder, RegionMetadataRef}; use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME; use store_api::region_engine::RegionEngine; use store_api::storage::consts::ReservedColumnId; @@ -157,9 +158,13 @@ impl MetricEngineInner { origin_projection: &[usize], ) -> Result> { // project on logical columns - let logical_columns = self + let all_logical_columns = self .load_logical_columns(physical_region_id, logical_region_id) .await?; + let projected_logical_names = origin_projection + .iter() + .map(|i| all_logical_columns[*i].column_schema.name.clone()) + .collect::>(); // generate physical projection let mut physical_projection = Vec::with_capacity(origin_projection.len()); @@ -169,10 +174,9 @@ impl MetricEngineInner { .get_metadata(data_region_id) .await .context(MitoReadOperationSnafu)?; - for logical_proj in origin_projection { - let column_id = logical_columns[*logical_proj].column_id; + for name in projected_logical_names { // Safety: logical columns is a strict subset of physical columns - physical_projection.push(physical_metadata.column_index_by_id(column_id).unwrap()); + physical_projection.push(physical_metadata.column_index_by_name(&name).unwrap()); } Ok(physical_projection) @@ -186,7 +190,9 @@ impl MetricEngineInner { ) -> Result> { let logical_columns = self .load_logical_columns(physical_region_id, logical_region_id) - .await?; + .await? + .into_iter() + .map(|col| col.column_schema.name); let mut projection = Vec::with_capacity(logical_columns.len()); let data_region_id = utils::to_data_region_id(physical_region_id); let physical_metadata = self @@ -194,10 +200,9 @@ impl MetricEngineInner { .get_metadata(data_region_id) .await .context(MitoReadOperationSnafu)?; - for logical_col in logical_columns { - let column_id = logical_col.column_id; + for name in logical_columns { // Safety: logical columns is a strict subset of physical columns - projection.push(physical_metadata.column_index_by_id(column_id).unwrap()); + projection.push(physical_metadata.column_index_by_name(&name).unwrap()); } Ok(projection) @@ -210,21 +215,42 @@ impl MetricEngineInner { ) -> Result { let logical_columns = self .load_logical_columns(physical_region_id, logical_region_id) - .await? - .into_iter() - .map(|col| col.column_id) + .await?; + // .into_iter() + // .map(|col| col.column_schema.name) + // .collect::>(); + // let physical_metadata = self + // .mito + // .get_metadata(physical_region_id) + // .await + // .context(MitoReadOperationSnafu)?; + + // let mut logical_metadata = physical_metadata + // .project(&logical_columns) + // .context(InvalidMetadataSnafu)?; + // logical_metadata.region_id = logical_region_id; + + let primary_keys = logical_columns + .iter() + .filter_map(|col| { + if col.semantic_type == SemanticType::Tag { + Some(col.column_id) + } else { + None + } + }) .collect::>(); - let physical_metadata = self - .mito - .get_metadata(physical_region_id) - .await - .context(MitoReadOperationSnafu)?; - Ok(Arc::new( - physical_metadata - .project(&logical_columns) - .context(InvalidMetadataSnafu)?, - )) + let mut logical_metadata_builder = RegionMetadataBuilder::new(logical_region_id); + for col in logical_columns { + logical_metadata_builder.push_column_metadata(col); + } + logical_metadata_builder.primary_key(primary_keys); + let logical_metadata = logical_metadata_builder + .build() + .context(InvalidMetadataSnafu)?; + + Ok(Arc::new(logical_metadata)) } } @@ -257,7 +283,8 @@ mod test { .unwrap(); // add columns to the first logical region - let alter_request = alter_logical_region_add_tag_columns(&["987", "789", "654", "321"]); + let alter_request = + alter_logical_region_add_tag_columns(123456, &["987", "798", "654", "321"]); env.metric() .handle_request(logical_region_id, RegionRequest::Alter(alter_request)) .await @@ -277,7 +304,7 @@ mod test { .await .unwrap(); - assert_eq!(scan_req.projection.unwrap(), vec![0, 1, 4, 7, 8, 9, 10]); + assert_eq!(scan_req.projection.unwrap(), vec![0, 1, 4, 8, 9, 10, 11]); assert_eq!(scan_req.filters.len(), 1); assert_eq!( scan_req.filters[0], @@ -294,6 +321,6 @@ mod test { .transform_request(physical_region_id, logical_region_id, scan_req) .await .unwrap(); - assert_eq!(scan_req.projection.unwrap(), vec![0, 1, 4, 7, 8, 9, 10]); + assert_eq!(scan_req.projection.unwrap(), vec![0, 1, 4, 8, 9, 10, 11]); } } diff --git a/src/metric-engine/src/engine/region_metadata.rs b/src/metric-engine/src/engine/region_metadata.rs index 8da2a9d1ef76..c19d58e5e999 100644 --- a/src/metric-engine/src/engine/region_metadata.rs +++ b/src/metric-engine/src/engine/region_metadata.rs @@ -34,28 +34,40 @@ impl MetricEngineInner { logical_region_id: RegionId, ) -> Result> { // load logical and physical columns, and intersect them to get logical column metadata - let logical_columns = self + let mut logical_column_metadata = self .metadata_region .logical_columns(physical_region_id, logical_region_id) .await? .into_iter() - .collect::>(); - let physical_columns = self - .data_region - .physical_columns(physical_region_id) - .await?; - let mut logical_column_metadata = physical_columns - .into_iter() - .filter_map(|mut col| { - // recover the semantic type of logical columns - logical_columns - .get(&col.column_schema.name) - .map(|semantic_type| { - col.semantic_type = *semantic_type; - col - }) - }) + .map(|(_, column_metadata)| column_metadata) .collect::>(); + // let physical_columns = self + // .data_region + // .physical_columns(physical_region_id) + // .await? + // .into_iter() + // .map(|col| (col.column_schema.name.clone(), col)) + // .collect::>(); + // let mut logical_column_metadata = physical_columns + // .into_iter() + // .filter_map(|mut col| { + // // recover the semantic type of logical columns + // logical_columns + // .get(&col.column_schema.name) + // .map(|semantic_type| { + // col.semantic_type = *semantic_type; + // col + // }) + // }) + // .collect::>(); + // let logical_column_metadata = logical_columns + // .into_iter() + // .map(|(name, semantic_type)| { + // let mut col = physical_columns.get(&name).unwrap().clone(); + // col.semantic_type = col.semantic_type; + // col + // }) + // .collect::>(); // sort columns on column id to ensure the order logical_column_metadata.sort_unstable_by_key(|col| col.column_id); diff --git a/src/metric-engine/src/error.rs b/src/metric-engine/src/error.rs index 9abeef8566a1..2a79c1d90d81 100644 --- a/src/metric-engine/src/error.rs +++ b/src/metric-engine/src/error.rs @@ -63,6 +63,14 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to deserialize column metadata from {}", raw))] + DeserializeColumnMetadata { + raw: String, + #[snafu(source)] + error: serde_json::Error, + location: Location, + }, + #[snafu(display("Failed to decode base64 column value"))] DecodeColumnValue { #[snafu(source)] @@ -156,6 +164,7 @@ impl ErrorExt for Error { MissingInternalColumn { .. } | DeserializeSemanticType { .. } + | DeserializeColumnMetadata { .. } | DecodeColumnValue { .. } | ParseRegionId { .. } | InvalidMetadata { .. } => StatusCode::Unexpected, diff --git a/src/metric-engine/src/metadata_region.rs b/src/metric-engine/src/metadata_region.rs index 2add36a5f56e..e8e808eed7b5 100644 --- a/src/metric-engine/src/metadata_region.rs +++ b/src/metric-engine/src/metadata_region.rs @@ -22,6 +22,7 @@ use common_recordbatch::util::collect; use datafusion::prelude::{col, lit}; use mito2::engine::MitoEngine; use snafu::ResultExt; +use store_api::metadata::ColumnMetadata; use store_api::metric_engine_consts::{ METADATA_SCHEMA_KEY_COLUMN_INDEX, METADATA_SCHEMA_KEY_COLUMN_NAME, METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME, METADATA_SCHEMA_VALUE_COLUMN_INDEX, @@ -32,7 +33,7 @@ use store_api::region_request::RegionPutRequest; use store_api::storage::{RegionId, ScanRequest}; use crate::error::{ - CollectRecordBatchStreamSnafu, DecodeColumnValueSnafu, DeserializeSemanticTypeSnafu, + CollectRecordBatchStreamSnafu, DecodeColumnValueSnafu, DeserializeColumnMetadataSnafu, MitoReadOperationSnafu, MitoWriteOperationSnafu, ParseRegionIdSnafu, RegionAlreadyExistsSnafu, Result, }; @@ -97,7 +98,7 @@ impl MetadataRegion { physical_region_id: RegionId, logical_region_id: RegionId, column_name: &str, - semantic_type: SemanticType, + column_metadata: &ColumnMetadata, ) -> Result { let region_id = utils::to_metadata_region_id(physical_region_id); let column_key = Self::concat_column_key(logical_region_id, column_name); @@ -105,7 +106,7 @@ impl MetadataRegion { self.put_if_absent( region_id, column_key, - Self::serialize_semantic_type(semantic_type), + Self::serialize_column_metadata(column_metadata), ) .await } @@ -132,7 +133,7 @@ impl MetadataRegion { let column_key = Self::concat_column_key(logical_region_id, column_name); let semantic_type = self.get(region_id, &column_key).await?; semantic_type - .map(|s| Self::deserialize_semantic_type(&s)) + .map(|s| Self::deserialize_column_metadata(&s).map(|c| c.semantic_type)) .transpose() } @@ -143,7 +144,7 @@ impl MetadataRegion { &self, physical_region_id: RegionId, logical_region_id: RegionId, - ) -> Result> { + ) -> Result> { let metadata_region_id = utils::to_metadata_region_id(physical_region_id); let region_column_prefix = Self::concat_column_key_prefix(logical_region_id); @@ -154,8 +155,8 @@ impl MetadataRegion { } // Safety: we have checked the prefix let (_, column_name) = Self::parse_column_key(&k)?.unwrap(); - let semantic_type = Self::deserialize_semantic_type(&v)?; - columns.push((column_name, semantic_type)); + let column_metadata = Self::deserialize_column_metadata(&v)?; + columns.push((column_name, column_metadata)); } Ok(columns) @@ -228,13 +229,14 @@ impl MetadataRegion { } } - pub fn serialize_semantic_type(semantic_type: SemanticType) -> String { - serde_json::to_string(&semantic_type).unwrap() + pub fn serialize_column_metadata(column_metadata: &ColumnMetadata) -> String { + serde_json::to_string(column_metadata).unwrap() } - pub fn deserialize_semantic_type(semantic_type: &str) -> Result { - serde_json::from_str(semantic_type) - .with_context(|_| DeserializeSemanticTypeSnafu { raw: semantic_type }) + pub fn deserialize_column_metadata(column_metadata: &str) -> Result { + serde_json::from_str(column_metadata).with_context(|_| DeserializeColumnMetadataSnafu { + raw: column_metadata, + }) } } @@ -411,6 +413,8 @@ impl MetadataRegion { #[cfg(test)] mod test { + use datatypes::data_type::ConcreteDataType; + use datatypes::schema::ColumnSchema; use store_api::region_request::RegionRequest; use super::*; @@ -463,26 +467,21 @@ mod test { } #[test] - fn test_serialize_semantic_type() { + fn test_serialize_column_metadata() { let semantic_type = SemanticType::Tag; - let expected = "\"Tag\"".to_string(); - assert_eq!( - MetadataRegion::serialize_semantic_type(semantic_type), - expected - ); - } - - #[test] - fn test_deserialize_semantic_type() { - let semantic_type = "\"Tag\""; - let expected = SemanticType::Tag; + let column_metadata = ColumnMetadata { + column_schema: ColumnSchema::new("blabla", ConcreteDataType::string_datatype(), false), + semantic_type, + column_id: 5, + }; + let expected = "{\"column_schema\":{\"name\":\"blabla\",\"data_type\":{\"String\":null},\"is_nullable\":false,\"is_time_index\":false,\"default_constraint\":null,\"metadata\":{}},\"semantic_type\":\"Tag\",\"column_id\":5}".to_string(); assert_eq!( - MetadataRegion::deserialize_semantic_type(semantic_type).unwrap(), + MetadataRegion::serialize_column_metadata(&column_metadata), expected ); - let semantic_type = "\"InvalidType\""; - assert!(MetadataRegion::deserialize_semantic_type(semantic_type).is_err()); + let semantic_type = "\"Invalid Column Metadata\""; + assert!(MetadataRegion::deserialize_column_metadata(semantic_type).is_err()); } #[test] @@ -620,12 +619,21 @@ mod test { let logical_region_id = RegionId::new(868, 8390); let column_name = "column1"; let semantic_type = SemanticType::Tag; + let column_metadata = ColumnMetadata { + column_schema: ColumnSchema::new( + column_name, + ConcreteDataType::string_datatype(), + false, + ), + semantic_type, + column_id: 5, + }; metadata_region .add_column( physical_region_id, logical_region_id, column_name, - semantic_type, + &column_metadata, ) .await .unwrap(); @@ -641,7 +649,7 @@ mod test { physical_region_id, logical_region_id, column_name, - SemanticType::Field, + &column_metadata, ) .await .unwrap(); diff --git a/src/metric-engine/src/test_util.rs b/src/metric-engine/src/test_util.rs index 0de0f4beeac0..d36a950e85ae 100644 --- a/src/metric-engine/src/test_util.rs +++ b/src/metric-engine/src/test_util.rs @@ -30,7 +30,7 @@ use store_api::region_engine::RegionEngine; use store_api::region_request::{ AddColumn, AlterKind, RegionAlterRequest, RegionCreateRequest, RegionRequest, }; -use store_api::storage::RegionId; +use store_api::storage::{ColumnId, RegionId}; use crate::data_region::DataRegion; use crate::engine::MetricEngine; @@ -156,12 +156,15 @@ impl TestEnv { } /// Generate a [RegionAlterRequest] for adding tag columns. -pub fn alter_logical_region_add_tag_columns(new_tags: &[&str]) -> RegionAlterRequest { +pub fn alter_logical_region_add_tag_columns( + col_id_start: ColumnId, + new_tags: &[&str], +) -> RegionAlterRequest { let mut new_columns = vec![]; for (i, tag) in new_tags.iter().enumerate() { new_columns.push(AddColumn { column_metadata: ColumnMetadata { - column_id: i as u32, + column_id: i as u32 + col_id_start, semantic_type: SemanticType::Tag, column_schema: ColumnSchema::new( tag.to_string(), @@ -198,7 +201,7 @@ pub fn create_logical_region_request( ), }, ColumnMetadata { - column_id: 0, + column_id: 1, semantic_type: SemanticType::Field, column_schema: ColumnSchema::new( "greptime_value", @@ -209,7 +212,7 @@ pub fn create_logical_region_request( ]; for tag in tags { column_metadatas.push(ColumnMetadata { - column_id: 0, + column_id: 2, semantic_type: SemanticType::Tag, column_schema: ColumnSchema::new( tag.to_string(), @@ -261,9 +264,10 @@ pub fn row_schema_with_tags(tags: &[&str]) -> Vec { schema } -/// Build [Rows] for assembling [RegionPutRequest](store_api::region_request::RegionPutRequest). +/// Build [Row]s for assembling [RegionPutRequest](store_api::region_request::RegionPutRequest). /// -/// The schema is generated by [row_schema_with_tags]. +/// The schema is generated by [row_schema_with_tags]. `num_tags` doesn't need to be precise, +/// it's used to determin the column id for new columns. pub fn build_rows(num_tags: usize, num_rows: usize) -> Vec { let mut rows = vec![]; for i in 0..num_rows { diff --git a/src/store-api/src/metadata.rs b/src/store-api/src/metadata.rs index 947be5ea588d..86ae565318a9 100644 --- a/src/store-api/src/metadata.rs +++ b/src/store-api/src/metadata.rs @@ -217,6 +217,13 @@ impl RegionMetadata { self.id_to_index.get(&column_id).copied() } + /// Find column index by name. + pub fn column_index_by_name(&self, column_name: &str) -> Option { + self.column_metadatas + .iter() + .position(|col| col.column_schema.name == column_name) + } + /// Returns the time index column /// /// # Panics @@ -272,7 +279,7 @@ impl RegionMetadata { ); // prepare new indices - let mut indices_to_preserve = projection + let indices_to_preserve = projection .iter() .map(|id| { self.column_index_by_id(*id) @@ -282,8 +289,6 @@ impl RegionMetadata { }) }) .collect::>>()?; - indices_to_preserve.sort_unstable(); - indices_to_preserve.reverse(); // project schema let projected_schema = @@ -294,31 +299,19 @@ impl RegionMetadata { projection: projection.to_vec(), })?; - // project columns - let mut projected_column_metadatas = self.column_metadatas.clone(); + // project columns, generate projected primary key and new id_to_index + let mut projected_column_metadatas = Vec::with_capacity(indices_to_preserve.len()); + let mut projected_primary_key = vec![]; + let mut projected_id_to_index = HashMap::with_capacity(indices_to_preserve.len()); for index in indices_to_preserve { - projected_column_metadatas.remove(index); + let col = self.column_metadatas[index].clone(); + if col.semantic_type == SemanticType::Tag { + projected_primary_key.push(col.column_id); + } + projected_id_to_index.insert(col.column_id, projected_column_metadatas.len()); + projected_column_metadatas.push(col); } - // generate projected primary key - let projected_primary_key = projected_column_metadatas - .iter() - .filter_map(|col| { - if col.semantic_type == SemanticType::Tag { - Some(col.column_id) - } else { - None - } - }) - .collect(); - - // generate new id_to_index - let projected_id_to_index = projected_column_metadatas - .iter() - .enumerate() - .map(|(idx, col)| (col.column_id, idx)) - .collect(); - Ok(RegionMetadata { schema: Arc::new(projected_schema), time_index: self.time_index, From 8b92ab4584ed5f158a421de915972e345aff4a1b Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Sun, 24 Dec 2023 23:41:32 +0800 Subject: [PATCH 3/5] remove deadcode Signed-off-by: Ruihang Xia --- src/metric-engine/src/engine/read.rs | 13 --------- .../src/engine/region_metadata.rs | 29 +------------------ 2 files changed, 1 insertion(+), 41 deletions(-) diff --git a/src/metric-engine/src/engine/read.rs b/src/metric-engine/src/engine/read.rs index a0d89e4c102b..365214c56a3d 100644 --- a/src/metric-engine/src/engine/read.rs +++ b/src/metric-engine/src/engine/read.rs @@ -216,19 +216,6 @@ impl MetricEngineInner { let logical_columns = self .load_logical_columns(physical_region_id, logical_region_id) .await?; - // .into_iter() - // .map(|col| col.column_schema.name) - // .collect::>(); - // let physical_metadata = self - // .mito - // .get_metadata(physical_region_id) - // .await - // .context(MitoReadOperationSnafu)?; - - // let mut logical_metadata = physical_metadata - // .project(&logical_columns) - // .context(InvalidMetadataSnafu)?; - // logical_metadata.region_id = logical_region_id; let primary_keys = logical_columns .iter() diff --git a/src/metric-engine/src/engine/region_metadata.rs b/src/metric-engine/src/engine/region_metadata.rs index c19d58e5e999..5a3979f99c43 100644 --- a/src/metric-engine/src/engine/region_metadata.rs +++ b/src/metric-engine/src/engine/region_metadata.rs @@ -27,7 +27,7 @@ use crate::error::Result; impl MetricEngineInner { /// Load column metadata of a logical region. /// - /// The return value is ordered. + /// The return value is ordered on [ColumnId]. pub async fn load_logical_columns( &self, physical_region_id: RegionId, @@ -41,33 +41,6 @@ impl MetricEngineInner { .into_iter() .map(|(_, column_metadata)| column_metadata) .collect::>(); - // let physical_columns = self - // .data_region - // .physical_columns(physical_region_id) - // .await? - // .into_iter() - // .map(|col| (col.column_schema.name.clone(), col)) - // .collect::>(); - // let mut logical_column_metadata = physical_columns - // .into_iter() - // .filter_map(|mut col| { - // // recover the semantic type of logical columns - // logical_columns - // .get(&col.column_schema.name) - // .map(|semantic_type| { - // col.semantic_type = *semantic_type; - // col - // }) - // }) - // .collect::>(); - // let logical_column_metadata = logical_columns - // .into_iter() - // .map(|(name, semantic_type)| { - // let mut col = physical_columns.get(&name).unwrap().clone(); - // col.semantic_type = col.semantic_type; - // col - // }) - // .collect::>(); // sort columns on column id to ensure the order logical_column_metadata.sort_unstable_by_key(|col| col.column_id); From e7e23b7812141fabb1da2d91904b87a1343e6ce5 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Sun, 24 Dec 2023 23:45:45 +0800 Subject: [PATCH 4/5] fix typo Signed-off-by: Ruihang Xia --- src/metric-engine/src/test_util.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/metric-engine/src/test_util.rs b/src/metric-engine/src/test_util.rs index d36a950e85ae..0d013ac7b308 100644 --- a/src/metric-engine/src/test_util.rs +++ b/src/metric-engine/src/test_util.rs @@ -267,7 +267,7 @@ pub fn row_schema_with_tags(tags: &[&str]) -> Vec { /// Build [Row]s for assembling [RegionPutRequest](store_api::region_request::RegionPutRequest). /// /// The schema is generated by [row_schema_with_tags]. `num_tags` doesn't need to be precise, -/// it's used to determin the column id for new columns. +/// it's used to determine the column id for new columns. pub fn build_rows(num_tags: usize, num_rows: usize) -> Vec { let mut rows = vec![]; for i in 0..num_rows { From 527d76dfe3f9a03cca208c0be9eb9b6281d7ddd8 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 25 Dec 2023 16:52:44 +0800 Subject: [PATCH 5/5] remove redundent column name Signed-off-by: Ruihang Xia --- src/metric-engine/src/engine/alter.rs | 7 +------ src/metric-engine/src/engine/create.rs | 14 ++------------ src/metric-engine/src/metadata_region.rs | 18 ++++-------------- 3 files changed, 7 insertions(+), 32 deletions(-) diff --git a/src/metric-engine/src/engine/alter.rs b/src/metric-engine/src/engine/alter.rs index e2f2153d77ac..aa8e37d8a65c 100644 --- a/src/metric-engine/src/engine/alter.rs +++ b/src/metric-engine/src/engine/alter.rs @@ -93,12 +93,7 @@ impl MetricEngineInner { // register columns to logical region for col in columns { self.metadata_region - .add_column( - metadata_region_id, - region_id, - &col.column_metadata.column_schema.name, - &col.column_metadata, - ) + .add_column(metadata_region_id, region_id, &col.column_metadata) .await?; } diff --git a/src/metric-engine/src/engine/create.rs b/src/metric-engine/src/engine/create.rs index 4281161081b9..f9488ac70e2a 100644 --- a/src/metric-engine/src/engine/create.rs +++ b/src/metric-engine/src/engine/create.rs @@ -185,12 +185,7 @@ impl MetricEngineInner { .await?; for col in &request.column_metadatas { self.metadata_region - .add_column( - metadata_region_id, - logical_region_id, - &col.column_schema.name, - col, - ) + .add_column(metadata_region_id, logical_region_id, col) .await?; } @@ -221,12 +216,7 @@ impl MetricEngineInner { // register columns to metadata region for col in &new_columns { self.metadata_region - .add_column( - metadata_region_id, - logical_region_id, - &col.column_schema.name, - col, - ) + .add_column(metadata_region_id, logical_region_id, col) .await?; } diff --git a/src/metric-engine/src/metadata_region.rs b/src/metric-engine/src/metadata_region.rs index e8e808eed7b5..0621b9a3fd17 100644 --- a/src/metric-engine/src/metadata_region.rs +++ b/src/metric-engine/src/metadata_region.rs @@ -97,11 +97,11 @@ impl MetadataRegion { &self, physical_region_id: RegionId, logical_region_id: RegionId, - column_name: &str, column_metadata: &ColumnMetadata, ) -> Result { let region_id = utils::to_metadata_region_id(physical_region_id); - let column_key = Self::concat_column_key(logical_region_id, column_name); + let column_key = + Self::concat_column_key(logical_region_id, &column_metadata.column_schema.name); self.put_if_absent( region_id, @@ -629,12 +629,7 @@ mod test { column_id: 5, }; metadata_region - .add_column( - physical_region_id, - logical_region_id, - column_name, - &column_metadata, - ) + .add_column(physical_region_id, logical_region_id, &column_metadata) .await .unwrap(); let actual_semantic_type = metadata_region @@ -645,12 +640,7 @@ mod test { // duplicate column won't be updated let is_updated = metadata_region - .add_column( - physical_region_id, - logical_region_id, - column_name, - &column_metadata, - ) + .add_column(physical_region_id, logical_region_id, &column_metadata) .await .unwrap(); assert!(!is_updated);