diff --git a/src/common/recordbatch/src/adapter.rs b/src/common/recordbatch/src/adapter.rs index 6c113562cddf..b71bad410317 100644 --- a/src/common/recordbatch/src/adapter.rs +++ b/src/common/recordbatch/src/adapter.rs @@ -286,6 +286,7 @@ mod test { use snafu::IntoError; use super::*; + use crate::error::Error; use crate::RecordBatches; #[tokio::test] @@ -354,20 +355,24 @@ mod test { .into_error(BoxedError::new(MockError::new(StatusCode::Unknown)))), ])); let adapter = AsyncRecordBatchStreamAdapter::new(schema.clone(), poll_err_stream); - let result = RecordBatches::try_collect(Box::pin(adapter)).await; - assert_eq!( - result.unwrap_err().to_string(), - "External error, source: Unknown", + let err = RecordBatches::try_collect(Box::pin(adapter)) + .await + .unwrap_err(); + assert!( + matches!(err, Error::External { .. }), + "unexpected err {err}" ); let failed_to_init_stream = new_future_stream(Err(error::ExternalSnafu .into_error(BoxedError::new(MockError::new(StatusCode::Internal))))); let adapter = AsyncRecordBatchStreamAdapter::new(schema.clone(), failed_to_init_stream); - let result = RecordBatches::try_collect(Box::pin(adapter)).await; - assert_eq!( - result.unwrap_err().to_string(), - "External error, source: Internal", + let err = RecordBatches::try_collect(Box::pin(adapter)) + .await + .unwrap_err(); + assert!( + matches!(err, Error::External { .. }), + "unexpected err {err}" ); } } diff --git a/src/common/recordbatch/src/error.rs b/src/common/recordbatch/src/error.rs index 49bfe02bf1d2..b2c88fffee9a 100644 --- a/src/common/recordbatch/src/error.rs +++ b/src/common/recordbatch/src/error.rs @@ -37,7 +37,7 @@ pub enum Error { source: datatypes::error::Error, }, - #[snafu(display("External error, source: {}", source))] + #[snafu(display("External error, location: {}, source: {}", location, source))] External { location: Location, source: BoxedError, diff --git a/src/common/recordbatch/src/lib.rs b/src/common/recordbatch/src/lib.rs index 60415d95e1b2..9730b4041f73 100644 --- a/src/common/recordbatch/src/lib.rs +++ b/src/common/recordbatch/src/lib.rs @@ -202,13 +202,26 @@ impl Stream for SimpleRecordBatchStream { } /// Adapt a [Stream] of [RecordBatch] to a [RecordBatchStream]. -pub struct RecordBatchStreamAdaptor { +pub struct RecordBatchStreamAdaptor { pub schema: SchemaRef, - pub stream: Pin> + Send>>, + pub stream: S, pub output_ordering: Option>, } -impl RecordBatchStream for RecordBatchStreamAdaptor { +impl RecordBatchStreamAdaptor { + /// Creates a RecordBatchStreamAdaptor without output ordering requirement. 
+ pub fn new(schema: SchemaRef, stream: S) -> RecordBatchStreamAdaptor { + RecordBatchStreamAdaptor { + schema, + stream, + output_ordering: None, + } + } +} + +impl> + Unpin> RecordBatchStream + for RecordBatchStreamAdaptor +{ fn schema(&self) -> SchemaRef { self.schema.clone() } @@ -218,7 +231,7 @@ impl RecordBatchStream for RecordBatchStreamAdaptor { } } -impl Stream for RecordBatchStreamAdaptor { +impl> + Unpin> Stream for RecordBatchStreamAdaptor { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 219cfd29cda4..1b9c938b72ce 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -21,13 +21,14 @@ use std::sync::Arc; use common_query::Output; use object_store::ObjectStore; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use store_api::logstore::LogStore; use store_api::region_request::RegionRequest; -use store_api::storage::RegionId; +use store_api::storage::{RegionId, ScanRequest}; use crate::config::MitoConfig; -use crate::error::{RecvSnafu, Result}; +use crate::error::{RecvSnafu, RegionNotFoundSnafu, Result}; +use crate::read::scan_region::{ScanRegion, Scanner}; use crate::request::{RegionTask, RequestBody}; use crate::worker::WorkerGroup; @@ -72,12 +73,19 @@ impl MitoEngine { pub fn is_region_exists(&self, region_id: RegionId) -> bool { self.inner.workers.is_region_exists(region_id) } + + /// Handles the scan `request` and returns a [Scanner] for the `request`. + fn handle_query(&self, region_id: RegionId, request: ScanRequest) -> Result { + self.inner.handle_query(region_id, request) + } } /// Inner struct of [MitoEngine]. struct EngineInner { /// Region workers group. workers: WorkerGroup, + /// Shared object store of all regions. + object_store: ObjectStore, } impl EngineInner { @@ -88,7 +96,8 @@ impl EngineInner { object_store: ObjectStore, ) -> EngineInner { EngineInner { - workers: WorkerGroup::start(config, log_store, object_store), + workers: WorkerGroup::start(config, log_store, object_store.clone()), + object_store, } } @@ -99,10 +108,29 @@ impl EngineInner { /// Handles [RequestBody] and return its executed result. async fn handle_request(&self, region_id: RegionId, request: RegionRequest) -> Result { + // We validate and then convert the `request` into an inner `RequestBody` for ease of handling. let body = RequestBody::try_from_region_request(region_id, request)?; let (request, receiver) = RegionTask::from_request(region_id, body); self.workers.submit_to_worker(request).await?; receiver.await.context(RecvSnafu)? } + + /// Handles the scan `request` and returns a [Scanner] for the `request`. + fn handle_query(&self, region_id: RegionId, request: ScanRequest) -> Result { + // Reading a region doesn't need to go through the region worker thread. 
+ let region = self + .workers + .get_region(region_id) + .context(RegionNotFoundSnafu { region_id })?; + let version = region.version(); + let scan_region = ScanRegion::new( + version, + region.region_dir.clone(), + self.object_store.clone(), + request, + ); + + scan_region.scanner() + } } diff --git a/src/mito2/src/engine/tests.rs b/src/mito2/src/engine/tests.rs index ff3fc590c95c..2beb97874018 100644 --- a/src/mito2/src/engine/tests.rs +++ b/src/mito2/src/engine/tests.rs @@ -19,6 +19,7 @@ use std::collections::HashMap; use api::helper::ColumnDataTypeWrapper; use api::v1::value::ValueData; use api::v1::{Row, Rows}; +use common_recordbatch::RecordBatches; use store_api::metadata::ColumnMetadata; use store_api::region_request::{RegionCloseRequest, RegionOpenRequest, RegionPutRequest}; use store_api::storage::RegionId; @@ -246,7 +247,7 @@ fn build_rows(num_rows: usize) -> Vec { value_data: Some(ValueData::F64Value(i as f64)), }, api::v1::Value { - value_data: Some(ValueData::TsMillisecondValue(i as i64)), + value_data: Some(ValueData::TsMillisecondValue(i as i64 * 1000)), }, ], }) @@ -285,3 +286,47 @@ async fn test_write_to_region() { }; assert_eq!(num_rows, rows_inserted); } + +// TODO(yingwen): build_rows() only generate one point for each series. We need to add tests +// for series with multiple points and other cases. +#[tokio::test] +async fn test_write_query_region() { + let env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + + let column_schemas = request + .column_metadatas + .iter() + .map(column_metadata_to_column_schema) + .collect::>(); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let rows = Rows { + schema: column_schemas, + rows: build_rows(3), + }; + engine + .handle_request(region_id, RegionRequest::Put(RegionPutRequest { rows })) + .await + .unwrap(); + + let request = ScanRequest::default(); + let scanner = engine.handle_query(region_id, request).unwrap(); + let stream = scanner.scan().await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 0 | 0.0 | 1970-01-01T00:00:00 | +| 1 | 1.0 | 1970-01-01T00:00:01 | +| 2 | 2.0 | 1970-01-01T00:00:02 | ++-------+---------+---------------------+"; + assert_eq!(expected, batches.pretty_print().unwrap()); +} diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 54355328e03f..1361aa170876 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -369,14 +369,24 @@ pub enum Error { #[snafu(display("Invalid flume sender, location: {}", location,))] InvalidFlumeSender { location: Location }, - #[snafu(display("Invalid scheduler state location: {}", location,))] + #[snafu(display("Invalid scheduler state, location: {}", location))] InvalidSchedulerState { location: Location }, - #[snafu(display("Failed to stop scheduler, source: {}", source))] + #[snafu(display("Failed to stop scheduler, location: {}, source: {}", location, source))] StopScheduler { source: JoinError, location: Location, }, + + #[snafu(display( + "Failed to build scan predicate, location: {}, source: {}", + location, + source + ))] + BuildPredicate { + source: table::error::Error, + location: Location, + }, } pub type Result = std::result::Result; @@ -437,6 +447,7 @@ impl ErrorExt for Error { InvalidFlumeSender { .. 
} => StatusCode::InvalidArguments, InvalidSchedulerState { .. } => StatusCode::InvalidArguments, StopScheduler { .. } => StatusCode::Internal, + BuildPredicate { source, .. } => source.status_code(), } } diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs index ab058c05b267..fcb6556ebc87 100644 --- a/src/mito2/src/lib.rs +++ b/src/mito2/src/lib.rs @@ -29,6 +29,7 @@ pub mod error; pub mod manifest; #[allow(dead_code)] pub mod memtable; +#[allow(dead_code)] pub mod read; #[allow(dead_code)] mod region; diff --git a/src/mito2/src/memtable/version.rs b/src/mito2/src/memtable/version.rs index f769da498b53..53df94d19195 100644 --- a/src/mito2/src/memtable/version.rs +++ b/src/mito2/src/memtable/version.rs @@ -22,7 +22,7 @@ use crate::memtable::MemtableRef; #[derive(Debug)] pub(crate) struct MemtableVersion { /// Mutable memtable. - mutable: MemtableRef, + pub(crate) mutable: MemtableRef, /// Immutable memtables. immutables: Vec, } @@ -38,8 +38,11 @@ impl MemtableVersion { } } - /// Returns the mutable memtable. - pub(crate) fn mutable(&self) -> &MemtableRef { - &self.mutable + /// Lists mutable and immutable memtables. + pub(crate) fn list_memtables(&self) -> Vec { + let mut memtables = Vec::with_capacity(self.immutables.len() + 1); + memtables.push(self.mutable.clone()); + memtables.extend_from_slice(&self.immutables); + memtables } } diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index 1a6a81b6f804..fb1629f61f42 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -15,6 +15,9 @@ //! Common structs and utilities for reading data. pub mod merge; +pub(crate) mod projection; +pub(crate) mod scan_region; +pub(crate) mod seq_scan; use std::sync::Arc; diff --git a/src/mito2/src/read/projection.rs b/src/mito2/src/read/projection.rs new file mode 100644 index 000000000000..4f61affdc2f4 --- /dev/null +++ b/src/mito2/src/read/projection.rs @@ -0,0 +1,181 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Utilities for projection. + +use std::sync::Arc; + +use api::v1::SemanticType; +use common_error::ext::BoxedError; +use common_recordbatch::error::ExternalSnafu; +use common_recordbatch::RecordBatch; +use datatypes::prelude::{ConcreteDataType, DataType}; +use datatypes::schema::{Schema, SchemaRef}; +use datatypes::value::ValueRef; +use datatypes::vectors::VectorRef; +use snafu::{OptionExt, ResultExt}; +use store_api::metadata::RegionMetadata; +use store_api::storage::ColumnId; + +use crate::error::{InvalidRequestSnafu, Result}; +use crate::read::Batch; +use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; + +/// Handles projection and converts a projected [Batch] to a projected [RecordBatch]. +pub(crate) struct ProjectionMapper { + /// Maps column in [RecordBatch] to index in [Batch]. + batch_indices: Vec, + /// Decoder for primary key. + codec: McmpRowCodec, + /// Schema for converted [RecordBatch]. + output_schema: SchemaRef, + /// Id of columns to project. 
+ column_ids: Vec, +} + +impl ProjectionMapper { + /// Returns a new mapper with projection. + pub(crate) fn new( + metadata: &RegionMetadata, + projection: impl Iterator, + ) -> Result { + let projection_len = projection.size_hint().0; + let mut batch_indices = Vec::with_capacity(projection_len); + let mut column_schemas = Vec::with_capacity(projection_len); + let mut column_ids = Vec::with_capacity(projection_len); + for idx in projection { + // For each projection index, we get the column id for projection. + let column = metadata + .column_metadatas + .get(idx) + .context(InvalidRequestSnafu { + region_id: metadata.region_id, + reason: format!("projection index {} is out of bound", idx), + })?; + + // Get column index in a batch by its semantic type and column id. + let batch_index = match column.semantic_type { + SemanticType::Tag => { + // Safety: It is a primary key column. + let index = metadata.primary_key_index(column.column_id).unwrap(); + BatchIndex::Tag(index) + } + SemanticType::Timestamp => BatchIndex::Timestamp, + SemanticType::Field => { + // Safety: It is a field column. + let index = metadata.field_index(column.column_id).unwrap(); + BatchIndex::Field(index) + } + }; + batch_indices.push(batch_index); + column_ids.push(column.column_id); + // Safety: idx is valid. + column_schemas.push(metadata.schema.column_schemas()[idx].clone()); + } + + let codec = McmpRowCodec::new( + metadata + .primary_key_columns() + .map(|c| SortField::new(c.column_schema.data_type.clone())) + .collect(), + ); + // Safety: Columns come from existing schema. + let output_schema = Arc::new(Schema::new(column_schemas)); + + Ok(ProjectionMapper { + batch_indices, + codec, + output_schema, + column_ids, + }) + } + + /// Returns a new mapper without projection. + pub(crate) fn all(metadata: &RegionMetadata) -> Result { + ProjectionMapper::new(metadata, 0..metadata.column_metadatas.len()) + } + + /// Returns ids of projected columns. + pub(crate) fn column_ids(&self) -> &[ColumnId] { + &self.column_ids + } + + /// Returns the schema of converted [RecordBatch]. + pub(crate) fn output_schema(&self) -> SchemaRef { + self.output_schema.clone() + } + + /// Converts a [Batch] to a [RecordBatch]. + /// + /// The batch must match the `projection` using to build the mapper. + pub(crate) fn convert(&self, batch: &Batch) -> common_recordbatch::error::Result { + let pk_values = self + .codec + .decode(batch.primary_key()) + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + + let mut columns = Vec::with_capacity(self.output_schema.num_columns()); + let num_rows = batch.num_rows(); + for (index, column_schema) in self + .batch_indices + .iter() + .zip(self.output_schema.column_schemas()) + { + match index { + BatchIndex::Tag(idx) => { + let value = pk_values[*idx].as_value_ref(); + let vector = new_repeated_vector(&column_schema.data_type, value, num_rows)?; + columns.push(vector); + } + BatchIndex::Timestamp => { + columns.push(batch.timestamps().clone()); + } + BatchIndex::Field(idx) => { + columns.push(batch.fields()[*idx].data.clone()); + } + } + } + + RecordBatch::new(self.output_schema.clone(), columns) + } +} + +/// Index of a vector in a [Batch]. +#[derive(Debug, Clone, Copy)] +enum BatchIndex { + /// Index in primary keys. + Tag(usize), + /// The time index column. + Timestamp, + /// Index in fields. + Field(usize), +} + +/// Returns a vector with repeated values. 
+fn new_repeated_vector( + data_type: &ConcreteDataType, + value: ValueRef, + num_rows: usize, +) -> common_recordbatch::error::Result { + let mut mutable_vector = data_type.create_mutable_vector(1); + mutable_vector + .try_push_value_ref(value) + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + // This requires an additional allocation. + // TODO(yingwen): Add a way to create repeated vector to data type. + let base_vector = mutable_vector.to_vector(); + Ok(base_vector.replicate(&[num_rows])) +} diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs new file mode 100644 index 000000000000..db6414bf8dd6 --- /dev/null +++ b/src/mito2/src/read/scan_region.rs @@ -0,0 +1,187 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Scans a region according to the scan request. + +use common_recordbatch::SendableRecordBatchStream; +use common_telemetry::debug; +use common_time::range::TimestampRange; +use object_store::ObjectStore; +use snafu::ResultExt; +use store_api::storage::ScanRequest; +use table::predicate::{Predicate, TimeRangePredicateBuilder}; + +use crate::error::{BuildPredicateSnafu, Result}; +use crate::read::projection::ProjectionMapper; +use crate::read::seq_scan::SeqScan; +use crate::region::version::VersionRef; +use crate::sst::file::FileHandle; + +/// A scanner scans a region and returns a [SendableRecordBatchStream]. +pub(crate) enum Scanner { + /// Sequential scan. + Seq(SeqScan), + // TODO(yingwen): Support windowed scan and chained scan. +} + +impl Scanner { + /// Returns a [SendableRecordBatchStream] to retrieve scan results. + pub(crate) async fn scan(&self) -> Result { + match self { + Scanner::Seq(seq_scan) => seq_scan.build().await, + } + } +} + +#[cfg_attr(doc, aquamarine::aquamarine)] +/// Helper to scans a region by [ScanRequest]. +/// +/// [ScanRegion] collects SSTs and memtables to scan without actually reading them. It +/// creates a [Scanner] to actually scan these targets in [Scanner::scan()]. +/// +/// ```mermaid +/// classDiagram +/// class ScanRegion { +/// -VersionRef version +/// -ScanRequest request +/// ~scanner() Scanner +/// ~seq_scan() SeqScan +/// } +/// class Scanner { +/// <> +/// SeqScan +/// +scan() SendableRecordBatchStream +/// } +/// class SeqScan { +/// -ProjectionMapper mapper +/// -Option~TimeRange~ time_range +/// -Option~Predicate~ predicate +/// -Vec~MemtableRef~ memtables +/// -Vec~FileHandle~ files +/// +build() SendableRecordBatchStream +/// } +/// class ProjectionMapper { +/// ~output_schema() SchemaRef +/// ~convert(Batch) RecordBatch +/// } +/// ScanRegion -- Scanner +/// ScanRegion o-- ScanRequest +/// Scanner o-- SeqScan +/// Scanner -- SendableRecordBatchStream +/// SeqScan o-- ProjectionMapper +/// SeqScan -- SendableRecordBatchStream +/// ``` +pub(crate) struct ScanRegion { + /// Version of the region at scan. + version: VersionRef, + /// Directory of SST files. + file_dir: String, + /// Object store that stores SST files. 
+ object_store: ObjectStore, + /// Scan request. + request: ScanRequest, +} + +impl ScanRegion { + /// Creates a [ScanRegion]. + pub(crate) fn new( + version: VersionRef, + file_dir: String, + object_store: ObjectStore, + request: ScanRequest, + ) -> ScanRegion { + ScanRegion { + version, + file_dir, + object_store, + request, + } + } + + /// Returns a [Scanner] to scan the region. + pub(crate) fn scanner(self) -> Result { + self.seq_scan().map(Scanner::Seq) + } + + /// Scan sequentially. + pub(crate) fn seq_scan(self) -> Result { + let time_range = self.build_time_range_predicate(); + + let ssts = &self.version.ssts; + let mut total_ssts = 0; + let mut files = Vec::new(); + for level in ssts.levels() { + total_ssts += level.files.len(); + + for file in level.files.values() { + // Finds SST files in range. + if file_in_range(file, &time_range) { + files.push(file.clone()); + } + } + } + + let memtables = self.version.memtables.list_memtables(); + + debug!( + "Seq scan region {}, memtables: {}, ssts_to_read: {}, total_ssts: {}", + self.version.metadata.region_id, + memtables.len(), + files.len(), + total_ssts + ); + + let predicate = Predicate::try_new( + self.request.filters.clone(), + self.version.metadata.schema.clone(), + ) + .context(BuildPredicateSnafu)?; + let mapper = match &self.request.projection { + Some(p) => ProjectionMapper::new(&self.version.metadata, p.iter().copied())?, + None => ProjectionMapper::all(&self.version.metadata)?, + }; + + let seq_scan = SeqScan::new(self.file_dir, self.object_store, mapper, self.request) + .with_time_range(Some(time_range)) + .with_predicate(Some(predicate)) + .with_memtables(memtables) + .with_files(files); + + Ok(seq_scan) + } + + /// Build time range predicate from filters. + fn build_time_range_predicate(&self) -> TimestampRange { + let time_index = self.version.metadata.time_index_column(); + let unit = time_index + .column_schema + .data_type + .as_timestamp() + .expect("Time index must have timestamp-compatible type") + .unit(); + TimeRangePredicateBuilder::new(&time_index.column_schema.name, unit, &self.request.filters) + .build() + } +} + +/// Returns true if the time range of a SST `file` matches the `predicate`. +fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool { + if predicate == &TimestampRange::min_to_max() { + return true; + } + // end timestamp of a SST is inclusive. + let (start, end) = file.time_range(); + let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end)); + file_ts_range.intersects(predicate) +} diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs new file mode 100644 index 000000000000..6ac8e37d3ec0 --- /dev/null +++ b/src/mito2/src/read/seq_scan.rs @@ -0,0 +1,146 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Sequential scan. 
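Note on the pruning above: `ScanRegion::seq_scan` keeps an SST whenever its inclusive time range intersects the range derived from the request filters, and an unbounded predicate keeps every file. A minimal sketch of that rule (the `should_read` helper is hypothetical and assumes `common_time::Timestamp` file bounds, matching what `FileHandle::time_range()` returns):

```rust
use common_time::range::TimestampRange;
use common_time::Timestamp;

/// Hypothetical helper mirroring `file_in_range`: keep a SST only when its
/// inclusive time range intersects the predicate built from the scan filters.
fn should_read(file_start: Timestamp, file_end: Timestamp, predicate: &TimestampRange) -> bool {
    // An unbounded predicate (no usable time filter) keeps every file.
    if predicate == &TimestampRange::min_to_max() {
        return true;
    }
    // Both ends of a SST's time range are inclusive.
    let file_ts_range = TimestampRange::new_inclusive(Some(file_start), Some(file_end));
    file_ts_range.intersects(predicate)
}
```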
+ +use std::sync::Arc; + +use async_stream::try_stream; +use common_error::ext::BoxedError; +use common_recordbatch::error::ExternalSnafu; +use common_recordbatch::{RecordBatchStreamAdaptor, SendableRecordBatchStream}; +use common_time::range::TimestampRange; +use object_store::ObjectStore; +use snafu::ResultExt; +use store_api::storage::ScanRequest; +use table::predicate::Predicate; + +use crate::error::Result; +use crate::memtable::MemtableRef; +use crate::read::merge::MergeReaderBuilder; +use crate::read::projection::ProjectionMapper; +use crate::read::BatchReader; +use crate::sst::file::FileHandle; +use crate::sst::parquet::reader::ParquetReaderBuilder; + +/// Scans a region and returns rows in a sorted sequence. +/// +/// The output order is always `order by primary key, time index`. +pub struct SeqScan { + /// Directory of SST files. + file_dir: String, + /// Object store that stores SST files. + object_store: ObjectStore, + /// Maps projected Batches to RecordBatches. + mapper: Arc, + /// Original scan request to scan memtable. + // TODO(yingwen): Remove this if memtable::iter() takes another struct. + request: ScanRequest, + + /// Time range filter for time index. + time_range: Option, + /// Predicate to push down. + predicate: Option, + /// Memtables to scan. + memtables: Vec, + /// Handles to SST files to scan. + files: Vec, +} + +impl SeqScan { + /// Creates a new [SeqScan]. + #[must_use] + pub(crate) fn new( + file_dir: String, + object_store: ObjectStore, + mapper: ProjectionMapper, + request: ScanRequest, + ) -> SeqScan { + SeqScan { + file_dir, + object_store, + mapper: Arc::new(mapper), + time_range: None, + predicate: None, + memtables: Vec::new(), + files: Vec::new(), + request, + } + } + + /// Set time range filter for time index. + #[must_use] + pub(crate) fn with_time_range(mut self, time_range: Option) -> Self { + self.time_range = time_range; + self + } + + /// Set predicate to push down. + #[must_use] + pub(crate) fn with_predicate(mut self, predicate: Option) -> Self { + self.predicate = predicate; + self + } + + /// Set memtables to read. + #[must_use] + pub(crate) fn with_memtables(mut self, memtables: Vec) -> Self { + self.memtables = memtables; + self + } + + /// Set files to read. + #[must_use] + pub(crate) fn with_files(mut self, files: Vec) -> Self { + self.files = files; + self + } + + /// Builds a stream for the query. + pub async fn build(&self) -> Result { + // Scans all memtables and SSTs. Builds a merge reader to merge results. + let mut builder = MergeReaderBuilder::new(); + for mem in &self.memtables { + let iter = mem.iter(self.request.clone()); + builder.push_batch_iter(iter); + } + for file in &self.files { + let reader = ParquetReaderBuilder::new( + self.file_dir.clone(), + file.clone(), + self.object_store.clone(), + ) + .predicate(self.predicate.clone()) + .time_range(self.time_range) + .projection(Some(self.mapper.column_ids().to_vec())) + .build() + .await?; + builder.push_batch_reader(Box::new(reader)); + } + let mut reader = builder.build().await?; + // Creates a stream to poll the batch reader and convert batch into record batch. + let mapper = self.mapper.clone(); + let stream = try_stream! { + while let Some(batch) = reader.next_batch().await.map_err(BoxedError::new).context(ExternalSnafu)? 
{ + yield mapper.convert(&batch)?; + } + }; + let stream = Box::pin(RecordBatchStreamAdaptor::new( + self.mapper.output_schema(), + Box::pin(stream), + )); + + Ok(stream) + } +} diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 67ab39accd92..4d240a935d3a 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -26,12 +26,16 @@ use store_api::storage::RegionId; use crate::error::Result; use crate::manifest::manager::RegionManifestManager; -use crate::region::version::VersionControlRef; +use crate::region::version::{VersionControlRef, VersionRef}; /// Type to store region version. pub type VersionNumber = u32; /// Metadata and runtime status of a region. +/// +/// Writing and reading a region follow a single-writer-multi-reader rule: +/// - Only the region worker thread this region belongs to can modify the metadata. +/// - Multiple reader threads are allowed to read a specific `version` of a region. #[derive(Debug)] pub(crate) struct MitoRegion { /// Id of this region. @@ -42,6 +46,8 @@ pub(crate) struct MitoRegion { /// Version controller for this region. pub(crate) version_control: VersionControlRef, + /// Data directory of the region. + pub(crate) region_dir: String, /// Manager to maintain manifest for this region. manifest_manager: RegionManifestManager, } @@ -63,6 +69,12 @@ impl MitoRegion { let version_data = self.version_control.current(); version_data.version.metadata.clone() } + + /// Returns current version of the region. + pub(crate) fn version(&self) -> VersionRef { + let version_data = self.version_control.current(); + version_data.version + } } /// Regions indexed by ids. diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 4bcd55f57389..996bd147e5e2 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -92,6 +92,7 @@ impl RegionOpener { Ok(MitoRegion { region_id, version_control, + region_dir: self.region_dir, manifest_manager, }) } @@ -133,6 +134,7 @@ impl RegionOpener { Ok(MitoRegion { region_id: self.region_id, version_control, + region_dir: self.region_dir, manifest_manager, }) } diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs index 09285d370a18..8f0ff5a3b066 100644 --- a/src/mito2/src/region/version.rs +++ b/src/mito2/src/region/version.rs @@ -58,6 +58,13 @@ impl VersionControl { pub(crate) fn current(&self) -> VersionControlData { self.data.read().unwrap().clone() } + + /// Updates committed sequence and entry id. + pub(crate) fn set_sequence_and_entry_id(&self, seq: SequenceNumber, entry_id: EntryId) { + let mut data = self.data.write().unwrap(); + data.committed_sequence = seq; + data.last_entry_id = entry_id; + } } pub(crate) type VersionControlRef = Arc; diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index 8affb3a0d3d2..e659812547ef 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -122,6 +122,11 @@ impl FileHandle { pub fn file_path(&self, file_dir: &str) -> String { join_path(file_dir, &self.file_id().as_parquet()) } + + /// Returns the time range of the file. + pub fn time_range(&self) -> FileTimeRange { + self.inner.meta.time_range + } } /// Inner data of [FileHandle]. diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index eb1d5068aa22..c8eb1dbea4c0 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -15,8 +15,8 @@ //! SST in parquet format. 
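`SeqScan::build` above combines two reusable pieces: `async_stream::try_stream!` turns a pull-style reader loop into a `Stream`, and the now-generic `RecordBatchStreamAdaptor` attaches a schema to it. A condensed, hypothetical sketch of that shape (error plumbing as in the patch; `reader_to_stream` itself is not part of the change):

```rust
use std::sync::Arc;

use async_stream::try_stream;
use common_error::ext::BoxedError;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{RecordBatchStreamAdaptor, SendableRecordBatchStream};
use snafu::ResultExt;

use crate::read::projection::ProjectionMapper;
use crate::read::BatchReader;

/// Sketch: wrap a `BatchReader` into a `SendableRecordBatchStream`, converting
/// storage `Batch`es into `RecordBatch`es through the projection mapper.
fn reader_to_stream(
    mut reader: impl BatchReader + Send + 'static,
    mapper: Arc<ProjectionMapper>,
) -> SendableRecordBatchStream {
    // Capture the output schema before moving the mapper into the stream.
    let schema = mapper.output_schema();
    let stream = try_stream! {
        // Internal errors are boxed into the record batch error type expected
        // by the stream, exactly as `SeqScan::build` does.
        while let Some(batch) = reader
            .next_batch()
            .await
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?
        {
            yield mapper.convert(&batch)?;
        }
    };
    Box::pin(RecordBatchStreamAdaptor::new(schema, Box::pin(stream)))
}
```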
mod format; -mod reader; -mod writer; +pub mod reader; +pub mod writer; use common_base::readable_size::ReadableSize; diff --git a/src/mito2/src/sst/parquet/format.rs b/src/mito2/src/sst/parquet/format.rs index 8b17826edfe8..977604c2607f 100644 --- a/src/mito2/src/sst/parquet/format.rs +++ b/src/mito2/src/sst/parquet/format.rs @@ -23,6 +23,8 @@ //! ```text //! field 0, field 1, ..., field N, time index, primary key, sequence, op type //! ``` +//! +//! We stores fields in the same order as [RegionMetadata::field_columns()](store_api::metadata::RegionMetadata::field_columns()). use std::collections::HashMap; use std::sync::Arc; diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 37f9c0168d9e..06c9533602dd 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -68,22 +68,22 @@ impl ParquetReaderBuilder { } /// Attaches the predicate to the builder. - pub fn predicate(mut self, predicate: Predicate) -> ParquetReaderBuilder { - self.predicate = Some(predicate); + pub fn predicate(mut self, predicate: Option) -> ParquetReaderBuilder { + self.predicate = predicate; self } /// Attaches the time range to the builder. - pub fn time_range(mut self, time_range: TimestampRange) -> ParquetReaderBuilder { - self.time_range = Some(time_range); + pub fn time_range(mut self, time_range: Option) -> ParquetReaderBuilder { + self.time_range = time_range; self } /// Attaches the projection to the builder. /// /// The reader only applies the projection to fields. - pub fn projection(mut self, projection: Vec) -> ParquetReaderBuilder { - self.projection = Some(projection); + pub fn projection(mut self, projection: Option>) -> ParquetReaderBuilder { + self.projection = projection; self } diff --git a/src/mito2/src/sst/version.rs b/src/mito2/src/sst/version.rs index 6c85430b089e..f016162d39c6 100644 --- a/src/mito2/src/sst/version.rs +++ b/src/mito2/src/sst/version.rs @@ -35,6 +35,11 @@ impl SstVersion { levels: new_level_meta_vec(), } } + + /// Returns a slice to metadatas of all levels. + pub(crate) fn levels(&self) -> &[LevelMeta] { + &self.levels + } } // We only has fixed number of level, so we use array to hold elements. This implementation @@ -44,9 +49,9 @@ type LevelMetaArray = [LevelMeta; MAX_LEVEL as usize]; /// Metadata of files in the same SST level. pub struct LevelMeta { /// Level number. - level: Level, + pub level: Level, /// Handles of SSTs in this level. - files: HashMap, + pub files: HashMap, } impl LevelMeta { diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index 766f25dca52a..af4f04a4cd30 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -231,8 +231,11 @@ impl RegionWriteCtx { } /// Encode and add WAL entry to the writer. - fn add_wal_entry(&self, wal_writer: &mut WalWriter) -> Result<()> { - wal_writer.add_entry(self.region.region_id, self.next_entry_id, &self.wal_entry) + fn add_wal_entry(&mut self, wal_writer: &mut WalWriter) -> Result<()> { + wal_writer.add_entry(self.region.region_id, self.next_entry_id, &self.wal_entry)?; + // We only call this method one time, but we still bump next entry id for consistency. + self.next_entry_id += 1; + Ok(()) } /// Sets error and marks all write operations are failed. 
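With the reader-builder setters above accepting `Option`s, call sites can forward optional pushdowns as-is instead of branching on `Some`/`None` first. A sketch under that assumption (`build_sst_reader` is a hypothetical helper mirroring the chained calls in `SeqScan::build`):

```rust
use common_time::range::TimestampRange;
use object_store::ObjectStore;
use store_api::storage::ColumnId;
use table::predicate::Predicate;

use crate::error::Result;
use crate::read::BatchReader;
use crate::sst::file::FileHandle;
use crate::sst::parquet::reader::ParquetReaderBuilder;

/// Sketch: build a SST reader, forwarding optional pushdowns directly.
/// Passing `None` simply disables the corresponding filter.
async fn build_sst_reader(
    file_dir: String,
    file: FileHandle,
    object_store: ObjectStore,
    predicate: Option<Predicate>,
    time_range: Option<TimestampRange>,
    projection: Option<Vec<ColumnId>>,
) -> Result<Box<dyn BatchReader>> {
    let reader = ParquetReaderBuilder::new(file_dir, file, object_store)
        .predicate(predicate)
        .time_range(time_range)
        .projection(projection)
        .build()
        .await?;
    Ok(Box::new(reader))
}
```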
@@ -247,7 +250,7 @@ impl RegionWriteCtx { fn write_memtable(&mut self) { debug_assert_eq!(self.notifiers.len(), self.wal_entry.mutations.len()); - let mutable = self.version.memtables.mutable(); + let mutable = &self.version.memtables.mutable; // Takes mutations from the wal entry. let mutations = mem::take(&mut self.wal_entry.mutations); for (mutation, notify) in mutations.into_iter().zip(&mut self.notifiers) { @@ -259,5 +262,11 @@ impl RegionWriteCtx { notify.err = Some(Arc::new(e)); } } + + // Updates region sequence and entry id. Since we store the last sequence and entry id in the region, we need + // to decrease `next_sequence` and `next_entry_id` by 1. + self.region + .version_control + .set_sequence_and_entry_id(self.next_sequence - 1, self.next_entry_id - 1); + } } } diff --git a/src/store-api/src/metadata.rs b/src/store-api/src/metadata.rs index 7c5b75e869bb..17aadef89685 100644 --- a/src/store-api/src/metadata.rs +++ b/src/store-api/src/metadata.rs @@ -169,6 +169,7 @@ impl RegionMetadata { .map(|index| &self.column_metadatas[index]) } + /// Returns all primary key columns. pub fn primary_key_columns(&self) -> impl Iterator<Item = &ColumnMetadata> { // safety: RegionMetadata::validate ensures every primary key exists. self.primary_key @@ -183,6 +184,21 @@ impl RegionMetadata { .filter(|column| column.semantic_type == SemanticType::Field) } + /// Returns a column's index in primary key if it is a primary key column. + /// + /// This does a linear search. + pub fn primary_key_index(&self, column_id: ColumnId) -> Option<usize> { + self.primary_key.iter().position(|id| *id == column_id) + } + + /// Returns a column's index in fields if it is a field column. + /// + /// This does a linear search. + pub fn field_index(&self, column_id: ColumnId) -> Option<usize> { + self.field_columns() + .position(|column| column.column_id == column_id) + } + /// Checks whether the metadata is valid. fn validate(&self) -> Result<()> { // Id to name.
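The two linear-search helpers exist mainly so `ProjectionMapper::new` can resolve where a projected column lives inside a `Batch`. A condensed sketch of that resolution step (`BatchPosition` and `resolve_position` are illustrative stand-ins for the private `BatchIndex` logic in `read/projection.rs` above):

```rust
use api::v1::SemanticType;
use store_api::metadata::{ColumnMetadata, RegionMetadata};

/// Position of a projected column inside a `Batch`.
#[derive(Debug, Clone, Copy)]
enum BatchPosition {
    /// Index into the decoded primary key values.
    Tag(usize),
    /// The single time index column.
    Timestamp,
    /// Index into the field vectors.
    Field(usize),
}

/// Resolve a column's position via the new linear-search index helpers.
fn resolve_position(metadata: &RegionMetadata, column: &ColumnMetadata) -> Option<BatchPosition> {
    match column.semantic_type {
        SemanticType::Tag => metadata
            .primary_key_index(column.column_id)
            .map(BatchPosition::Tag),
        SemanticType::Timestamp => Some(BatchPosition::Timestamp),
        SemanticType::Field => metadata
            .field_index(column.column_id)
            .map(BatchPosition::Field),
    }
}
```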