diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 1d6095e2777f..c35db88e79ef 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -14,17 +14,17 @@ //! Scans a region according to the scan request. -use common_recordbatch::{SendableRecordBatchStream}; +use common_recordbatch::SendableRecordBatchStream; use common_telemetry::debug; use common_time::range::TimestampRange; use object_store::ObjectStore; -use snafu::{OptionExt, ResultExt}; -use store_api::metadata::RegionMetadata; -use store_api::storage::{ColumnId, ScanRequest}; +use snafu::ResultExt; +use store_api::storage::ScanRequest; use table::predicate::{Predicate, TimeRangePredicateBuilder}; -use crate::error::{BuildPredicateSnafu, InvalidRequestSnafu, Result}; +use crate::error::{BuildPredicateSnafu, Result}; use crate::read::seq_scan::SeqScan; +use crate::read::stream::ProjectionMapper; use crate::region::version::VersionRef; use crate::sst::file::FileHandle; @@ -110,19 +110,17 @@ impl ScanRegion { self.version.metadata.schema.clone(), ) .context(BuildPredicateSnafu)?; - let projection = self - .request - .projection - .as_ref() - .map(|p| projection_indices_to_ids(&self.version.metadata, p)) - .transpose()?; + let mapper = match &self.request.projection { + Some(p) => ProjectionMapper::new(&self.version.metadata, p.iter().copied())?, + None => ProjectionMapper::all(&self.version.metadata)?, + }; let seq_scan = SeqScan::new( self.version.metadata.clone(), &self.file_dir, self.object_store.clone(), + mapper, ) - .with_projection(projection) .with_time_range(Some(time_range)) .with_predicate(Some(predicate)) .with_memtables(memtables) @@ -155,24 +153,3 @@ fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool { let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end)); file_ts_range.intersects(predicate) } - -/// Map projection indices to column ids. -fn projection_indices_to_ids( - metadata: &RegionMetadata, - projection: &[usize], -) -> Result> { - let mut column_ids = Vec::with_capacity(projection.len()); - // For each projection index, we get the column id. - for idx in projection { - let column_id = metadata - .column_metadatas - .get(*idx) - .context(InvalidRequestSnafu { - region_id: metadata.region_id, - reason: format!("Index {} out of bound", idx), - })? - .column_id; - column_ids.push(column_id); - } - Ok(column_ids) -} diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index c2bea72080d3..0f40af2fbefb 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -18,10 +18,10 @@ use common_recordbatch::SendableRecordBatchStream; use common_time::range::TimestampRange; use object_store::ObjectStore; use store_api::metadata::RegionMetadataRef; -use store_api::storage::ColumnId; use table::predicate::Predicate; use crate::memtable::MemtableRef; +use crate::read::stream::ProjectionMapper; use crate::sst::file::FileHandle; /// Scans a region and returns rows in a sorted sequence. @@ -34,9 +34,9 @@ pub struct SeqScan { file_dir: String, /// Object store that stores SST files. object_store: ObjectStore, + /// Maps projected Batches to RecordBatches. + mapper: ProjectionMapper, - /// Projection to push down. - projection: Option>, /// Time range filter for time index. time_range: Option, /// Predicate to push down. @@ -50,12 +50,17 @@ pub struct SeqScan { impl SeqScan { /// Creates a new [SeqScan]. #[must_use] - pub fn new(metadata: RegionMetadataRef, file_dir: &str, object_store: ObjectStore) -> SeqScan { + pub(crate) fn new( + metadata: RegionMetadataRef, + file_dir: &str, + object_store: ObjectStore, + mapper: ProjectionMapper, + ) -> SeqScan { SeqScan { metadata, file_dir: file_dir.to_string(), object_store, - projection: None, + mapper, time_range: None, predicate: None, memtables: Vec::new(), @@ -63,37 +68,30 @@ impl SeqScan { } } - /// Set projection. - #[must_use] - pub fn with_projection(mut self, projection: Option>) -> Self { - self.projection = projection; - self - } - /// Set time range filter for time index. #[must_use] - pub fn with_time_range(mut self, time_range: Option) -> Self { + pub(crate) fn with_time_range(mut self, time_range: Option) -> Self { self.time_range = time_range; self } /// Set predicate to push down. #[must_use] - pub fn with_predicate(mut self, predicate: Option) -> Self { + pub(crate) fn with_predicate(mut self, predicate: Option) -> Self { self.predicate = predicate; self } /// Set memtables to read. #[must_use] - pub fn with_memtables(mut self, memtables: Vec) -> Self { + pub(crate) fn with_memtables(mut self, memtables: Vec) -> Self { self.memtables = memtables; self } /// Set files to read. #[must_use] - pub fn with_files(mut self, files: Vec) -> Self { + pub(crate) fn with_files(mut self, files: Vec) -> Self { self.files = files; self } diff --git a/src/mito2/src/read/stream.rs b/src/mito2/src/read/stream.rs index 2a33471dbff4..26eb0a4d9b3f 100644 --- a/src/mito2/src/read/stream.rs +++ b/src/mito2/src/read/stream.rs @@ -14,42 +14,38 @@ //! Record batch stream. -use std::sync::Arc; use std::pin::Pin; -use std::task::Context; -use std::task::Poll; +use std::sync::Arc; +use std::task::{Context, Poll}; use api::v1::SemanticType; use common_error::ext::BoxedError; -use common_recordbatch::RecordBatchStream; use common_recordbatch::error::ExternalSnafu; -use common_recordbatch::{RecordBatch}; +use common_recordbatch::{RecordBatch, RecordBatchStream}; use datatypes::prelude::{ConcreteDataType, DataType}; -use datatypes::schema::{SchemaRef, Schema}; +use datatypes::schema::{Schema, SchemaRef}; use datatypes::value::ValueRef; use datatypes::vectors::VectorRef; use futures::Stream; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadata; + +use crate::error::{InvalidRequestSnafu, Result}; use crate::read::Batch; -use crate::row_converter::{McmpRowCodec, SortField, RowCodec}; -use crate::error::Result; +use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; /// Record batch stream implementation. pub(crate) struct StreamImpl { /// [Batch] stream. stream: S, /// Converts [Batch]es from the `stream` to [RecordBatch]. - converter: BatchConverter, + mapper: ProjectionMapper, } impl StreamImpl { /// Returns a new stream from a batch stream. - pub(crate) fn new(stream: S, converter: BatchConverter) -> StreamImpl { - StreamImpl { - stream, - converter, - } + pub(crate) fn new(stream: S, mapper: ProjectionMapper) -> StreamImpl { + StreamImpl { stream, mapper } } } @@ -59,11 +55,12 @@ impl> + Unpin> Stream for StreamImpl { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match Pin::new(&mut self.stream).poll_next(cx) { Poll::Ready(Some(res)) => { - let record_batch = res.map_err(BoxedError::new).context(ExternalSnafu).and_then(|batch| { - self.converter.convert(&batch) - }); + let record_batch = res + .map_err(BoxedError::new) + .context(ExternalSnafu) + .and_then(|batch| self.mapper.convert(&batch)); Poll::Ready(Some(record_batch)) - }, + } Poll::Ready(None) => Poll::Ready(None), Poll::Pending => Poll::Pending, } @@ -72,12 +69,12 @@ impl> + Unpin> Stream for StreamImpl { impl> + Unpin> RecordBatchStream for StreamImpl { fn schema(&self) -> SchemaRef { - self.converter.output_schema.clone() + self.mapper.output_schema.clone() } } -/// Converts a [Batch] to a [RecordBatch]. -pub(crate) struct BatchConverter { +/// Handles projection and converts a projected [Batch] to a projected [RecordBatch]. +pub(crate) struct ProjectionMapper { /// Maps column in [RecordBatch] to index in [Batch]. batch_indices: Vec, /// Decoder for primary key. @@ -86,17 +83,23 @@ pub(crate) struct BatchConverter { output_schema: SchemaRef, } -impl BatchConverter { - /// Returns a new converter with projection. - /// - /// # Panics - /// Panics if any index in `projection` is out of bound. - pub(crate) fn new(metadata: &RegionMetadata, projection: impl Iterator) -> BatchConverter { +impl ProjectionMapper { + /// Returns a new mapper with projection. + pub(crate) fn new( + metadata: &RegionMetadata, + projection: impl Iterator, + ) -> Result { let mut batch_indices = Vec::with_capacity(projection.size_hint().0); let mut column_schemas = Vec::with_capacity(projection.size_hint().0); for idx in projection { // For each projection index, we get the column id for projection. - let column = &metadata.column_metadatas[idx]; + let column = metadata + .column_metadatas + .get(idx) + .context(InvalidRequestSnafu { + region_id: metadata.region_id, + reason: format!("projection index {} is out of bound", idx), + })?; // Get column index in a batch by its semantic type and column id. let batch_index = match column.semantic_type { @@ -104,7 +107,7 @@ impl BatchConverter { // Safety: It is a primary key column. let index = metadata.primary_key_index(column.column_id).unwrap(); BatchIndex::Tag(index) - }, + } SemanticType::Timestamp => BatchIndex::Timestamp, SemanticType::Field => { // Safety: It is a field column. @@ -114,6 +117,7 @@ impl BatchConverter { }; batch_indices.push(batch_index); + // Safety: idx is valid. column_schemas.push(metadata.schema.column_schemas()[idx].clone()); } @@ -126,39 +130,47 @@ impl BatchConverter { // Safety: Columns come from existing schema. let output_schema = Arc::new(Schema::new(column_schemas)); - BatchConverter { + Ok(ProjectionMapper { batch_indices, codec, output_schema, - } + }) } - /// Returns a new converter without projection. - pub(crate) fn all(metadata: &RegionMetadata) -> BatchConverter { - BatchConverter::new(metadata, 0..metadata.column_metadatas.len()) + /// Returns a new mapper without projection. + pub(crate) fn all(metadata: &RegionMetadata) -> Result { + ProjectionMapper::new(metadata, 0..metadata.column_metadatas.len()) } /// Converts a [Batch] to a [RecordBatch]. /// - /// The batch must match the `projection` using to build the converter. + /// The batch must match the `projection` using to build the mapper. pub(crate) fn convert(&self, batch: &Batch) -> common_recordbatch::error::Result { - let pk_values = self.codec.decode(batch.primary_key()).map_err(BoxedError::new).context(ExternalSnafu)?; + let pk_values = self + .codec + .decode(batch.primary_key()) + .map_err(BoxedError::new) + .context(ExternalSnafu)?; let mut columns = Vec::with_capacity(self.output_schema.num_columns()); let num_rows = batch.num_rows(); - for (index, column_schema) in self.batch_indices.iter().zip(self.output_schema.column_schemas()) { + for (index, column_schema) in self + .batch_indices + .iter() + .zip(self.output_schema.column_schemas()) + { match index { BatchIndex::Tag(idx) => { let value = pk_values[*idx].as_value_ref(); let vector = new_repeated_vector(&column_schema.data_type, value, num_rows)?; columns.push(vector); - }, + } BatchIndex::Timestamp => { columns.push(batch.timestamps().clone()); - }, + } BatchIndex::Field(idx) => { columns.push(batch.fields()[*idx].data.clone()); - }, + } } } @@ -178,9 +190,16 @@ enum BatchIndex { } /// Returns a vector with repeated values. -fn new_repeated_vector(data_type: &ConcreteDataType, value: ValueRef, num_rows: usize) -> common_recordbatch::error::Result { +fn new_repeated_vector( + data_type: &ConcreteDataType, + value: ValueRef, + num_rows: usize, +) -> common_recordbatch::error::Result { let mut mutable_vector = data_type.create_mutable_vector(1); - mutable_vector.try_push_value_ref(value).map_err(BoxedError::new).context(ExternalSnafu)?; + mutable_vector + .try_push_value_ref(value) + .map_err(BoxedError::new) + .context(ExternalSnafu)?; // This requires an addtional allocation. TODO(yingwen): Add a way to create repeated vector to data type. let base_vector = mutable_vector.to_vector(); Ok(base_vector.replicate(&[num_rows])) diff --git a/src/store-api/src/metadata.rs b/src/store-api/src/metadata.rs index 7d298557bd85..17aadef89685 100644 --- a/src/store-api/src/metadata.rs +++ b/src/store-api/src/metadata.rs @@ -195,7 +195,8 @@ impl RegionMetadata { /// /// This does a linear search. pub fn field_index(&self, column_id: ColumnId) -> Option { - self.field_columns().position(|column| column.column_id == column_id) + self.field_columns() + .position(|column| column.column_id == column_id) } /// Checks whether the metadata is valid.