diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 0d0bdd47ed48..8454f70c4223 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -31,6 +31,8 @@ pub(crate) mod listener; #[cfg(test)] mod open_test; #[cfg(test)] +mod projection_test; +#[cfg(test)] mod tests; use std::sync::Arc; diff --git a/src/mito2/src/engine/projection_test.rs b/src/mito2/src/engine/projection_test.rs new file mode 100644 index 000000000000..9104f56d0890 --- /dev/null +++ b/src/mito2/src/engine/projection_test.rs @@ -0,0 +1,94 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::value::ValueData; +use api::v1::{Row, Rows}; +use common_recordbatch::RecordBatches; +use store_api::region_engine::RegionEngine; +use store_api::region_request::RegionRequest; +use store_api::storage::{RegionId, ScanRequest}; + +use crate::config::MitoConfig; +use crate::test_util::{put_rows, rows_schema, CreateRequestBuilder, TestEnv}; + +/// Build rows for multiple tags and fields. 
+fn build_rows_multi_tags_fields( + tags: &[&str], + field_starts: &[usize], + ts_range: (usize, usize), +) -> Vec<Row> { + (ts_range.0..ts_range.1) + .enumerate() + .map(|(idx, ts)| { + let mut values = Vec::with_capacity(tags.len() + field_starts.len() + 1); + for tag in tags { + values.push(api::v1::Value { + value_data: Some(ValueData::StringValue(tag.to_string())), + }); + } + for field_start in field_starts { + values.push(api::v1::Value { + value_data: Some(ValueData::F64Value((field_start + idx) as f64)), + }); + } + values.push(api::v1::Value { + value_data: Some(ValueData::TsMillisecondValue(ts as i64 * 1000)), + }); + + api::v1::Row { values } + }) + .collect() +} + +#[tokio::test] +async fn test_scan_projection() { + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + // [tag_0, tag_1, field_0, field_1, ts] + let request = CreateRequestBuilder::new().tag_num(2).field_num(2).build(); + + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let rows = Rows { + schema: column_schemas, + rows: build_rows_multi_tags_fields(&["a", "b"], &[0, 10], (0, 3)), + }; + put_rows(&engine, region_id, rows).await; + + // Scans tag_1, field_1, ts + let request = ScanRequest { + sequence: None, + projection: Some(vec![1, 3, 4]), + filters: Vec::new(), + output_ordering: None, + limit: None, + }; + let stream = engine.handle_query(region_id, request).await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++-------+---------+---------------------+ +| tag_1 | field_1 | ts | ++-------+---------+---------------------+ +| b | 10.0 | 1970-01-01T00:00:00 | +| b | 11.0 | 1970-01-01T00:00:01 | +| b | 12.0 | 1970-01-01T00:00:02 | ++-------+---------+---------------------+"; + assert_eq!(expected, batches.pretty_print().unwrap()); +} diff --git 
a/src/mito2/src/read/projection.rs b/src/mito2/src/read/projection.rs index 0af9fa487212..d6c19ceb8796 100644 --- a/src/mito2/src/read/projection.rs +++ b/src/mito2/src/read/projection.rs @@ -14,6 +14,7 @@ //! Utilities for projection. +use std::collections::HashMap; use std::sync::Arc; use api::v1::SemanticType; @@ -55,50 +56,64 @@ impl ProjectionMapper { metadata: &RegionMetadataRef, projection: impl Iterator<Item = usize>, ) -> Result<ProjectionMapper> { - let projection_len = projection.size_hint().0; - let mut batch_indices = Vec::with_capacity(projection_len); - let mut column_schemas = Vec::with_capacity(projection_len); - let mut column_ids = Vec::with_capacity(projection_len); - for idx in projection { + let projection: Vec<_> = projection.collect(); + let mut column_schemas = Vec::with_capacity(projection.len()); + let mut column_ids = Vec::with_capacity(projection.len()); + for idx in &projection { // For each projection index, we get the column id for projection. let column = metadata .column_metadatas - .get(idx) + .get(*idx) .context(InvalidRequestSnafu { region_id: metadata.region_id, reason: format!("projection index {} is out of bound", idx), })?; + column_ids.push(column.column_id); + // Safety: idx is valid. + column_schemas.push(metadata.schema.column_schemas()[*idx].clone()); + } + let codec = McmpRowCodec::new( + metadata + .primary_key_columns() + .map(|c| SortField::new(c.column_schema.data_type.clone())) + .collect(), + ); + // Safety: Columns come from existing schema. + let output_schema = Arc::new(Schema::new(column_schemas)); + // Get fields in each batch. + let batch_fields = Batch::projected_fields(metadata, &column_ids); + + // Field column id to its index in batch. + let field_id_to_index: HashMap<_, _> = batch_fields + .iter() + .enumerate() + .map(|(index, column_id)| (*column_id, index)) + .collect(); + // For each projected column, compute its index in batches. 
+ let mut batch_indices = Vec::with_capacity(projection.len()); + for idx in &projection { + // Safety: idx is valid. + let column = &metadata.column_metadatas[*idx]; // Get column index in a batch by its semantic type and column id. let batch_index = match column.semantic_type { SemanticType::Tag => { // Safety: It is a primary key column. let index = metadata.primary_key_index(column.column_id).unwrap(); + // We always read all primary key so the column always exists and the tag + // index is always valid. BatchIndex::Tag(index) } SemanticType::Timestamp => BatchIndex::Timestamp, SemanticType::Field => { - // Safety: It is a field column. - let index = metadata.field_index(column.column_id).unwrap(); + // Safety: It is a field column so it should be in `field_id_to_index`. + let index = field_id_to_index[&column.column_id]; BatchIndex::Field(index) } }; batch_indices.push(batch_index); - column_ids.push(column.column_id); - // Safety: idx is valid. - column_schemas.push(metadata.schema.column_schemas()[idx].clone()); } - let codec = McmpRowCodec::new( - metadata - .primary_key_columns() - .map(|c| SortField::new(c.column_schema.data_type.clone())) - .collect(), - ); - // Safety: Columns come from existing schema. - let output_schema = Arc::new(Schema::new(column_schemas)); - let batch_fields = Batch::projected_fields(metadata, &column_ids); - Ok(ProjectionMapper { metadata: metadata.clone(), batch_indices, diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 71adc7179ce8..6fbf197ab9b4 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -198,6 +198,8 @@ impl TestEnv { } /// Builder to mock a [RegionCreateRequest]. +/// +/// It builds schema like `[tag_0, tag_1, ..., field_0, field_1, ..., ts]`. 
pub struct CreateRequestBuilder { region_dir: String, tag_num: usize, @@ -232,7 +234,7 @@ impl CreateRequestBuilder { } pub fn field_num(mut self, value: usize) -> Self { - self.tag_num = value; + self.field_num = value; self } diff --git a/src/store-api/src/metadata.rs b/src/store-api/src/metadata.rs index 2291e2b4145f..8e9e8a060999 100644 --- a/src/store-api/src/metadata.rs +++ b/src/store-api/src/metadata.rs @@ -233,14 +233,6 @@ impl RegionMetadata { self.primary_key.iter().position(|id| *id == column_id) } - /// Returns a column's index in fields if it is a field column. - /// - /// This does a linear search. - pub fn field_index(&self, column_id: ColumnId) -> Option<usize> { - self.field_columns() - .position(|column| column.column_id == column_id) - } - /// Checks whether the metadata is valid. fn validate(&self) -> Result<()> { // Id to name.