diff --git a/src/metric-engine/src/data_region.rs b/src/metric-engine/src/data_region.rs index e10ce4a7105b..6d35aaefb579 100644 --- a/src/metric-engine/src/data_region.rs +++ b/src/metric-engine/src/data_region.rs @@ -22,6 +22,7 @@ use store_api::region_engine::RegionEngine; use store_api::region_request::{ AddColumn, AlterKind, RegionAlterRequest, RegionPutRequest, RegionRequest, }; +use store_api::storage::consts::ReservedColumnId; use store_api::storage::RegionId; use crate::error::{ @@ -73,7 +74,13 @@ impl DataRegion { let new_column_id_start = 1 + region_metadata .column_metadatas .iter() - .map(|c| c.column_id) + .filter_map(|c| { + if ReservedColumnId::is_reserved(c.column_id) { + None + } else { + Some(c.column_id) + } + }) .max() .unwrap_or(0); @@ -137,6 +144,19 @@ impl DataRegion { .await .context(MitoWriteOperationSnafu) } + + pub async fn physical_columns( + &self, + physical_region_id: RegionId, + ) -> Result> { + let data_region_id = utils::to_data_region_id(physical_region_id); + let metadata = self + .mito + .get_metadata(data_region_id) + .await + .context(MitoReadOperationSnafu)?; + Ok(metadata.column_metadatas.clone()) + } } #[cfg(test)] diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 859c6cf0471e..7888f75be8dd 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -15,6 +15,8 @@ mod alter; mod create; mod put; +mod read; +mod region_metadata; mod state; use std::sync::Arc; @@ -74,7 +76,7 @@ use crate::metadata_region::MetadataRegion; /// they support are different. List below: /// /// | Operations | Logical Region | Physical Region | -/// | :--------: | :------------: | :-------------: | +/// | ---------- | -------------- | --------------- | /// | Create | ✅ | ✅ | /// | Drop | ✅ | ❌ | /// | Write | ✅ | ❌ | @@ -113,7 +115,7 @@ impl RegionEngine for MetricEngine { &self, region_id: RegionId, request: RegionRequest, - ) -> std::result::Result { + ) -> Result { let result = match request { RegionRequest::Put(put) => self.inner.put_region(region_id, put).await, RegionRequest::Delete(_) => todo!(), @@ -143,15 +145,15 @@ impl RegionEngine for MetricEngine { &self, region_id: RegionId, request: ScanRequest, - ) -> std::result::Result { - todo!() + ) -> Result { + self.inner + .read_region(region_id, request) + .await + .map_err(BoxedError::new) } /// Retrieves region's metadata. - async fn get_metadata( - &self, - region_id: RegionId, - ) -> std::result::Result { + async fn get_metadata(&self, region_id: RegionId) -> Result { todo!() } @@ -161,15 +163,11 @@ impl RegionEngine for MetricEngine { } /// Stops the engine - async fn stop(&self) -> std::result::Result<(), BoxedError> { + async fn stop(&self) -> Result<(), BoxedError> { todo!() } - fn set_writable( - &self, - region_id: RegionId, - writable: bool, - ) -> std::result::Result<(), BoxedError> { + fn set_writable(&self, region_id: RegionId, writable: bool) -> Result<(), BoxedError> { todo!() } diff --git a/src/metric-engine/src/engine/alter.rs b/src/metric-engine/src/engine/alter.rs index 254a6448c5e4..74ea3921f6b8 100644 --- a/src/metric-engine/src/engine/alter.rs +++ b/src/metric-engine/src/engine/alter.rs @@ -49,7 +49,7 @@ impl MetricEngineInner { ) -> Result<()> { let physical_region_id = { let state = &self.state.read().await; - *state.logical_regions().get(®ion_id).with_context(|| { + state.get_physical_region_id(region_id).with_context(|| { error!("Trying to alter an nonexistent region {region_id}"); LogicalRegionNotFoundSnafu { region_id } })? diff --git a/src/metric-engine/src/engine/put.rs b/src/metric-engine/src/engine/put.rs index dd1347e705b2..c5520cd84f01 100644 --- a/src/metric-engine/src/engine/put.rs +++ b/src/metric-engine/src/engine/put.rs @@ -212,8 +212,10 @@ impl MetricEngineInner { #[cfg(test)] mod tests { + use common_recordbatch::RecordBatches; use store_api::region_engine::RegionEngine; use store_api::region_request::RegionRequest; + use store_api::storage::ScanRequest; use super::*; use crate::test_util::{self, TestEnv}; @@ -222,6 +224,72 @@ mod tests { async fn test_write_logical_region() { let env = TestEnv::new().await; env.init_metric_region().await; + + // prepare data + let schema = test_util::row_schema_with_tags(&["job"]); + let rows = test_util::build_rows(1, 5); + let request = RegionRequest::Put(RegionPutRequest { + rows: Rows { schema, rows }, + }); + + // write data + let logical_region_id = env.default_logical_region_id(); + let Output::AffectedRows(count) = env + .metric() + .handle_request(logical_region_id, request) + .await + .unwrap() + else { + panic!() + }; + assert_eq!(count, 5); + + // read data from physical region + let physical_region_id = env.default_physical_region_id(); + let request = ScanRequest::default(); + let stream = env + .metric() + .handle_query(physical_region_id, request) + .await + .unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++-------------------------+----------------+------------+---------------------+-------+ +| greptime_timestamp | greptime_value | __table_id | __tsid | job | ++-------------------------+----------------+------------+---------------------+-------+ +| 1970-01-01T00:00:00 | 0.0 | 3 | 4844750677434873907 | tag_0 | +| 1970-01-01T00:00:00.001 | 1.0 | 3 | 4844750677434873907 | tag_0 | +| 1970-01-01T00:00:00.002 | 2.0 | 3 | 4844750677434873907 | tag_0 | +| 1970-01-01T00:00:00.003 | 3.0 | 3 | 4844750677434873907 | tag_0 | +| 1970-01-01T00:00:00.004 | 4.0 | 3 | 4844750677434873907 | tag_0 | ++-------------------------+----------------+------------+---------------------+-------+"; + assert_eq!(expected, batches.pretty_print().unwrap(), "physical region"); + + // read data from logical region + let request = ScanRequest::default(); + let stream = env + .metric() + .handle_query(logical_region_id, request) + .await + .unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++-------------------------+----------------+-------+ +| greptime_timestamp | greptime_value | job | ++-------------------------+----------------+-------+ +| 1970-01-01T00:00:00 | 0.0 | tag_0 | +| 1970-01-01T00:00:00.001 | 1.0 | tag_0 | +| 1970-01-01T00:00:00.002 | 2.0 | tag_0 | +| 1970-01-01T00:00:00.003 | 3.0 | tag_0 | +| 1970-01-01T00:00:00.004 | 4.0 | tag_0 | ++-------------------------+----------------+-------+"; + assert_eq!(expected, batches.pretty_print().unwrap(), "logical region"); + } + + #[tokio::test] + async fn test_write_logical_region_row_count() { + let env = TestEnv::new().await; + env.init_metric_region().await; let engine = env.metric(); // add columns diff --git a/src/metric-engine/src/engine/read.rs b/src/metric-engine/src/engine/read.rs new file mode 100644 index 000000000000..7d7026ba9d28 --- /dev/null +++ b/src/metric-engine/src/engine/read.rs @@ -0,0 +1,247 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_query::logical_plan::Expr; +use common_recordbatch::SendableRecordBatchStream; +use common_telemetry::{error, info, tracing}; +use datafusion::logical_expr; +use snafu::{OptionExt, ResultExt}; +use store_api::region_engine::RegionEngine; +use store_api::storage::consts::ReservedColumnId; +use store_api::storage::{RegionId, ScanRequest}; + +use crate::consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME; +use crate::engine::MetricEngineInner; +use crate::error::{LogicalRegionNotFoundSnafu, MitoReadOperationSnafu, Result}; +use crate::utils; + +impl MetricEngineInner { + #[tracing::instrument(skip_all)] + pub async fn read_region( + &self, + region_id: RegionId, + request: ScanRequest, + ) -> Result { + let is_reading_physical_region = self + .state + .read() + .await + .physical_regions() + .contains_key(®ion_id); + + if is_reading_physical_region { + info!( + "Metric region received read request {request:?} on physical region {region_id:?}" + ); + self.read_physical_region(region_id, request).await + } else { + self.read_logical_region(region_id, request).await + } + } + + /// Proxy the read request to underlying physical region (mito engine). + async fn read_physical_region( + &self, + region_id: RegionId, + request: ScanRequest, + ) -> Result { + self.mito + .handle_query(region_id, request) + .await + .context(MitoReadOperationSnafu) + } + + async fn read_logical_region( + &self, + logical_region_id: RegionId, + request: ScanRequest, + ) -> Result { + let physical_region_id = { + let state = &self.state.read().await; + state + .get_physical_region_id(logical_region_id) + .with_context(|| { + error!("Trying to read an nonexistent region {logical_region_id}"); + LogicalRegionNotFoundSnafu { + region_id: logical_region_id, + } + })? + }; + let data_region_id = utils::to_data_region_id(physical_region_id); + let request = self + .transform_request(physical_region_id, logical_region_id, request) + .await?; + self.mito + .handle_query(data_region_id, request) + .await + .context(MitoReadOperationSnafu) + } + + /// Transform the [ScanRequest] from logical region to physical data region. + async fn transform_request( + &self, + physical_region_id: RegionId, + logical_region_id: RegionId, + mut request: ScanRequest, + ) -> Result { + // transform projection + let physical_projection = if let Some(projection) = &request.projection { + self.transform_projection(physical_region_id, logical_region_id, projection) + .await? + } else { + self.default_projection(physical_region_id, logical_region_id) + .await? + }; + request.projection = Some(physical_projection); + + // add table filter + request + .filters + .push(self.table_id_filter(logical_region_id)); + + Ok(request) + } + + /// Generate a filter on the table id column. + fn table_id_filter(&self, logical_region_id: RegionId) -> Expr { + logical_expr::col(DATA_SCHEMA_TABLE_ID_COLUMN_NAME) + .eq(logical_expr::lit(logical_region_id.table_id())) + .into() + } + + /// Transform the projection from logical region to physical region. + /// + /// This method will not preserve internal columns. + pub async fn transform_projection( + &self, + physical_region_id: RegionId, + logical_region_id: RegionId, + origin_projection: &[usize], + ) -> Result> { + // project on logical columns + let logical_columns = self + .load_logical_columns(physical_region_id, logical_region_id) + .await?; + + // generate physical projection + let mut physical_projection = Vec::with_capacity(origin_projection.len()); + let data_region_id = utils::to_data_region_id(physical_region_id); + let physical_metadata = self + .mito + .get_metadata(data_region_id) + .await + .context(MitoReadOperationSnafu)?; + for logical_proj in origin_projection { + let column_id = logical_columns[*logical_proj].column_id; + // Safety: logical columns is a strict subset of physical columns + physical_projection.push(physical_metadata.column_index_by_id(column_id).unwrap()); + } + + Ok(physical_projection) + } + + /// Default projection for a logical region. Includes non-internal columns + pub async fn default_projection( + &self, + physical_region_id: RegionId, + logical_region_id: RegionId, + ) -> Result> { + let logical_columns = self + .load_logical_columns(physical_region_id, logical_region_id) + .await?; + let mut projection = Vec::with_capacity(logical_columns.len()); + let data_region_id = utils::to_data_region_id(physical_region_id); + let physical_metadata = self + .mito + .get_metadata(data_region_id) + .await + .context(MitoReadOperationSnafu)?; + for logical_col in logical_columns { + let column_id = logical_col.column_id; + // Safety: logical columns is a strict subset of physical columns + projection.push(physical_metadata.column_index_by_id(column_id).unwrap()); + } + Ok(projection) + } +} + +#[cfg(test)] +mod test { + use store_api::region_request::RegionRequest; + + use super::*; + use crate::engine::alter; + use crate::test_util::{ + alter_logical_region_add_tag_columns, create_logical_region_request, TestEnv, + }; + + #[tokio::test] + async fn test_transform_scan_req() { + let env = TestEnv::new().await; + env.init_metric_region().await; + + let logical_region_id = env.default_logical_region_id(); + let physical_region_id = env.default_physical_region_id(); + let data_region_id = utils::to_data_region_id(physical_region_id); + + // create another logical region + let logical_region_id2 = RegionId::new(1112345678, 999); + let create_request = + create_logical_region_request(&["123", "456", "789"], physical_region_id, "blabla"); + env.metric() + .handle_request(logical_region_id2, RegionRequest::Create(create_request)) + .await + .unwrap(); + + // add columns to the first logical region + let alter_request = alter_logical_region_add_tag_columns(&["987", "789", "654", "321"]); + env.metric() + .handle_request(logical_region_id, RegionRequest::Alter(alter_request)) + .await + .unwrap(); + + // check explicit projection + let mut scan_req = ScanRequest { + projection: Some(vec![0, 1, 2, 3, 4, 5, 6]), + filters: vec![], + ..Default::default() + }; + + let scan_req = env + .metric() + .inner + .transform_request(physical_region_id, logical_region_id, scan_req) + .await + .unwrap(); + + assert_eq!(scan_req.projection.unwrap(), vec![0, 1, 4, 7, 8, 9, 10]); + assert_eq!(scan_req.filters.len(), 1); + assert_eq!( + scan_req.filters[0], + logical_expr::col(DATA_SCHEMA_TABLE_ID_COLUMN_NAME) + .eq(logical_expr::lit(logical_region_id.table_id())) + .into() + ); + + // check default projection + let mut scan_req = ScanRequest::default(); + let scan_req = env + .metric() + .inner + .transform_request(physical_region_id, logical_region_id, scan_req) + .await + .unwrap(); + assert_eq!(scan_req.projection.unwrap(), vec![0, 1, 4, 7, 8, 9, 10]); + } +} diff --git a/src/metric-engine/src/engine/region_metadata.rs b/src/metric-engine/src/engine/region_metadata.rs new file mode 100644 index 000000000000..8da2a9d1ef76 --- /dev/null +++ b/src/metric-engine/src/engine/region_metadata.rs @@ -0,0 +1,65 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Implementation of retrieving logical region's region metadata. + +use std::collections::{HashMap, HashSet}; + +use api::v1::SemanticType; +use store_api::metadata::{ColumnMetadata, RegionMetadata}; +use store_api::storage::consts::ReservedColumnId; +use store_api::storage::RegionId; + +use crate::engine::MetricEngineInner; +use crate::error::Result; + +impl MetricEngineInner { + /// Load column metadata of a logical region. + /// + /// The return value is ordered. + pub async fn load_logical_columns( + &self, + physical_region_id: RegionId, + logical_region_id: RegionId, + ) -> Result> { + // load logical and physical columns, and intersect them to get logical column metadata + let logical_columns = self + .metadata_region + .logical_columns(physical_region_id, logical_region_id) + .await? + .into_iter() + .collect::>(); + let physical_columns = self + .data_region + .physical_columns(physical_region_id) + .await?; + let mut logical_column_metadata = physical_columns + .into_iter() + .filter_map(|mut col| { + // recover the semantic type of logical columns + logical_columns + .get(&col.column_schema.name) + .map(|semantic_type| { + col.semantic_type = *semantic_type; + col + }) + }) + .collect::>(); + + // sort columns on column id to ensure the order + logical_column_metadata.sort_unstable_by_key(|col| col.column_id); + + Ok(logical_column_metadata) + } +} diff --git a/src/metric-engine/src/engine/state.rs b/src/metric-engine/src/engine/state.rs index 23f966128a9a..0156cc0e57f1 100644 --- a/src/metric-engine/src/engine/state.rs +++ b/src/metric-engine/src/engine/state.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +//! Internal states of metric engine + use std::collections::{HashMap, HashSet}; use store_api::storage::RegionId; @@ -75,6 +77,10 @@ impl MetricEngineState { .insert(logical_region_id, physical_region_id); } + pub fn get_physical_region_id(&self, logical_region_id: RegionId) -> Option { + self.logical_regions.get(&logical_region_id).copied() + } + pub fn physical_columns(&self) -> &HashMap> { &self.physical_columns } diff --git a/src/metric-engine/src/metadata_region.rs b/src/metric-engine/src/metadata_region.rs index 87b519066932..28098b4e5f5d 100644 --- a/src/metric-engine/src/metadata_region.rs +++ b/src/metric-engine/src/metadata_region.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; + use api::v1::value::ValueData; use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value}; use base64::engine::general_purpose::STANDARD_NO_PAD; @@ -25,8 +27,9 @@ use store_api::region_request::RegionPutRequest; use store_api::storage::{RegionId, ScanRequest}; use crate::consts::{ - METADATA_SCHEMA_KEY_COLUMN_NAME, METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME, - METADATA_SCHEMA_VALUE_COLUMN_INDEX, METADATA_SCHEMA_VALUE_COLUMN_NAME, + METADATA_SCHEMA_KEY_COLUMN_INDEX, METADATA_SCHEMA_KEY_COLUMN_NAME, + METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME, METADATA_SCHEMA_VALUE_COLUMN_INDEX, + METADATA_SCHEMA_VALUE_COLUMN_NAME, }; use crate::error::{ CollectRecordBatchStreamSnafu, DecodeColumnValueSnafu, DeserializeSemanticTypeSnafu, @@ -129,6 +132,31 @@ impl MetadataRegion { .map(|s| Self::deserialize_semantic_type(&s)) .transpose() } + + // TODO(ruihang): avoid using `get_all` + /// Get all the columns of a given logical region. + /// Return a list of (column_name, semantic_type). + pub async fn logical_columns( + &self, + physical_region_id: RegionId, + logical_region_id: RegionId, + ) -> Result> { + let metadata_region_id = utils::to_metadata_region_id(physical_region_id); + let region_column_prefix = Self::concat_column_key_prefix(logical_region_id); + + let mut columns = vec![]; + for (k, v) in self.get_all(metadata_region_id).await? { + if !k.starts_with(®ion_column_prefix) { + continue; + } + // Safety: we have checked the prefix + let (_, column_name) = Self::parse_column_key(&k)?.unwrap(); + let semantic_type = Self::deserialize_semantic_type(&v)?; + columns.push((column_name, semantic_type)); + } + + Ok(columns) + } } // utils to concat and parse key/value @@ -143,6 +171,11 @@ impl MetadataRegion { format!("__column_{}_{}", region_id.as_u64(), encoded_column_name) } + /// Concat a column key prefix without column name + pub fn concat_column_key_prefix(region_id: RegionId) -> String { + format!("__column_{}_", region_id.as_u64()) + } + #[allow(dead_code)] pub fn parse_region_key(key: &str) -> Option<&str> { key.strip_prefix("__region_") @@ -254,6 +287,47 @@ impl MetadataRegion { Ok(val) } + /// Load all metadata from a given region. + pub async fn get_all(&self, region_id: RegionId) -> Result> { + let scan_req = ScanRequest { + projection: Some(vec![ + METADATA_SCHEMA_KEY_COLUMN_INDEX, + METADATA_SCHEMA_VALUE_COLUMN_INDEX, + ]), + filters: vec![], + output_ordering: None, + limit: None, + }; + let record_batch_stream = self + .mito + .handle_query(region_id, scan_req) + .await + .context(MitoReadOperationSnafu)?; + let scan_result = collect(record_batch_stream) + .await + .context(CollectRecordBatchStreamSnafu)?; + + let mut result = HashMap::new(); + for batch in scan_result { + let key_col = batch.column(0); + let val_col = batch.column(1); + for row_index in 0..batch.num_rows() { + let key = key_col + .get_ref(row_index) + .as_string() + .unwrap() + .map(|s| s.to_string()); + let val = val_col + .get_ref(row_index) + .as_string() + .unwrap() + .map(|s| s.to_string()); + result.insert(key.unwrap(), val.unwrap_or_default()); + } + } + Ok(result) + } + /// Builds a [ScanRequest] to read metadata for a given key. /// The request will contains a EQ filter on the key column. /// diff --git a/src/metric-engine/src/test_util.rs b/src/metric-engine/src/test_util.rs index f9d6e94ac929..f71fff63719d 100644 --- a/src/metric-engine/src/test_util.rs +++ b/src/metric-engine/src/test_util.rs @@ -119,46 +119,11 @@ impl TestEnv { // create logical region let region_id = self.default_logical_region_id(); - let region_create_request = RegionCreateRequest { - engine: METRIC_ENGINE_NAME.to_string(), - column_metadatas: vec![ - ColumnMetadata { - column_id: 0, - semantic_type: SemanticType::Timestamp, - column_schema: ColumnSchema::new( - "greptime_timestamp", - ConcreteDataType::timestamp_millisecond_datatype(), - false, - ), - }, - ColumnMetadata { - column_id: 1, - semantic_type: SemanticType::Field, - column_schema: ColumnSchema::new( - "greptime_value", - ConcreteDataType::float64_datatype(), - false, - ), - }, - ColumnMetadata { - column_id: 2, - semantic_type: SemanticType::Tag, - column_schema: ColumnSchema::new( - "job", - ConcreteDataType::string_datatype(), - false, - ), - }, - ], - primary_key: vec![2], - options: [( - LOGICAL_TABLE_METADATA_KEY.to_string(), - self.default_physical_region_id().as_u64().to_string(), - )] - .into_iter() - .collect(), - region_dir: "test_metric_region_logical".to_string(), - }; + let region_create_request = create_logical_region_request( + &["job"], + self.default_physical_region_id(), + "test_metric_logical_region", + ); self.metric() .handle_request(region_id, RegionRequest::Create(region_create_request)) .await @@ -209,6 +174,58 @@ pub fn alter_logical_region_add_tag_columns(new_tags: &[&str]) -> RegionAlterReq } } +/// Generate a [RegionCreateRequest] for logical region. +/// Only need to specify tag column's name +pub fn create_logical_region_request( + tags: &[&str], + physical_region_id: RegionId, + region_dir: &str, +) -> RegionCreateRequest { + let mut column_metadatas = vec![ + ColumnMetadata { + column_id: 0, + semantic_type: SemanticType::Timestamp, + column_schema: ColumnSchema::new( + "greptime_timestamp", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + }, + ColumnMetadata { + column_id: 0, + semantic_type: SemanticType::Field, + column_schema: ColumnSchema::new( + "greptime_value", + ConcreteDataType::float64_datatype(), + false, + ), + }, + ]; + for tag in tags { + column_metadatas.push(ColumnMetadata { + column_id: 0, + semantic_type: SemanticType::Tag, + column_schema: ColumnSchema::new( + tag.to_string(), + ConcreteDataType::string_datatype(), + false, + ), + }); + } + RegionCreateRequest { + engine: METRIC_ENGINE_NAME.to_string(), + column_metadatas, + primary_key: vec![], + options: [( + LOGICAL_TABLE_METADATA_KEY.to_string(), + physical_region_id.as_u64().to_string(), + )] + .into_iter() + .collect(), + region_dir: region_dir.to_string(), + } +} + /// Generate a row schema with given tag columns. /// /// The result will also contains default timestamp and value column at beginning. diff --git a/src/store-api/src/storage/consts.rs b/src/store-api/src/storage/consts.rs index 1e0e13cb38ac..8236fa3435bd 100644 --- a/src/store-api/src/storage/consts.rs +++ b/src/store-api/src/storage/consts.rs @@ -82,6 +82,11 @@ impl ReservedColumnId { pub const fn table_id() -> ColumnId { Self::BASE | ReservedColumnType::TableId as ColumnId } + + /// Test if the column id is reserved. + pub fn is_reserved(column_id: ColumnId) -> bool { + column_id & Self::BASE != 0 + } } // ----------------------------------------------------------------------------- diff --git a/src/store-api/src/storage/requests.rs b/src/store-api/src/storage/requests.rs index 7187309bd367..3d1c5da8cdb8 100644 --- a/src/store-api/src/storage/requests.rs +++ b/src/store-api/src/storage/requests.rs @@ -48,7 +48,8 @@ pub trait WriteRequest: Send { #[derive(Default, Clone, Debug, PartialEq, Eq)] pub struct ScanRequest { - /// Indices of columns to read, `None` to read all columns. + /// Indices of columns to read, `None` to read all columns. This indices is + /// based on table schema. pub projection: Option>, /// Filters pushed down pub filters: Vec,