diff --git a/src/common/recordbatch/src/error.rs b/src/common/recordbatch/src/error.rs
index 49bfe02bf1d2..b2c88fffee9a 100644
--- a/src/common/recordbatch/src/error.rs
+++ b/src/common/recordbatch/src/error.rs
@@ -37,7 +37,7 @@ pub enum Error {
         source: datatypes::error::Error,
     },
 
-    #[snafu(display("External error, source: {}", source))]
+    #[snafu(display("External error, location: {}, source: {}", location, source))]
     External {
         location: Location,
         source: BoxedError,
diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs
index 0c917408471f..9c6f4593be8a 100644
--- a/src/mito2/src/read.rs
+++ b/src/mito2/src/read.rs
@@ -16,6 +16,7 @@
 
 pub(crate) mod scan_region;
 pub mod seq_scan;
+pub(crate) mod stream;
 
 use std::sync::Arc;
diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs
index 1bdf30fc14b6..1d6095e2777f 100644
--- a/src/mito2/src/read/scan_region.rs
+++ b/src/mito2/src/read/scan_region.rs
@@ -14,9 +14,7 @@
 
 //! Scans a region according to the scan request.
 
-use std::collections::HashMap;
-
 use common_recordbatch::SendableRecordBatchStream;
 use common_telemetry::debug;
 use common_time::range::TimestampRange;
 use object_store::ObjectStore;
diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs
index b1737f89a661..c2bea72080d3 100644
--- a/src/mito2/src/read/seq_scan.rs
+++ b/src/mito2/src/read/seq_scan.rs
@@ -49,6 +49,7 @@ pub struct SeqScan {
 
 impl SeqScan {
     /// Creates a new [SeqScan].
+    #[must_use]
     pub fn new(metadata: RegionMetadataRef, file_dir: &str, object_store: ObjectStore) -> SeqScan {
         SeqScan {
             metadata,
@@ -63,39 +64,47 @@ impl SeqScan {
     }
 
     /// Set projection.
+    #[must_use]
     pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
         self.projection = projection;
         self
     }
 
     /// Set time range filter for time index.
+    #[must_use]
     pub fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
         self.time_range = time_range;
         self
     }
 
     /// Set predicate to push down.
+    #[must_use]
     pub fn with_predicate(mut self, predicate: Option<Predicate>) -> Self {
         self.predicate = predicate;
         self
     }
 
     /// Set memtables to read.
+    #[must_use]
     pub fn with_memtables(mut self, memtables: Vec<MemtableRef>) -> Self {
         self.memtables = memtables;
         self
     }
 
     /// Set files to read.
+    #[must_use]
     pub fn with_files(mut self, files: Vec<FileHandle>) -> Self {
         self.files = files;
         self
     }
 
     /// Builds a stream for the query.
+    #[must_use]
     pub fn build(&self) -> SendableRecordBatchStream {
         // Scans all memtables and SSTs.
         // Builds a merge reader to merge results.
+
+        unimplemented!()
     }
 }
diff --git a/src/mito2/src/read/stream.rs b/src/mito2/src/read/stream.rs
new file mode 100644
index 000000000000..2a33471dbff4
--- /dev/null
+++ b/src/mito2/src/read/stream.rs
@@ -0,0 +1,187 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Record batch stream.
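+//!
+//! A rough usage sketch (hypothetical, not a compiled doctest; it assumes an
+//! existing `metadata: RegionMetadata` and a `batch_stream` yielding
+//! `Result<Batch>`):
+//!
+//! ```ignore
+//! // Map every column of the region, i.e. no projection.
+//! let converter = BatchConverter::all(&metadata);
+//! // Adapt the batch stream so callers receive `RecordBatch`es.
+//! let stream = StreamImpl::new(batch_stream, converter);
+//! ```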
+
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use api::v1::SemanticType;
+use common_error::ext::BoxedError;
+use common_recordbatch::error::ExternalSnafu;
+use common_recordbatch::{RecordBatch, RecordBatchStream};
+use datatypes::prelude::{ConcreteDataType, DataType};
+use datatypes::schema::{Schema, SchemaRef};
+use datatypes::value::ValueRef;
+use datatypes::vectors::VectorRef;
+use futures::Stream;
+use snafu::ResultExt;
+use store_api::metadata::RegionMetadata;
+
+use crate::error::Result;
+use crate::read::Batch;
+use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
+
+/// Record batch stream implementation.
+pub(crate) struct StreamImpl<S> {
+    /// [Batch] stream.
+    stream: S,
+    /// Converts [Batch]es from the `stream` to [RecordBatch].
+    converter: BatchConverter,
+}
+
+impl<S> StreamImpl<S> {
+    /// Returns a new stream from a batch stream.
+    pub(crate) fn new(stream: S, converter: BatchConverter) -> StreamImpl<S> {
+        StreamImpl { stream, converter }
+    }
+}
+
+impl<S: Stream<Item = Result<Batch>> + Unpin> Stream for StreamImpl<S> {
+    type Item = common_recordbatch::error::Result<RecordBatch>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        match Pin::new(&mut self.stream).poll_next(cx) {
+            Poll::Ready(Some(res)) => {
+                let record_batch = res
+                    .map_err(BoxedError::new)
+                    .context(ExternalSnafu)
+                    .and_then(|batch| self.converter.convert(&batch));
+                Poll::Ready(Some(record_batch))
+            }
+            Poll::Ready(None) => Poll::Ready(None),
+            Poll::Pending => Poll::Pending,
+        }
+    }
+}
+
+impl<S: Stream<Item = Result<Batch>> + Unpin> RecordBatchStream for StreamImpl<S> {
+    fn schema(&self) -> SchemaRef {
+        self.converter.output_schema.clone()
+    }
+}
+
+/// Converts a [Batch] to a [RecordBatch].
+pub(crate) struct BatchConverter {
+    /// Maps columns in [RecordBatch] to indices in [Batch].
+    batch_indices: Vec<BatchIndex>,
+    /// Decoder for primary key.
+    codec: McmpRowCodec,
+    /// Schema for converted [RecordBatch].
+    output_schema: SchemaRef,
+}
+
+impl BatchConverter {
+    /// Returns a new converter with projection.
+    ///
+    /// # Panics
+    /// Panics if any index in `projection` is out of bounds.
+    pub(crate) fn new(
+        metadata: &RegionMetadata,
+        projection: impl Iterator<Item = usize>,
+    ) -> BatchConverter {
+        let mut batch_indices = Vec::with_capacity(projection.size_hint().0);
+        let mut column_schemas = Vec::with_capacity(projection.size_hint().0);
+        for idx in projection {
+            // For each projection index, we get the column id for projection.
+            let column = &metadata.column_metadatas[idx];
+
+            // Get the column's index in a batch by its semantic type and column id.
+            let batch_index = match column.semantic_type {
+                SemanticType::Tag => {
+                    // Safety: It is a primary key column.
+                    let index = metadata.primary_key_index(column.column_id).unwrap();
+                    BatchIndex::Tag(index)
+                }
+                SemanticType::Timestamp => BatchIndex::Timestamp,
+                SemanticType::Field => {
+                    // Safety: It is a field column.
+                    let index = metadata.field_index(column.column_id).unwrap();
+                    BatchIndex::Field(index)
+                }
+            };
+            batch_indices.push(batch_index);
+
+            column_schemas.push(metadata.schema.column_schemas()[idx].clone());
+        }
+
+        let codec = McmpRowCodec::new(
+            metadata
+                .primary_key_columns()
+                .map(|c| SortField::new(c.column_schema.data_type.clone()))
+                .collect(),
+        );
+        // Safety: Columns come from the existing schema.
+        let output_schema = Arc::new(Schema::new(column_schemas));
+
+        BatchConverter {
+            batch_indices,
+            codec,
+            output_schema,
+        }
+    }
+
+    /// Returns a new converter without projection.
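+    ///
+    /// Equivalent to calling [BatchConverter::new] with the full projection
+    /// `0..metadata.column_metadatas.len()`, e.g. (a hypothetical sketch, not
+    /// a compiled doctest):
+    ///
+    /// ```ignore
+    /// // Both converters map every column of the region.
+    /// let all = BatchConverter::all(&metadata);
+    /// let explicit = BatchConverter::new(&metadata, 0..metadata.column_metadatas.len());
+    /// ```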
+    pub(crate) fn all(metadata: &RegionMetadata) -> BatchConverter {
+        BatchConverter::new(metadata, 0..metadata.column_metadatas.len())
+    }
+
+    /// Converts a [Batch] to a [RecordBatch].
+    ///
+    /// The batch must match the `projection` used to build the converter.
+    pub(crate) fn convert(&self, batch: &Batch) -> common_recordbatch::error::Result<RecordBatch> {
+        let pk_values = self
+            .codec
+            .decode(batch.primary_key())
+            .map_err(BoxedError::new)
+            .context(ExternalSnafu)?;
+
+        let mut columns = Vec::with_capacity(self.output_schema.num_columns());
+        let num_rows = batch.num_rows();
+        for (index, column_schema) in self
+            .batch_indices
+            .iter()
+            .zip(self.output_schema.column_schemas())
+        {
+            match index {
+                BatchIndex::Tag(idx) => {
+                    let value = pk_values[*idx].as_value_ref();
+                    let vector = new_repeated_vector(&column_schema.data_type, value, num_rows)?;
+                    columns.push(vector);
+                }
+                BatchIndex::Timestamp => {
+                    columns.push(batch.timestamps().clone());
+                }
+                BatchIndex::Field(idx) => {
+                    columns.push(batch.fields()[*idx].data.clone());
+                }
+            }
+        }
+
+        RecordBatch::new(self.output_schema.clone(), columns)
+    }
+}
+
+/// Index of a vector in a [Batch].
+#[derive(Debug, Clone, Copy)]
+enum BatchIndex {
+    /// Index in primary keys.
+    Tag(usize),
+    /// The time index column.
+    Timestamp,
+    /// Index in fields.
+    Field(usize),
+}
+
+/// Returns a vector with repeated values.
+fn new_repeated_vector(
+    data_type: &ConcreteDataType,
+    value: ValueRef,
+    num_rows: usize,
+) -> common_recordbatch::error::Result<VectorRef> {
+    let mut mutable_vector = data_type.create_mutable_vector(1);
+    mutable_vector
+        .try_push_value_ref(value)
+        .map_err(BoxedError::new)
+        .context(ExternalSnafu)?;
+    // This requires an additional allocation.
+    // TODO(yingwen): Add a way to create a repeated vector to the data type.
+    let base_vector = mutable_vector.to_vector();
+    Ok(base_vector.replicate(&[num_rows]))
+}
diff --git a/src/mito2/src/sst/parquet/format.rs b/src/mito2/src/sst/parquet/format.rs
index 8b17826edfe8..977604c2607f 100644
--- a/src/mito2/src/sst/parquet/format.rs
+++ b/src/mito2/src/sst/parquet/format.rs
@@ -23,6 +23,8 @@
 //! ```text
 //! field 0, field 1, ..., field N, time index, primary key, sequence, op type
 //! ```
+//!
+//! We store fields in the same order as [RegionMetadata::field_columns()](store_api::metadata::RegionMetadata::field_columns()).
 
 use std::collections::HashMap;
 use std::sync::Arc;
diff --git a/src/store-api/src/metadata.rs b/src/store-api/src/metadata.rs
index 7c5b75e869bb..7d298557bd85 100644
--- a/src/store-api/src/metadata.rs
+++ b/src/store-api/src/metadata.rs
@@ -169,6 +169,7 @@ impl RegionMetadata {
             .map(|index| &self.column_metadatas[index])
     }
 
+    /// Returns all primary key columns.
     pub fn primary_key_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
         // safety: RegionMetadata::validate ensures every primary key exists.
         self.primary_key
@@ -183,6 +184,20 @@ impl RegionMetadata {
             .filter(|column| column.semantic_type == SemanticType::Field)
     }
 
+    /// Returns a column's index in the primary key if it is a primary key column.
+    ///
+    /// This does a linear search.
+    pub fn primary_key_index(&self, column_id: ColumnId) -> Option<usize> {
+        self.primary_key.iter().position(|id| *id == column_id)
+    }
+
+    /// Returns a column's index in the fields if it is a field column.
+    ///
+    /// This does a linear search.
+    pub fn field_index(&self, column_id: ColumnId) -> Option<usize> {
+        self.field_columns()
+            .position(|column| column.column_id == column_id)
+    }
+
     /// Checks whether the metadata is valid.
     fn validate(&self) -> Result<()> {
         // Id to name.
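
For reference, a minimal sketch of how the new `RegionMetadata` index helpers behave (hypothetical column ids, assuming a region whose primary key columns are `host` = 1 and `idc` = 2 and whose only field column is `cpu` = 3):

```rust
// Positions are relative to the primary key / field lists, not the full schema.
assert_eq!(metadata.primary_key_index(1), Some(0)); // host is the first key column
assert_eq!(metadata.primary_key_index(3), None);    // cpu is a field, not a key
assert_eq!(metadata.field_index(3), Some(0));       // cpu is the first field column
```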