From 05b60f745fcf461029a467767bbf1d4da55d0c6a Mon Sep 17 00:00:00 2001 From: evenyag Date: Mon, 21 Aug 2023 17:45:17 +0800 Subject: [PATCH 01/19] feat: define stream builder --- src/mito2/src/engine.rs | 12 +++- src/mito2/src/read.rs | 1 + src/mito2/src/read/stream_builder.rs | 99 ++++++++++++++++++++++++++++ 3 files changed, 111 insertions(+), 1 deletion(-) create mode 100644 src/mito2/src/read/stream_builder.rs diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index dec53bac08b4..16bc4b046f9c 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -24,10 +24,11 @@ use object_store::ObjectStore; use snafu::ResultExt; use store_api::logstore::LogStore; use store_api::region_request::RegionRequest; -use store_api::storage::RegionId; +use store_api::storage::{RegionId, ScanRequest}; use crate::config::MitoConfig; use crate::error::{RecvSnafu, Result}; +use crate::read::stream_builder::RecordBatchStreamBuilder; use crate::request::{RegionTask, RequestBody}; use crate::worker::WorkerGroup; @@ -107,4 +108,13 @@ impl EngineInner { receiver.await.context(RecvSnafu)? } + + /// Return a record batch stream builder to execute the query. + fn handle_query( + &self, + region_id: RegionId, + request: ScanRequest, + ) -> Result { + unimplemented!() + } } diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index 1a6a81b6f804..f4a52f0f6b95 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -15,6 +15,7 @@ //! Common structs and utilities for reading data. pub mod merge; +pub mod stream_builder; use std::sync::Arc; diff --git a/src/mito2/src/read/stream_builder.rs b/src/mito2/src/read/stream_builder.rs new file mode 100644 index 000000000000..4076fafa16f7 --- /dev/null +++ b/src/mito2/src/read/stream_builder.rs @@ -0,0 +1,99 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Provides a builder to build a stream to query the engine. + +use common_query::logical_plan::Expr; +use common_recordbatch::{OrderOption, SendableRecordBatchStream}; +use object_store::ObjectStore; +use store_api::metadata::RegionMetadataRef; +use store_api::storage::ColumnId; + +use crate::memtable::MemtableRef; +use crate::sst::file::FileHandle; + +/// Builder to construct a [SendableRecordBatchStream] for a query. +pub struct RecordBatchStreamBuilder { + /// Metadata of the region when the builder created. + metadata: RegionMetadataRef, + /// Projection to push down. + projection: Option>, + /// Filters to push down. + filters: Vec, + /// Required output ordering. + output_ordering: Option>, + /// Memtables to scan. + memtables: Vec, + /// Handles to SST files to scan. + files: Vec, + /// Directory of SST files. + file_dir: String, + /// Object store that stores SST files. + object_store: ObjectStore, +} + +impl RecordBatchStreamBuilder { + /// Creates a new builder. + pub fn new( + metadata: RegionMetadataRef, + file_dir: &str, + object_store: ObjectStore, + ) -> RecordBatchStreamBuilder { + RecordBatchStreamBuilder { + metadata, + projection: None, + filters: Vec::new(), + output_ordering: None, + memtables: Vec::new(), + files: Vec::new(), + file_dir: file_dir.to_string(), + object_store, + } + } + + /// Pushes down projection + pub fn projection(&mut self, projection: Option>) -> &mut Self { + self.projection = projection; + self + } + + /// Pushes down filters. 
+ pub fn filters(&mut self, filters: Vec) -> &mut Self { + self.filters = filters; + self + } + + /// Set required output ordering. + pub fn output_ordering(&mut self, ordering: Option>) -> &mut Self { + self.output_ordering = ordering; + self + } + + /// Set memtables to read. + pub fn memtables(&mut self, memtables: Vec) -> &mut Self { + self.memtables = memtables; + self + } + + /// Set files to read. + pub fn files(&mut self, files: Vec) -> &mut Self { + self.files = files; + self + } + + /// Builds a stream for the query. + pub fn build(&mut self) -> SendableRecordBatchStream { + unimplemented!() + } +} From e193c3ecb2575222da7951f41f247d917c98771c Mon Sep 17 00:00:00 2001 From: evenyag Date: Tue, 22 Aug 2023 17:31:03 +0800 Subject: [PATCH 02/19] feat: scan region wip --- src/mito2/src/engine.rs | 17 +++-- src/mito2/src/read.rs | 3 +- src/mito2/src/read/scan_region.rs | 46 ++++++++++++++ .../read/{stream_builder.rs => seq_scan.rs} | 63 ++++++++++--------- 4 files changed, 88 insertions(+), 41 deletions(-) create mode 100644 src/mito2/src/read/scan_region.rs rename src/mito2/src/read/{stream_builder.rs => seq_scan.rs} (67%) diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 16bc4b046f9c..1bb0b962b04e 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -28,7 +28,6 @@ use store_api::storage::{RegionId, ScanRequest}; use crate::config::MitoConfig; use crate::error::{RecvSnafu, Result}; -use crate::read::stream_builder::RecordBatchStreamBuilder; use crate::request::{RegionTask, RequestBody}; use crate::worker::WorkerGroup; @@ -109,12 +108,12 @@ impl EngineInner { receiver.await.context(RecvSnafu)? } - /// Return a record batch stream builder to execute the query. - fn handle_query( - &self, - region_id: RegionId, - request: ScanRequest, - ) -> Result { - unimplemented!() - } + // /// Return a record batch stream builder to execute the query. 
+ // fn handle_query( + // &self, + // region_id: RegionId, + // request: ScanRequest, + // ) -> Result { + // unimplemented!() + // } } diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index f4a52f0f6b95..ce9fe75e61eb 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -15,7 +15,8 @@ //! Common structs and utilities for reading data. pub mod merge; -pub mod stream_builder; +mod scan_region; +pub mod seq_scan; use std::sync::Arc; diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs new file mode 100644 index 000000000000..91a893dcb6e4 --- /dev/null +++ b/src/mito2/src/read/scan_region.rs @@ -0,0 +1,46 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Scans a region according to the scan request. + +use common_time::range::TimestampRange; +use store_api::storage::ScanRequest; + +use crate::read::seq_scan::SeqScan; +use crate::region::version::VersionRef; + +/// Helper to scans a region by [ScanRequest]. +pub(crate) struct ScanRegion { + /// Version of the region at scan. + version: VersionRef, + /// Scan request. + request: ScanRequest, +} + +impl ScanRegion { + /// Creates a [ScanRegion]. + pub(crate) fn new(version: VersionRef, request: ScanRequest) -> ScanRegion { + ScanRegion { version, request } + } + + /// Scan sequentailly. + fn seq_scan(&self) -> SeqScan { + unimplemented!() + } + + /// Build time range predicate from filters. 
+ fn build_time_range_predicate(&self) -> TimestampRange { + unimplemented!() + } +} diff --git a/src/mito2/src/read/stream_builder.rs b/src/mito2/src/read/seq_scan.rs similarity index 67% rename from src/mito2/src/read/stream_builder.rs rename to src/mito2/src/read/seq_scan.rs index 4076fafa16f7..75a153062f7f 100644 --- a/src/mito2/src/read/stream_builder.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -12,71 +12,72 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! Provides a builder to build a stream to query the engine. +//! Sequential scan. use common_query::logical_plan::Expr; use common_recordbatch::{OrderOption, SendableRecordBatchStream}; +use common_time::range::TimestampRange; use object_store::ObjectStore; use store_api::metadata::RegionMetadataRef; use store_api::storage::ColumnId; +use table::predicate::Predicate; use crate::memtable::MemtableRef; use crate::sst::file::FileHandle; -/// Builder to construct a [SendableRecordBatchStream] for a query. -pub struct RecordBatchStreamBuilder { - /// Metadata of the region when the builder created. +/// Scans a region and returns rows in a sorted sequence. +/// +/// The output order is always `order by primary key, time index`. +pub struct SeqScan { + /// Metadata of the region to scan. metadata: RegionMetadataRef, + /// Directory of SST files. + file_dir: String, + /// Object store that stores SST files. + object_store: ObjectStore, + /// Projection to push down. projection: Option>, - /// Filters to push down. - filters: Vec, - /// Required output ordering. - output_ordering: Option>, + /// Time range filter for time index. + time_range: Option, + /// Predicate to push down. + predicate: Option, /// Memtables to scan. memtables: Vec, /// Handles to SST files to scan. files: Vec, - /// Directory of SST files. - file_dir: String, - /// Object store that stores SST files. 
- object_store: ObjectStore, } -impl RecordBatchStreamBuilder { - /// Creates a new builder. - pub fn new( - metadata: RegionMetadataRef, - file_dir: &str, - object_store: ObjectStore, - ) -> RecordBatchStreamBuilder { - RecordBatchStreamBuilder { +impl SeqScan { + /// Creates a new [SeqScan]. + pub fn new(metadata: RegionMetadataRef, file_dir: &str, object_store: ObjectStore) -> SeqScan { + SeqScan { metadata, + file_dir: file_dir.to_string(), + object_store, projection: None, - filters: Vec::new(), - output_ordering: None, + time_range: None, + predicate: None, memtables: Vec::new(), files: Vec::new(), - file_dir: file_dir.to_string(), - object_store, } } - /// Pushes down projection + /// Set projection. pub fn projection(&mut self, projection: Option>) -> &mut Self { self.projection = projection; self } - /// Pushes down filters. - pub fn filters(&mut self, filters: Vec) -> &mut Self { - self.filters = filters; + /// Set time range filter for time index. + pub fn time_range(&mut self, time_range: Option) -> &mut Self { + self.time_range = time_range; self } - /// Set required output ordering. - pub fn output_ordering(&mut self, ordering: Option>) -> &mut Self { - self.output_ordering = ordering; + /// Set predicate to push down. 
+ pub fn predicate(&mut self, predicate: Option) -> &mut Self { + self.predicate = predicate; self } From e900db186e406b57de37e06331b2113812e28e1c Mon Sep 17 00:00:00 2001 From: evenyag Date: Tue, 22 Aug 2023 21:40:20 +0800 Subject: [PATCH 03/19] feat: create SeqScan in ScanRegion --- src/mito2/src/error.rs | 14 +++- src/mito2/src/memtable/version.rs | 13 ++-- src/mito2/src/read/scan_region.rs | 110 +++++++++++++++++++++++++-- src/mito2/src/read/seq_scan.rs | 15 ++-- src/mito2/src/sst/file.rs | 5 ++ src/mito2/src/sst/version.rs | 9 ++- src/mito2/src/worker/handle_write.rs | 2 +- 7 files changed, 143 insertions(+), 25 deletions(-) diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 3c22de938dd3..eb5e516a4cb2 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -375,12 +375,19 @@ pub enum Error { #[snafu(display("Invalid flume sender, location: {}", location,))] InvalidFlumeSender { location: Location }, - #[snafu(display("Invalid scheduler state location: {}", location,))] + #[snafu(display("Invalid scheduler state, location: {}", location))] InvalidSchedulerState { location: Location }, #[snafu(display("Failed to stop scheduler, source: {}", source))] - StopScheduler { - source: JoinError, + StopScheduler { source: JoinError }, + + #[snafu(display( + "Failed to build scan predicate, location: {}, source: {}", + location, + source + ))] + BuildPredicate { + source: table::error::Error, location: Location, }, } @@ -444,6 +451,7 @@ impl ErrorExt for Error { InvalidFlumeSender { .. } => StatusCode::InvalidArguments, InvalidSchedulerState { .. } => StatusCode::InvalidArguments, StopScheduler { .. } => StatusCode::Internal, + BuildPredicate { source, .. 
} => source.status_code(), } } diff --git a/src/mito2/src/memtable/version.rs b/src/mito2/src/memtable/version.rs index f769da498b53..51a38dd9b978 100644 --- a/src/mito2/src/memtable/version.rs +++ b/src/mito2/src/memtable/version.rs @@ -22,9 +22,9 @@ use crate::memtable::MemtableRef; #[derive(Debug)] pub(crate) struct MemtableVersion { /// Mutable memtable. - mutable: MemtableRef, + pub(crate) mutable: MemtableRef, /// Immutable memtables. - immutables: Vec, + pub(crate) immutables: Vec, } pub(crate) type MemtableVersionRef = Arc; @@ -38,8 +38,11 @@ impl MemtableVersion { } } - /// Returns the mutable memtable. - pub(crate) fn mutable(&self) -> &MemtableRef { - &self.mutable + /// Lists mutable and immutable memtables. + pub(crate) fn list_memtables(&self) -> Vec { + let mut memtables = Vec::with_capacity(self.immutables.len() + 1); + memtables.push(self.mutable.clone()); + memtables.extend_from_slice(&self.immutables); + memtables } } diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 91a893dcb6e4..975b2496f089 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -14,33 +14,131 @@ //! Scans a region according to the scan request. +use common_telemetry::debug; use common_time::range::TimestampRange; -use store_api::storage::ScanRequest; +use object_store::ObjectStore; +use snafu::ResultExt; +use store_api::metadata::RegionMetadata; +use store_api::storage::{ColumnId, ScanRequest}; +use table::predicate::{Predicate, TimeRangePredicateBuilder}; +use crate::error::{BuildPredicateSnafu, Result}; use crate::read::seq_scan::SeqScan; use crate::region::version::VersionRef; +use crate::sst::file::FileHandle; /// Helper to scans a region by [ScanRequest]. pub(crate) struct ScanRegion { /// Version of the region at scan. version: VersionRef, + /// Directory of SST files. + file_dir: String, + /// Object store that stores SST files. + object_store: ObjectStore, /// Scan request. 
request: ScanRequest, } impl ScanRegion { /// Creates a [ScanRegion]. - pub(crate) fn new(version: VersionRef, request: ScanRequest) -> ScanRegion { - ScanRegion { version, request } + pub(crate) fn new( + version: VersionRef, + file_dir: String, + object_store: ObjectStore, + request: ScanRequest, + ) -> ScanRegion { + ScanRegion { + version, + file_dir, + object_store, + request, + } } /// Scan sequentailly. - fn seq_scan(&self) -> SeqScan { - unimplemented!() + pub(crate) fn seq_scan(&self) -> Result { + let time_range = self.build_time_range_predicate(); + + let ssts = &self.version.ssts; + let mut total_ssts = 0; + let mut files = Vec::new(); + for level in ssts.levels() { + total_ssts += level.files.len(); + + for file in level.files.values() { + // Finds SST files in range. + if file_in_range(file, &time_range) { + files.push(file.clone()); + } + } + } + + let memtables = self.version.memtables.list_memtables(); + + debug!( + "Seq scan region {}, memtables: {}, ssts_to_read: {}, total_ssts: {}", + self.version.metadata.region_id, + memtables.len(), + files.len(), + total_ssts + ); + + let predicate = Predicate::try_new( + self.request.filters.clone(), + self.version.metadata.schema.clone(), + ) + .context(BuildPredicateSnafu)?; + let projection = self + .request + .projection + .as_ref() + .map(|p| projection_indices_to_ids(&self.version.metadata, p)) + .transpose()?; + + let seq_scan = SeqScan::new( + self.version.metadata.clone(), + &self.file_dir, + self.object_store.clone(), + ) + .with_projection(projection) + .with_time_range(Some(time_range)) + .with_predicate(Some(predicate)) + .with_memtables(memtables) + .with_files(files); + + Ok(seq_scan) } /// Build time range predicate from filters. 
fn build_time_range_predicate(&self) -> TimestampRange { - unimplemented!() + let time_index = self.version.metadata.time_index_column(); + let unit = time_index + .column_schema + .data_type + .as_timestamp() + .expect("Time index must have timestamp-compatible type") + .unit(); + TimeRangePredicateBuilder::new(&time_index.column_schema.name, unit, &self.request.filters) + .build() + } +} + +/// Returns true if the time range of a SST `file` matches the `predicate`. +fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool { + if predicate == &TimestampRange::min_to_max() { + return true; } + // end timestamp of a SST is inclusive. + let (start, end) = file.time_range(); + let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end)); + file_ts_range.intersects(predicate) +} + +// TODO(yingwen): Remove this once scan +/// Map projection indices to column ids. +fn projection_indices_to_ids( + metadata: &RegionMetadata, + projection: &[usize], +) -> Result> { + todo!() } diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 75a153062f7f..196ac0855a3c 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -14,8 +14,7 @@ //! Sequential scan. -use common_query::logical_plan::Expr; -use common_recordbatch::{OrderOption, SendableRecordBatchStream}; +use common_recordbatch::SendableRecordBatchStream; use common_time::range::TimestampRange; use object_store::ObjectStore; use store_api::metadata::RegionMetadataRef; @@ -64,37 +63,37 @@ impl SeqScan { } /// Set projection. - pub fn projection(&mut self, projection: Option>) -> &mut Self { + pub fn with_projection(mut self, projection: Option>) -> Self { self.projection = projection; self } /// Set time range filter for time index. - pub fn time_range(&mut self, time_range: Option) -> &mut Self { + pub fn with_time_range(mut self, time_range: Option) -> Self { self.time_range = time_range; self } /// Set predicate to push down. 
- pub fn predicate(&mut self, predicate: Option) -> &mut Self { + pub fn with_predicate(mut self, predicate: Option) -> Self { self.predicate = predicate; self } /// Set memtables to read. - pub fn memtables(&mut self, memtables: Vec) -> &mut Self { + pub fn with_memtables(mut self, memtables: Vec) -> Self { self.memtables = memtables; self } /// Set files to read. - pub fn files(&mut self, files: Vec) -> &mut Self { + pub fn with_files(mut self, files: Vec) -> Self { self.files = files; self } /// Builds a stream for the query. - pub fn build(&mut self) -> SendableRecordBatchStream { + pub fn build(&self) -> SendableRecordBatchStream { unimplemented!() } } diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index 8affb3a0d3d2..e659812547ef 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -122,6 +122,11 @@ impl FileHandle { pub fn file_path(&self, file_dir: &str) -> String { join_path(file_dir, &self.file_id().as_parquet()) } + + /// Returns the time range of the file. + pub fn time_range(&self) -> FileTimeRange { + self.inner.meta.time_range + } } /// Inner data of [FileHandle]. diff --git a/src/mito2/src/sst/version.rs b/src/mito2/src/sst/version.rs index 6c85430b089e..f016162d39c6 100644 --- a/src/mito2/src/sst/version.rs +++ b/src/mito2/src/sst/version.rs @@ -35,6 +35,11 @@ impl SstVersion { levels: new_level_meta_vec(), } } + + /// Returns a slice to metadatas of all levels. + pub(crate) fn levels(&self) -> &[LevelMeta] { + &self.levels + } } // We only has fixed number of level, so we use array to hold elements. This implementation @@ -44,9 +49,9 @@ type LevelMetaArray = [LevelMeta; MAX_LEVEL as usize]; /// Metadata of files in the same SST level. pub struct LevelMeta { /// Level number. - level: Level, + pub level: Level, /// Handles of SSTs in this level. 
- files: HashMap, + pub files: HashMap, } impl LevelMeta { diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index 5d3becb53fda..b33b43b03a1b 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -239,7 +239,7 @@ impl RegionWriteCtx { fn write_memtable(&mut self) { debug_assert_eq!(self.notifiers.len(), self.wal_entry.mutations.len()); - let mutable = self.version.memtables.mutable(); + let mutable = &self.version.memtables.mutable; // Takes mutations from the wal entry. let mutations = mem::take(&mut self.wal_entry.mutations); for (mutation, notify) in mutations.into_iter().zip(&mut self.notifiers) { From 5b177c9e5562778eef1f810204c82c651743dc6d Mon Sep 17 00:00:00 2001 From: evenyag Date: Wed, 23 Aug 2023 10:22:36 +0800 Subject: [PATCH 04/19] feat: scanner --- src/mito2/src/read/scan_region.rs | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 975b2496f089..62da8257f0df 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -14,6 +14,7 @@ //! Scans a region according to the scan request. +use common_recordbatch::SendableRecordBatchStream; use common_telemetry::debug; use common_time::range::TimestampRange; use object_store::ObjectStore; @@ -27,6 +28,22 @@ use crate::read::seq_scan::SeqScan; use crate::region::version::VersionRef; use crate::sst::file::FileHandle; +/// A scanner scans a region and returns a [SendableRecordBatchStream]. +pub(crate) enum Scanner { + /// Sequential scan. + Seq(SeqScan), + // TODO(yingwen): Support windowed scan and chained scan. +} + +impl Scanner { + /// Returns a [SendableRecordBatchStream] to retrieve scan results. + pub(crate) fn scan(&self) -> SendableRecordBatchStream { + match self { + Scanner::Seq(seq_scan) => seq_scan.build() + } + } +} + /// Helper to scans a region by [ScanRequest]. 
pub(crate) struct ScanRegion { /// Version of the region at scan. @@ -55,6 +72,11 @@ impl ScanRegion { } } + /// Returns a [Scanner] to scan the region. + pub(crate) fn scanner(&self) -> Result { + self.seq_scan().map(Scanner::Seq) + } + /// Scan sequentailly. pub(crate) fn seq_scan(&self) -> Result { let time_range = self.build_time_range_predicate(); From 6033b414ea7de6aa5bcd2eb4027acda8dcd64bfd Mon Sep 17 00:00:00 2001 From: evenyag Date: Wed, 23 Aug 2023 11:18:42 +0800 Subject: [PATCH 05/19] feat: engine handles scan request --- src/mito2/src/engine.rs | 44 ++++++++++++++++++++++--------- src/mito2/src/read.rs | 3 ++- src/mito2/src/read/scan_region.rs | 6 ++--- src/mito2/src/read/seq_scan.rs | 2 ++ src/mito2/src/region.rs | 14 +++++++++- src/mito2/src/region/opener.rs | 2 ++ 6 files changed, 54 insertions(+), 17 deletions(-) diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 1bb0b962b04e..20ecb414cc24 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -21,13 +21,15 @@ use std::sync::Arc; use common_query::Output; use object_store::ObjectStore; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use store_api::logstore::LogStore; use store_api::region_request::RegionRequest; use store_api::storage::{RegionId, ScanRequest}; use crate::config::MitoConfig; -use crate::error::{RecvSnafu, Result}; +use crate::error::{RecvSnafu, RegionNotFoundSnafu, Result}; +use crate::read::scan_region::ScanRegion; +use crate::read::Scanner; use crate::request::{RegionTask, RequestBody}; use crate::worker::WorkerGroup; @@ -73,12 +75,19 @@ impl MitoEngine { pub fn is_region_exists(&self, region_id: RegionId) -> bool { self.inner.workers.is_region_exists(region_id) } + + /// Handles the scan `request` and returns a [Scanner] for the `request`. + fn handle_query(&self, region_id: RegionId, request: ScanRequest) -> Result { + self.inner.handle_query(region_id, request) + } } /// Inner struct of [MitoEngine]. 
struct EngineInner { /// Region workers group. workers: WorkerGroup, + /// Shared object store of all regions. + object_store: ObjectStore, } impl EngineInner { @@ -89,7 +98,8 @@ impl EngineInner { object_store: ObjectStore, ) -> EngineInner { EngineInner { - workers: WorkerGroup::start(config, log_store, object_store), + workers: WorkerGroup::start(config, log_store, object_store.clone()), + object_store, } } @@ -99,8 +109,9 @@ impl EngineInner { } // TODO(yingwen): return `Output` instead of `Result<()>`. - /// Handles [RequestBody] and return its executed result. + /// Handles [RegionRequest] and return its executed result. async fn handle_request(&self, region_id: RegionId, request: RegionRequest) -> Result<()> { + // We validate and then convert the `request` into an inner `RequestBody` for ease of handling. let body = RequestBody::try_from_region_request(region_id, request)?; let (request, receiver) = RegionTask::from_request(region_id, body); self.workers.submit_to_worker(request).await?; @@ -108,12 +119,21 @@ impl EngineInner { receiver.await.context(RecvSnafu)? } - // /// Return a record batch stream builder to execute the query. - // fn handle_query( - // &self, - // region_id: RegionId, - // request: ScanRequest, - // ) -> Result { - // unimplemented!() - // } + /// Handles the scan `request` and returns a [Scanner] for the `request`. + fn handle_query(&self, region_id: RegionId, request: ScanRequest) -> Result { + // Reading a region doesn't need to go through the region worker thread. + let region = self + .workers + .get_region(region_id) + .context(RegionNotFoundSnafu { region_id })?; + let version = region.version(); + let scan_region = ScanRegion::new( + version, + region.region_dir.clone(), + self.object_store.clone(), + request, + ); + + scan_region.scanner() + } } diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index ce9fe75e61eb..d4b2420ad3c7 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -15,7 +15,7 @@ //! 
Common structs and utilities for reading data. pub mod merge; -mod scan_region; +pub(crate) mod scan_region; pub mod seq_scan; use std::sync::Arc; @@ -39,6 +39,7 @@ use crate::error::{ ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, InvalidBatchSnafu, Result, }; use crate::memtable::BoxedBatchIterator; +pub use crate::read::scan_region::Scanner; /// Storage internal representation of a batch of rows /// for a primary key (time series). diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 62da8257f0df..2fb91f522a42 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -29,7 +29,7 @@ use crate::region::version::VersionRef; use crate::sst::file::FileHandle; /// A scanner scans a region and returns a [SendableRecordBatchStream]. -pub(crate) enum Scanner { +pub enum Scanner { /// Sequential scan. Seq(SeqScan), // TODO(yingwen): Support windowed scan and chained scan. @@ -37,9 +37,9 @@ pub(crate) enum Scanner { impl Scanner { /// Returns a [SendableRecordBatchStream] to retrieve scan results. - pub(crate) fn scan(&self) -> SendableRecordBatchStream { + pub fn scan(&self) -> SendableRecordBatchStream { match self { - Scanner::Seq(seq_scan) => seq_scan.build() + Scanner::Seq(seq_scan) => seq_scan.build(), } } } diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 196ac0855a3c..b1737f89a661 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -94,6 +94,8 @@ impl SeqScan { /// Builds a stream for the query. pub fn build(&self) -> SendableRecordBatchStream { + // Scans all memtables and SSTs. + // Builds a merge reader to merge results. 
unimplemented!() } } diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 67ab39accd92..4d240a935d3a 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -26,12 +26,16 @@ use store_api::storage::RegionId; use crate::error::Result; use crate::manifest::manager::RegionManifestManager; -use crate::region::version::VersionControlRef; +use crate::region::version::{VersionControlRef, VersionRef}; /// Type to store region version. pub type VersionNumber = u32; /// Metadata and runtime status of a region. +/// +/// Writing and reading a region follow a single-writer-multi-reader rule: +/// - Only the region worker thread this region belongs to can modify the metadata. +/// - Multiple reader threads are allowed to read a specific `version` of a region. #[derive(Debug)] pub(crate) struct MitoRegion { /// Id of this region. @@ -42,6 +46,8 @@ pub(crate) struct MitoRegion { /// Version controller for this region. pub(crate) version_control: VersionControlRef, + /// Data directory of the region. + pub(crate) region_dir: String, /// Manager to maintain manifest for this region. manifest_manager: RegionManifestManager, } @@ -63,6 +69,12 @@ impl MitoRegion { let version_data = self.version_control.current(); version_data.version.metadata.clone() } + + /// Returns current version of the region. + pub(crate) fn version(&self) -> VersionRef { + let version_data = self.version_control.current(); + version_data.version + } } /// Regions indexed by ids. 
diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 4bcd55f57389..996bd147e5e2 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -92,6 +92,7 @@ impl RegionOpener { Ok(MitoRegion { region_id, version_control, + region_dir: self.region_dir, manifest_manager, }) } @@ -133,6 +134,7 @@ impl RegionOpener { Ok(MitoRegion { region_id: self.region_id, version_control, + region_dir: self.region_dir, manifest_manager, }) } From 23f45e52060507113c329845d82e6c4ed4ea364b Mon Sep 17 00:00:00 2001 From: evenyag Date: Wed, 23 Aug 2023 11:39:18 +0800 Subject: [PATCH 06/19] feat: map projection index to column id --- src/mito2/src/read/scan_region.rs | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 2fb91f522a42..1bdf30fc14b6 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -14,16 +14,18 @@ //! Scans a region according to the scan request. +use std::collections::HashMap; + use common_recordbatch::SendableRecordBatchStream; use common_telemetry::debug; use common_time::range::TimestampRange; use object_store::ObjectStore; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadata; use store_api::storage::{ColumnId, ScanRequest}; use table::predicate::{Predicate, TimeRangePredicateBuilder}; -use crate::error::{BuildPredicateSnafu, Result}; +use crate::error::{BuildPredicateSnafu, InvalidRequestSnafu, Result}; use crate::read::seq_scan::SeqScan; use crate::region::version::VersionRef; use crate::sst::file::FileHandle; @@ -156,11 +158,23 @@ fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool { file_ts_range.intersects(predicate) } -// TODO(yingwen): Remove this once scan /// Map projection indices to column ids. 
fn projection_indices_to_ids( metadata: &RegionMetadata, projection: &[usize], ) -> Result> { - todo!() + let mut column_ids = Vec::with_capacity(projection.len()); + // For each projection index, we get the column id. + for idx in projection { + let column_id = metadata + .column_metadatas + .get(*idx) + .context(InvalidRequestSnafu { + region_id: metadata.region_id, + reason: format!("Index {} out of bound", idx), + })? + .column_id; + column_ids.push(column_id); + } + Ok(column_ids) } From ffaef19a29d2e1d34d70eddd2fc0ae8ae39b456f Mon Sep 17 00:00:00 2001 From: evenyag Date: Wed, 23 Aug 2023 20:12:35 +0800 Subject: [PATCH 07/19] feat: Impl record batch stream --- src/common/recordbatch/src/error.rs | 2 +- src/mito2/src/read.rs | 1 + src/mito2/src/read/scan_region.rs | 4 +- src/mito2/src/read/seq_scan.rs | 9 ++ src/mito2/src/read/stream.rs | 187 ++++++++++++++++++++++++++++ src/mito2/src/sst/parquet/format.rs | 2 + src/store-api/src/metadata.rs | 15 +++ 7 files changed, 216 insertions(+), 4 deletions(-) create mode 100644 src/mito2/src/read/stream.rs diff --git a/src/common/recordbatch/src/error.rs b/src/common/recordbatch/src/error.rs index 49bfe02bf1d2..b2c88fffee9a 100644 --- a/src/common/recordbatch/src/error.rs +++ b/src/common/recordbatch/src/error.rs @@ -37,7 +37,7 @@ pub enum Error { source: datatypes::error::Error, }, - #[snafu(display("External error, source: {}", source))] + #[snafu(display("External error, location: {}, source: {}", location, source))] External { location: Location, source: BoxedError, diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index d4b2420ad3c7..1f3948c2d8e5 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -17,6 +17,7 @@ pub mod merge; pub(crate) mod scan_region; pub mod seq_scan; +pub(crate) mod stream; use std::sync::Arc; diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 1bdf30fc14b6..1d6095e2777f 100644 --- a/src/mito2/src/read/scan_region.rs +++ 
b/src/mito2/src/read/scan_region.rs @@ -14,9 +14,7 @@ //! Scans a region according to the scan request. -use std::collections::HashMap; - -use common_recordbatch::SendableRecordBatchStream; +use common_recordbatch::{SendableRecordBatchStream}; use common_telemetry::debug; use common_time::range::TimestampRange; use object_store::ObjectStore; diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index b1737f89a661..c2bea72080d3 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -49,6 +49,7 @@ pub struct SeqScan { impl SeqScan { /// Creates a new [SeqScan]. + #[must_use] pub fn new(metadata: RegionMetadataRef, file_dir: &str, object_store: ObjectStore) -> SeqScan { SeqScan { metadata, @@ -63,39 +64,47 @@ impl SeqScan { } /// Set projection. + #[must_use] pub fn with_projection(mut self, projection: Option>) -> Self { self.projection = projection; self } /// Set time range filter for time index. + #[must_use] pub fn with_time_range(mut self, time_range: Option) -> Self { self.time_range = time_range; self } /// Set predicate to push down. + #[must_use] pub fn with_predicate(mut self, predicate: Option) -> Self { self.predicate = predicate; self } /// Set memtables to read. + #[must_use] pub fn with_memtables(mut self, memtables: Vec) -> Self { self.memtables = memtables; self } /// Set files to read. + #[must_use] pub fn with_files(mut self, files: Vec) -> Self { self.files = files; self } /// Builds a stream for the query. + #[must_use] pub fn build(&self) -> SendableRecordBatchStream { // Scans all memtables and SSTs. // Builds a merge reader to merge results. 
+ + // unimplemented!() } } diff --git a/src/mito2/src/read/stream.rs b/src/mito2/src/read/stream.rs new file mode 100644 index 000000000000..2a33471dbff4 --- /dev/null +++ b/src/mito2/src/read/stream.rs @@ -0,0 +1,187 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Record batch stream. + +use std::sync::Arc; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +use api::v1::SemanticType; +use common_error::ext::BoxedError; +use common_recordbatch::RecordBatchStream; +use common_recordbatch::error::ExternalSnafu; +use common_recordbatch::{RecordBatch}; +use datatypes::prelude::{ConcreteDataType, DataType}; +use datatypes::schema::{SchemaRef, Schema}; +use datatypes::value::ValueRef; +use datatypes::vectors::VectorRef; +use futures::Stream; +use snafu::ResultExt; +use store_api::metadata::RegionMetadata; +use crate::read::Batch; +use crate::row_converter::{McmpRowCodec, SortField, RowCodec}; +use crate::error::Result; + +/// Record batch stream implementation. +pub(crate) struct StreamImpl { + /// [Batch] stream. + stream: S, + /// Converts [Batch]es from the `stream` to [RecordBatch]. + converter: BatchConverter, +} + +impl StreamImpl { + /// Returns a new stream from a batch stream. 
+ pub(crate) fn new(stream: S, converter: BatchConverter) -> StreamImpl { + StreamImpl { + stream, + converter, + } + } +} + +impl> + Unpin> Stream for StreamImpl { + type Item = common_recordbatch::error::Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match Pin::new(&mut self.stream).poll_next(cx) { + Poll::Ready(Some(res)) => { + let record_batch = res.map_err(BoxedError::new).context(ExternalSnafu).and_then(|batch| { + self.converter.convert(&batch) + }); + Poll::Ready(Some(record_batch)) + }, + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } +} + +impl> + Unpin> RecordBatchStream for StreamImpl { + fn schema(&self) -> SchemaRef { + self.converter.output_schema.clone() + } +} + +/// Converts a [Batch] to a [RecordBatch]. +pub(crate) struct BatchConverter { + /// Maps column in [RecordBatch] to index in [Batch]. + batch_indices: Vec, + /// Decoder for primary key. + codec: McmpRowCodec, + /// Schema for converted [RecordBatch]. + output_schema: SchemaRef, +} + +impl BatchConverter { + /// Returns a new converter with projection. + /// + /// # Panics + /// Panics if any index in `projection` is out of bound. + pub(crate) fn new(metadata: &RegionMetadata, projection: impl Iterator) -> BatchConverter { + let mut batch_indices = Vec::with_capacity(projection.size_hint().0); + let mut column_schemas = Vec::with_capacity(projection.size_hint().0); + for idx in projection { + // For each projection index, we get the column id for projection. + let column = &metadata.column_metadatas[idx]; + + // Get column index in a batch by its semantic type and column id. + let batch_index = match column.semantic_type { + SemanticType::Tag => { + // Safety: It is a primary key column. + let index = metadata.primary_key_index(column.column_id).unwrap(); + BatchIndex::Tag(index) + }, + SemanticType::Timestamp => BatchIndex::Timestamp, + SemanticType::Field => { + // Safety: It is a field column. 
+ let index = metadata.field_index(column.column_id).unwrap(); + BatchIndex::Field(index) + } + }; + batch_indices.push(batch_index); + + column_schemas.push(metadata.schema.column_schemas()[idx].clone()); + } + + let codec = McmpRowCodec::new( + metadata + .primary_key_columns() + .map(|c| SortField::new(c.column_schema.data_type.clone())) + .collect(), + ); + // Safety: Columns come from existing schema. + let output_schema = Arc::new(Schema::new(column_schemas)); + + BatchConverter { + batch_indices, + codec, + output_schema, + } + } + + /// Returns a new converter without projection. + pub(crate) fn all(metadata: &RegionMetadata) -> BatchConverter { + BatchConverter::new(metadata, 0..metadata.column_metadatas.len()) + } + + /// Converts a [Batch] to a [RecordBatch]. + /// + /// The batch must match the `projection` using to build the converter. + pub(crate) fn convert(&self, batch: &Batch) -> common_recordbatch::error::Result { + let pk_values = self.codec.decode(batch.primary_key()).map_err(BoxedError::new).context(ExternalSnafu)?; + + let mut columns = Vec::with_capacity(self.output_schema.num_columns()); + let num_rows = batch.num_rows(); + for (index, column_schema) in self.batch_indices.iter().zip(self.output_schema.column_schemas()) { + match index { + BatchIndex::Tag(idx) => { + let value = pk_values[*idx].as_value_ref(); + let vector = new_repeated_vector(&column_schema.data_type, value, num_rows)?; + columns.push(vector); + }, + BatchIndex::Timestamp => { + columns.push(batch.timestamps().clone()); + }, + BatchIndex::Field(idx) => { + columns.push(batch.fields()[*idx].data.clone()); + }, + } + } + + RecordBatch::new(self.output_schema.clone(), columns) + } +} + +/// Index of a vector in a [Batch]. +#[derive(Debug, Clone, Copy)] +enum BatchIndex { + /// Index in primary keys. + Tag(usize), + /// The time index column. + Timestamp, + /// Index in fields. + Field(usize), +} + +/// Returns a vector with repeated values. 
+fn new_repeated_vector(data_type: &ConcreteDataType, value: ValueRef, num_rows: usize) -> common_recordbatch::error::Result { + let mut mutable_vector = data_type.create_mutable_vector(1); + mutable_vector.try_push_value_ref(value).map_err(BoxedError::new).context(ExternalSnafu)?; + // This requires an addtional allocation. TODO(yingwen): Add a way to create repeated vector to data type. + let base_vector = mutable_vector.to_vector(); + Ok(base_vector.replicate(&[num_rows])) +} diff --git a/src/mito2/src/sst/parquet/format.rs b/src/mito2/src/sst/parquet/format.rs index 8b17826edfe8..977604c2607f 100644 --- a/src/mito2/src/sst/parquet/format.rs +++ b/src/mito2/src/sst/parquet/format.rs @@ -23,6 +23,8 @@ //! ```text //! field 0, field 1, ..., field N, time index, primary key, sequence, op type //! ``` +//! +//! We stores fields in the same order as [RegionMetadata::field_columns()](store_api::metadata::RegionMetadata::field_columns()). use std::collections::HashMap; use std::sync::Arc; diff --git a/src/store-api/src/metadata.rs b/src/store-api/src/metadata.rs index 7c5b75e869bb..7d298557bd85 100644 --- a/src/store-api/src/metadata.rs +++ b/src/store-api/src/metadata.rs @@ -169,6 +169,7 @@ impl RegionMetadata { .map(|index| &self.column_metadatas[index]) } + /// Returns all primary key columns. pub fn primary_key_columns(&self) -> impl Iterator { // safety: RegionMetadata::validate ensures every primary key exists. self.primary_key @@ -183,6 +184,20 @@ impl RegionMetadata { .filter(|column| column.semantic_type == SemanticType::Field) } + /// Returns a column's index in primary key if it is a primary key column. + /// + /// This does a linear search. + pub fn primary_key_index(&self, column_id: ColumnId) -> Option { + self.primary_key.iter().position(|id| *id == column_id) + } + + /// Returns a column's index in fields if it is a field column. + /// + /// This does a linear search. 
+ pub fn field_index(&self, column_id: ColumnId) -> Option { + self.field_columns().position(|column| column.column_id == column_id) + } + /// Checks whether the metadata is valid. fn validate(&self) -> Result<()> { // Id to name. From a270e9559bcf7b900e0472072f22398c3ba28470 Mon Sep 17 00:00:00 2001 From: evenyag Date: Wed, 23 Aug 2023 21:11:23 +0800 Subject: [PATCH 08/19] refactor: change BatchConverter to ProjectionMapper --- src/mito2/src/read/scan_region.rs | 43 +++--------- src/mito2/src/read/seq_scan.rs | 30 ++++----- src/mito2/src/read/stream.rs | 105 ++++++++++++++++++------------ src/store-api/src/metadata.rs | 3 +- 4 files changed, 88 insertions(+), 93 deletions(-) diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 1d6095e2777f..c35db88e79ef 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -14,17 +14,17 @@ //! Scans a region according to the scan request. -use common_recordbatch::{SendableRecordBatchStream}; +use common_recordbatch::SendableRecordBatchStream; use common_telemetry::debug; use common_time::range::TimestampRange; use object_store::ObjectStore; -use snafu::{OptionExt, ResultExt}; -use store_api::metadata::RegionMetadata; -use store_api::storage::{ColumnId, ScanRequest}; +use snafu::ResultExt; +use store_api::storage::ScanRequest; use table::predicate::{Predicate, TimeRangePredicateBuilder}; -use crate::error::{BuildPredicateSnafu, InvalidRequestSnafu, Result}; +use crate::error::{BuildPredicateSnafu, Result}; use crate::read::seq_scan::SeqScan; +use crate::read::stream::ProjectionMapper; use crate::region::version::VersionRef; use crate::sst::file::FileHandle; @@ -110,19 +110,17 @@ impl ScanRegion { self.version.metadata.schema.clone(), ) .context(BuildPredicateSnafu)?; - let projection = self - .request - .projection - .as_ref() - .map(|p| projection_indices_to_ids(&self.version.metadata, p)) - .transpose()?; + let mapper = match &self.request.projection { + 
Some(p) => ProjectionMapper::new(&self.version.metadata, p.iter().copied())?, + None => ProjectionMapper::all(&self.version.metadata)?, + }; let seq_scan = SeqScan::new( self.version.metadata.clone(), &self.file_dir, self.object_store.clone(), + mapper, ) - .with_projection(projection) .with_time_range(Some(time_range)) .with_predicate(Some(predicate)) .with_memtables(memtables) @@ -155,24 +153,3 @@ fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool { let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end)); file_ts_range.intersects(predicate) } - -/// Map projection indices to column ids. -fn projection_indices_to_ids( - metadata: &RegionMetadata, - projection: &[usize], -) -> Result> { - let mut column_ids = Vec::with_capacity(projection.len()); - // For each projection index, we get the column id. - for idx in projection { - let column_id = metadata - .column_metadatas - .get(*idx) - .context(InvalidRequestSnafu { - region_id: metadata.region_id, - reason: format!("Index {} out of bound", idx), - })? - .column_id; - column_ids.push(column_id); - } - Ok(column_ids) -} diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index c2bea72080d3..0f40af2fbefb 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -18,10 +18,10 @@ use common_recordbatch::SendableRecordBatchStream; use common_time::range::TimestampRange; use object_store::ObjectStore; use store_api::metadata::RegionMetadataRef; -use store_api::storage::ColumnId; use table::predicate::Predicate; use crate::memtable::MemtableRef; +use crate::read::stream::ProjectionMapper; use crate::sst::file::FileHandle; /// Scans a region and returns rows in a sorted sequence. @@ -34,9 +34,9 @@ pub struct SeqScan { file_dir: String, /// Object store that stores SST files. object_store: ObjectStore, + /// Maps projected Batches to RecordBatches. + mapper: ProjectionMapper, - /// Projection to push down. 
- projection: Option>, /// Time range filter for time index. time_range: Option, /// Predicate to push down. @@ -50,12 +50,17 @@ pub struct SeqScan { impl SeqScan { /// Creates a new [SeqScan]. #[must_use] - pub fn new(metadata: RegionMetadataRef, file_dir: &str, object_store: ObjectStore) -> SeqScan { + pub(crate) fn new( + metadata: RegionMetadataRef, + file_dir: &str, + object_store: ObjectStore, + mapper: ProjectionMapper, + ) -> SeqScan { SeqScan { metadata, file_dir: file_dir.to_string(), object_store, - projection: None, + mapper, time_range: None, predicate: None, memtables: Vec::new(), @@ -63,37 +68,30 @@ impl SeqScan { } } - /// Set projection. - #[must_use] - pub fn with_projection(mut self, projection: Option>) -> Self { - self.projection = projection; - self - } - /// Set time range filter for time index. #[must_use] - pub fn with_time_range(mut self, time_range: Option) -> Self { + pub(crate) fn with_time_range(mut self, time_range: Option) -> Self { self.time_range = time_range; self } /// Set predicate to push down. #[must_use] - pub fn with_predicate(mut self, predicate: Option) -> Self { + pub(crate) fn with_predicate(mut self, predicate: Option) -> Self { self.predicate = predicate; self } /// Set memtables to read. #[must_use] - pub fn with_memtables(mut self, memtables: Vec) -> Self { + pub(crate) fn with_memtables(mut self, memtables: Vec) -> Self { self.memtables = memtables; self } /// Set files to read. #[must_use] - pub fn with_files(mut self, files: Vec) -> Self { + pub(crate) fn with_files(mut self, files: Vec) -> Self { self.files = files; self } diff --git a/src/mito2/src/read/stream.rs b/src/mito2/src/read/stream.rs index 2a33471dbff4..26eb0a4d9b3f 100644 --- a/src/mito2/src/read/stream.rs +++ b/src/mito2/src/read/stream.rs @@ -14,42 +14,38 @@ //! Record batch stream. 
-use std::sync::Arc; use std::pin::Pin; -use std::task::Context; -use std::task::Poll; +use std::sync::Arc; +use std::task::{Context, Poll}; use api::v1::SemanticType; use common_error::ext::BoxedError; -use common_recordbatch::RecordBatchStream; use common_recordbatch::error::ExternalSnafu; -use common_recordbatch::{RecordBatch}; +use common_recordbatch::{RecordBatch, RecordBatchStream}; use datatypes::prelude::{ConcreteDataType, DataType}; -use datatypes::schema::{SchemaRef, Schema}; +use datatypes::schema::{Schema, SchemaRef}; use datatypes::value::ValueRef; use datatypes::vectors::VectorRef; use futures::Stream; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadata; + +use crate::error::{InvalidRequestSnafu, Result}; use crate::read::Batch; -use crate::row_converter::{McmpRowCodec, SortField, RowCodec}; -use crate::error::Result; +use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; /// Record batch stream implementation. pub(crate) struct StreamImpl { /// [Batch] stream. stream: S, /// Converts [Batch]es from the `stream` to [RecordBatch]. - converter: BatchConverter, + mapper: ProjectionMapper, } impl StreamImpl { /// Returns a new stream from a batch stream. 
- pub(crate) fn new(stream: S, converter: BatchConverter) -> StreamImpl { - StreamImpl { - stream, - converter, - } + pub(crate) fn new(stream: S, mapper: ProjectionMapper) -> StreamImpl { + StreamImpl { stream, mapper } } } @@ -59,11 +55,12 @@ impl> + Unpin> Stream for StreamImpl { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match Pin::new(&mut self.stream).poll_next(cx) { Poll::Ready(Some(res)) => { - let record_batch = res.map_err(BoxedError::new).context(ExternalSnafu).and_then(|batch| { - self.converter.convert(&batch) - }); + let record_batch = res + .map_err(BoxedError::new) + .context(ExternalSnafu) + .and_then(|batch| self.mapper.convert(&batch)); Poll::Ready(Some(record_batch)) - }, + } Poll::Ready(None) => Poll::Ready(None), Poll::Pending => Poll::Pending, } @@ -72,12 +69,12 @@ impl> + Unpin> Stream for StreamImpl { impl> + Unpin> RecordBatchStream for StreamImpl { fn schema(&self) -> SchemaRef { - self.converter.output_schema.clone() + self.mapper.output_schema.clone() } } -/// Converts a [Batch] to a [RecordBatch]. -pub(crate) struct BatchConverter { +/// Handles projection and converts a projected [Batch] to a projected [RecordBatch]. +pub(crate) struct ProjectionMapper { /// Maps column in [RecordBatch] to index in [Batch]. batch_indices: Vec, /// Decoder for primary key. @@ -86,17 +83,23 @@ pub(crate) struct BatchConverter { output_schema: SchemaRef, } -impl BatchConverter { - /// Returns a new converter with projection. - /// - /// # Panics - /// Panics if any index in `projection` is out of bound. - pub(crate) fn new(metadata: &RegionMetadata, projection: impl Iterator) -> BatchConverter { +impl ProjectionMapper { + /// Returns a new mapper with projection. 
+ pub(crate) fn new( + metadata: &RegionMetadata, + projection: impl Iterator, + ) -> Result { let mut batch_indices = Vec::with_capacity(projection.size_hint().0); let mut column_schemas = Vec::with_capacity(projection.size_hint().0); for idx in projection { // For each projection index, we get the column id for projection. - let column = &metadata.column_metadatas[idx]; + let column = metadata + .column_metadatas + .get(idx) + .context(InvalidRequestSnafu { + region_id: metadata.region_id, + reason: format!("projection index {} is out of bound", idx), + })?; // Get column index in a batch by its semantic type and column id. let batch_index = match column.semantic_type { @@ -104,7 +107,7 @@ impl BatchConverter { // Safety: It is a primary key column. let index = metadata.primary_key_index(column.column_id).unwrap(); BatchIndex::Tag(index) - }, + } SemanticType::Timestamp => BatchIndex::Timestamp, SemanticType::Field => { // Safety: It is a field column. @@ -114,6 +117,7 @@ impl BatchConverter { }; batch_indices.push(batch_index); + // Safety: idx is valid. column_schemas.push(metadata.schema.column_schemas()[idx].clone()); } @@ -126,39 +130,47 @@ impl BatchConverter { // Safety: Columns come from existing schema. let output_schema = Arc::new(Schema::new(column_schemas)); - BatchConverter { + Ok(ProjectionMapper { batch_indices, codec, output_schema, - } + }) } - /// Returns a new converter without projection. - pub(crate) fn all(metadata: &RegionMetadata) -> BatchConverter { - BatchConverter::new(metadata, 0..metadata.column_metadatas.len()) + /// Returns a new mapper without projection. + pub(crate) fn all(metadata: &RegionMetadata) -> Result { + ProjectionMapper::new(metadata, 0..metadata.column_metadatas.len()) } /// Converts a [Batch] to a [RecordBatch]. /// - /// The batch must match the `projection` using to build the converter. + /// The batch must match the `projection` using to build the mapper. 
pub(crate) fn convert(&self, batch: &Batch) -> common_recordbatch::error::Result { - let pk_values = self.codec.decode(batch.primary_key()).map_err(BoxedError::new).context(ExternalSnafu)?; + let pk_values = self + .codec + .decode(batch.primary_key()) + .map_err(BoxedError::new) + .context(ExternalSnafu)?; let mut columns = Vec::with_capacity(self.output_schema.num_columns()); let num_rows = batch.num_rows(); - for (index, column_schema) in self.batch_indices.iter().zip(self.output_schema.column_schemas()) { + for (index, column_schema) in self + .batch_indices + .iter() + .zip(self.output_schema.column_schemas()) + { match index { BatchIndex::Tag(idx) => { let value = pk_values[*idx].as_value_ref(); let vector = new_repeated_vector(&column_schema.data_type, value, num_rows)?; columns.push(vector); - }, + } BatchIndex::Timestamp => { columns.push(batch.timestamps().clone()); - }, + } BatchIndex::Field(idx) => { columns.push(batch.fields()[*idx].data.clone()); - }, + } } } @@ -178,9 +190,16 @@ enum BatchIndex { } /// Returns a vector with repeated values. -fn new_repeated_vector(data_type: &ConcreteDataType, value: ValueRef, num_rows: usize) -> common_recordbatch::error::Result { +fn new_repeated_vector( + data_type: &ConcreteDataType, + value: ValueRef, + num_rows: usize, +) -> common_recordbatch::error::Result { let mut mutable_vector = data_type.create_mutable_vector(1); - mutable_vector.try_push_value_ref(value).map_err(BoxedError::new).context(ExternalSnafu)?; + mutable_vector + .try_push_value_ref(value) + .map_err(BoxedError::new) + .context(ExternalSnafu)?; // This requires an addtional allocation. TODO(yingwen): Add a way to create repeated vector to data type. 
let base_vector = mutable_vector.to_vector(); Ok(base_vector.replicate(&[num_rows])) diff --git a/src/store-api/src/metadata.rs b/src/store-api/src/metadata.rs index 7d298557bd85..17aadef89685 100644 --- a/src/store-api/src/metadata.rs +++ b/src/store-api/src/metadata.rs @@ -195,7 +195,8 @@ impl RegionMetadata { /// /// This does a linear search. pub fn field_index(&self, column_id: ColumnId) -> Option { - self.field_columns().position(|column| column.column_id == column_id) + self.field_columns() + .position(|column| column.column_id == column_id) } /// Checks whether the metadata is valid. From 6354cfd29ab219063671c6952e20060cd1735a7a Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 24 Aug 2023 11:57:22 +0800 Subject: [PATCH 09/19] feat: add column_ids to mapper --- src/mito2/src/read/seq_scan.rs | 4 +--- src/mito2/src/read/stream.rs | 17 ++++++++++++++--- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 0f40af2fbefb..0f68eda15aad 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -99,9 +99,7 @@ impl SeqScan { /// Builds a stream for the query. #[must_use] pub fn build(&self) -> SendableRecordBatchStream { - // Scans all memtables and SSTs. - // Builds a merge reader to merge results. - + // Scans all memtables and SSTs. Builds a merge reader to merge results. // unimplemented!() } diff --git a/src/mito2/src/read/stream.rs b/src/mito2/src/read/stream.rs index 26eb0a4d9b3f..6cc9b6175a52 100644 --- a/src/mito2/src/read/stream.rs +++ b/src/mito2/src/read/stream.rs @@ -29,6 +29,7 @@ use datatypes::vectors::VectorRef; use futures::Stream; use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadata; +use store_api::storage::ColumnId; use crate::error::{InvalidRequestSnafu, Result}; use crate::read::Batch; @@ -81,6 +82,8 @@ pub(crate) struct ProjectionMapper { codec: McmpRowCodec, /// Schema for converted [RecordBatch]. 
output_schema: SchemaRef, + /// Id of columns to project. + column_ids: Vec, } impl ProjectionMapper { @@ -89,8 +92,10 @@ impl ProjectionMapper { metadata: &RegionMetadata, projection: impl Iterator, ) -> Result { - let mut batch_indices = Vec::with_capacity(projection.size_hint().0); - let mut column_schemas = Vec::with_capacity(projection.size_hint().0); + let projection_len = projection.size_hint().0; + let mut batch_indices = Vec::with_capacity(projection_len); + let mut column_schemas = Vec::with_capacity(projection_len); + let mut column_ids = Vec::with_capacity(projection_len); for idx in projection { // For each projection index, we get the column id for projection. let column = metadata @@ -116,7 +121,7 @@ impl ProjectionMapper { } }; batch_indices.push(batch_index); - + column_ids.push(column.column_id); // Safety: idx is valid. column_schemas.push(metadata.schema.column_schemas()[idx].clone()); } @@ -134,6 +139,7 @@ impl ProjectionMapper { batch_indices, codec, output_schema, + column_ids, }) } @@ -142,6 +148,11 @@ impl ProjectionMapper { ProjectionMapper::new(metadata, 0..metadata.column_metadatas.len()) } + /// Returns ids of projected columns. + pub(crate) fn column_ids(&self) -> &[ColumnId] { + &self.column_ids + } + /// Converts a [Batch] to a [RecordBatch]. /// /// The batch must match the `projection` using to build the mapper. 
From 57f0947ebdda2d3c901a670639b9696139fea821 Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 24 Aug 2023 15:14:28 +0800 Subject: [PATCH 10/19] feat: implement SeqScan::build() --- src/mito2/src/read/scan_region.rs | 23 ++++------ src/mito2/src/read/seq_scan.rs | 67 +++++++++++++++++++++++------ src/mito2/src/read/stream.rs | 39 ++++++++--------- src/mito2/src/sst/parquet.rs | 4 +- src/mito2/src/sst/parquet/reader.rs | 12 +++--- 5 files changed, 90 insertions(+), 55 deletions(-) diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index c35db88e79ef..03f3a2241a78 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -37,9 +37,9 @@ pub enum Scanner { impl Scanner { /// Returns a [SendableRecordBatchStream] to retrieve scan results. - pub fn scan(&self) -> SendableRecordBatchStream { + pub async fn scan(&self) -> Result { match self { - Scanner::Seq(seq_scan) => seq_scan.build(), + Scanner::Seq(seq_scan) => seq_scan.build().await, } } } @@ -73,12 +73,12 @@ impl ScanRegion { } /// Returns a [Scanner] to scan the region. - pub(crate) fn scanner(&self) -> Result { + pub(crate) fn scanner(self) -> Result { self.seq_scan().map(Scanner::Seq) } /// Scan sequentailly. 
- pub(crate) fn seq_scan(&self) -> Result { + pub(crate) fn seq_scan(self) -> Result { let time_range = self.build_time_range_predicate(); let ssts = &self.version.ssts; @@ -115,16 +115,11 @@ impl ScanRegion { None => ProjectionMapper::all(&self.version.metadata)?, }; - let seq_scan = SeqScan::new( - self.version.metadata.clone(), - &self.file_dir, - self.object_store.clone(), - mapper, - ) - .with_time_range(Some(time_range)) - .with_predicate(Some(predicate)) - .with_memtables(memtables) - .with_files(files); + let seq_scan = SeqScan::new(self.file_dir, self.object_store, mapper, self.request) + .with_time_range(Some(time_range)) + .with_predicate(Some(predicate)) + .with_memtables(memtables) + .with_files(files); Ok(seq_scan) } diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 0f68eda15aad..654d9b597877 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -14,28 +14,39 @@ //! Sequential scan. +use std::sync::Arc; + +use async_stream::try_stream; +use common_error::ext::BoxedError; +use common_recordbatch::error::ExternalSnafu; use common_recordbatch::SendableRecordBatchStream; use common_time::range::TimestampRange; use object_store::ObjectStore; -use store_api::metadata::RegionMetadataRef; +use snafu::ResultExt; +use store_api::storage::ScanRequest; use table::predicate::Predicate; +use crate::error::Result; use crate::memtable::MemtableRef; -use crate::read::stream::ProjectionMapper; +use crate::read::merge::MergeReaderBuilder; +use crate::read::stream::{ProjectionMapper, StreamImpl}; +use crate::read::BatchReader; use crate::sst::file::FileHandle; +use crate::sst::parquet::reader::ParquetReaderBuilder; /// Scans a region and returns rows in a sorted sequence. /// /// The output order is always `order by primary key, time index`. pub struct SeqScan { - /// Metadata of the region to scan. - metadata: RegionMetadataRef, /// Directory of SST files. 
file_dir: String, /// Object store that stores SST files. object_store: ObjectStore, /// Maps projected Batches to RecordBatches. - mapper: ProjectionMapper, + mapper: Arc, + /// Original scan request to scan memtable. + // TODO(yingwen): Remove this if memtable::iter() takes another struct. + request: ScanRequest, /// Time range filter for time index. time_range: Option, @@ -51,20 +62,20 @@ impl SeqScan { /// Creates a new [SeqScan]. #[must_use] pub(crate) fn new( - metadata: RegionMetadataRef, - file_dir: &str, + file_dir: String, object_store: ObjectStore, mapper: ProjectionMapper, + request: ScanRequest, ) -> SeqScan { SeqScan { - metadata, - file_dir: file_dir.to_string(), + file_dir, object_store, - mapper, + mapper: Arc::new(mapper), time_range: None, predicate: None, memtables: Vec::new(), files: Vec::new(), + request, } } @@ -98,9 +109,39 @@ impl SeqScan { /// Builds a stream for the query. #[must_use] - pub fn build(&self) -> SendableRecordBatchStream { + pub async fn build(&self) -> Result { // Scans all memtables and SSTs. Builds a merge reader to merge results. - // - unimplemented!() + let mut builder = MergeReaderBuilder::new(); + for mem in &self.memtables { + let iter = mem.iter(self.request.clone()); + builder.push_batch_iter(iter); + } + for file in &self.files { + let reader = ParquetReaderBuilder::new( + self.file_dir.clone(), + file.clone(), + self.object_store.clone(), + ) + .predicate(self.predicate.clone()) + .time_range(self.time_range.clone()) + .projection(Some(self.mapper.column_ids().to_vec())) + .build() + .await?; + builder.push_batch_reader(Box::new(reader)); + } + let mut reader = builder.build().await?; + // Creates a stream to poll the batch reader and convert batch into record batch. + let mapper = self.mapper.clone(); + let stream = try_stream! { + while let Some(batch) = reader.next_batch().await.map_err(BoxedError::new).context(ExternalSnafu)? 
{ + yield mapper.convert(&batch)?; + } + }; + let stream = Box::pin(StreamImpl::new( + Box::pin(stream), + self.mapper.output_schema(), + )); + + Ok(stream) } } diff --git a/src/mito2/src/read/stream.rs b/src/mito2/src/read/stream.rs index 6cc9b6175a52..2ba6ed53f3d3 100644 --- a/src/mito2/src/read/stream.rs +++ b/src/mito2/src/read/stream.rs @@ -37,40 +37,34 @@ use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; /// Record batch stream implementation. pub(crate) struct StreamImpl { - /// [Batch] stream. + /// [RecordBatch] stream. stream: S, - /// Converts [Batch]es from the `stream` to [RecordBatch]. - mapper: ProjectionMapper, + /// Schema of returned record batches. + schema: SchemaRef, } impl StreamImpl { - /// Returns a new stream from a batch stream. - pub(crate) fn new(stream: S, mapper: ProjectionMapper) -> StreamImpl { - StreamImpl { stream, mapper } + /// Returns a new stream from a record batch stream and its schema. + pub(crate) fn new(stream: S, schema: SchemaRef) -> StreamImpl { + StreamImpl { stream, schema } } } -impl> + Unpin> Stream for StreamImpl { +impl> + Unpin> Stream + for StreamImpl +{ type Item = common_recordbatch::error::Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match Pin::new(&mut self.stream).poll_next(cx) { - Poll::Ready(Some(res)) => { - let record_batch = res - .map_err(BoxedError::new) - .context(ExternalSnafu) - .and_then(|batch| self.mapper.convert(&batch)); - Poll::Ready(Some(record_batch)) - } - Poll::Ready(None) => Poll::Ready(None), - Poll::Pending => Poll::Pending, - } + Pin::new(&mut self.stream).poll_next(cx) } } -impl> + Unpin> RecordBatchStream for StreamImpl { +impl> + Unpin> RecordBatchStream + for StreamImpl +{ fn schema(&self) -> SchemaRef { - self.mapper.output_schema.clone() + self.schema.clone() } } @@ -153,6 +147,11 @@ impl ProjectionMapper { &self.column_ids } + /// Returns the schema of converted [RecordBatch]. 
+ pub(crate) fn output_schema(&self) -> SchemaRef { + self.output_schema.clone() + } + /// Converts a [Batch] to a [RecordBatch]. /// /// The batch must match the `projection` using to build the mapper. diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index eb1d5068aa22..c8eb1dbea4c0 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -15,8 +15,8 @@ //! SST in parquet format. mod format; -mod reader; -mod writer; +pub mod reader; +pub mod writer; use common_base::readable_size::ReadableSize; diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 37f9c0168d9e..06c9533602dd 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -68,22 +68,22 @@ impl ParquetReaderBuilder { } /// Attaches the predicate to the builder. - pub fn predicate(mut self, predicate: Predicate) -> ParquetReaderBuilder { - self.predicate = Some(predicate); + pub fn predicate(mut self, predicate: Option) -> ParquetReaderBuilder { + self.predicate = predicate; self } /// Attaches the time range to the builder. - pub fn time_range(mut self, time_range: TimestampRange) -> ParquetReaderBuilder { - self.time_range = Some(time_range); + pub fn time_range(mut self, time_range: Option) -> ParquetReaderBuilder { + self.time_range = time_range; self } /// Attaches the projection to the builder. /// /// The reader only applies the projection to fields. 
- pub fn projection(mut self, projection: Vec) -> ParquetReaderBuilder { - self.projection = Some(projection); + pub fn projection(mut self, projection: Option>) -> ParquetReaderBuilder { + self.projection = projection; self } From d5c192b161bf11581f90b3b52ed9dc2062832c92 Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 24 Aug 2023 15:29:00 +0800 Subject: [PATCH 11/19] chore: fix typo --- src/mito2/src/read/stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mito2/src/read/stream.rs b/src/mito2/src/read/stream.rs index 2ba6ed53f3d3..310311d3c916 100644 --- a/src/mito2/src/read/stream.rs +++ b/src/mito2/src/read/stream.rs @@ -210,7 +210,7 @@ fn new_repeated_vector( .try_push_value_ref(value) .map_err(BoxedError::new) .context(ExternalSnafu)?; - // This requires an addtional allocation. TODO(yingwen): Add a way to create repeated vector to data type. + // This requires an additional allocation. TODO(yingwen): Add a way to create repeated vector to data type. let base_vector = mutable_vector.to_vector(); Ok(base_vector.replicate(&[num_rows])) } From 1d9cfd53c60fd659694f1d7552d8b5c564ad28bc Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 24 Aug 2023 16:02:21 +0800 Subject: [PATCH 12/19] docs: add mermaid for ScanRegion --- src/mito2/src/read/scan_region.rs | 37 +++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 03f3a2241a78..28a3c7359714 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -44,7 +44,44 @@ impl Scanner { } } +#[cfg_attr(doc, aquamarine::aquamarine)] /// Helper to scans a region by [ScanRequest]. +/// +/// [ScanRegion] collects SSTs and memtables to scan without actually reading them. It +/// creates a [Scanner] to actually scan these targets in [Scanner::scan()]. 
+/// +/// ```mermaid +/// classDiagram +/// class ScanRegion { +/// -VersionRef version +/// -ScanRequest request +/// ~scanner() Scanner +/// ~seq_scan() SeqScan +/// } +/// class Scanner { +/// <> +/// SeqScan +/// +scan() SendableRecordBatchStream +/// } +/// class SeqScan { +/// -ProjectionMapper mapper +/// -Option~TimeRange~ time_range +/// -Option~Predicate~ predicate +/// -Vec~MemtableRef~ memtables +/// -Vec~FileHandle~ files +/// +build() SendableRecordBatchStream +/// } +/// class ProjectionMapper { +/// ~output_schema() SchemaRef +/// ~convert(Batch) RecordBatch +/// } +/// ScanRegion -- Scanner +/// ScanRegion o-- ScanRequest +/// Scanner o-- SeqScan +/// Scanner -- SendableRecordBatchStream +/// SeqScan o-- ProjectionMapper +/// SeqScan -- SendableRecordBatchStream +/// ``` pub(crate) struct ScanRegion { /// Version of the region at scan. version: VersionRef, From fb83e0814ca1d9ffc3e3aa3059b43dc6426ea254 Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 24 Aug 2023 16:11:45 +0800 Subject: [PATCH 13/19] style: fix clippy --- src/mito2/src/read/seq_scan.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 654d9b597877..2da30b288384 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -108,7 +108,6 @@ impl SeqScan { } /// Builds a stream for the query. - #[must_use] pub async fn build(&self) -> Result { // Scans all memtables and SSTs. Builds a merge reader to merge results. 
let mut builder = MergeReaderBuilder::new(); @@ -123,7 +122,7 @@ impl SeqScan { self.object_store.clone(), ) .predicate(self.predicate.clone()) - .time_range(self.time_range.clone()) + .time_range(self.time_range) .projection(Some(self.mapper.column_ids().to_vec())) .build() .await?; From b6aa5e4a8393cc19f2face7a791b419c990ea6fe Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 24 Aug 2023 16:36:23 +0800 Subject: [PATCH 14/19] test: fix record batch test --- src/common/recordbatch/src/adapter.rs | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/src/common/recordbatch/src/adapter.rs b/src/common/recordbatch/src/adapter.rs index 6c113562cddf..b71bad410317 100644 --- a/src/common/recordbatch/src/adapter.rs +++ b/src/common/recordbatch/src/adapter.rs @@ -286,6 +286,7 @@ mod test { use snafu::IntoError; use super::*; + use crate::error::Error; use crate::RecordBatches; #[tokio::test] @@ -354,20 +355,24 @@ mod test { .into_error(BoxedError::new(MockError::new(StatusCode::Unknown)))), ])); let adapter = AsyncRecordBatchStreamAdapter::new(schema.clone(), poll_err_stream); - let result = RecordBatches::try_collect(Box::pin(adapter)).await; - assert_eq!( - result.unwrap_err().to_string(), - "External error, source: Unknown", + let err = RecordBatches::try_collect(Box::pin(adapter)) + .await + .unwrap_err(); + assert!( + matches!(err, Error::External { .. }), + "unexpected err {err}" ); let failed_to_init_stream = new_future_stream(Err(error::ExternalSnafu .into_error(BoxedError::new(MockError::new(StatusCode::Internal))))); let adapter = AsyncRecordBatchStreamAdapter::new(schema.clone(), failed_to_init_stream); - let result = RecordBatches::try_collect(Box::pin(adapter)).await; - assert_eq!( - result.unwrap_err().to_string(), - "External error, source: Internal", + let err = RecordBatches::try_collect(Box::pin(adapter)) + .await + .unwrap_err(); + assert!( + matches!(err, Error::External { .. 
}), + "unexpected err {err}" ); } } From 2920427f242dc17c8a01f994d7c6f7d468c1281b Mon Sep 17 00:00:00 2001 From: evenyag Date: Fri, 25 Aug 2023 16:29:07 +0800 Subject: [PATCH 15/19] fix: update sequence and entry id --- src/mito2/src/region/version.rs | 7 +++++++ src/mito2/src/worker/handle_write.rs | 13 +++++++++++-- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs index 09285d370a18..8f0ff5a3b066 100644 --- a/src/mito2/src/region/version.rs +++ b/src/mito2/src/region/version.rs @@ -58,6 +58,13 @@ impl VersionControl { pub(crate) fn current(&self) -> VersionControlData { self.data.read().unwrap().clone() } + + /// Updates committed sequence and entry id. + pub(crate) fn set_sequence_and_entry_id(&self, seq: SequenceNumber, entry_id: EntryId) { + let mut data = self.data.write().unwrap(); + data.committed_sequence = seq; + data.last_entry_id = entry_id; + } } pub(crate) type VersionControlRef = Arc; diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index 1054adbc21b3..af4f04a4cd30 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -231,8 +231,11 @@ impl RegionWriteCtx { } /// Encode and add WAL entry to the writer. - fn add_wal_entry(&self, wal_writer: &mut WalWriter) -> Result<()> { - wal_writer.add_entry(self.region.region_id, self.next_entry_id, &self.wal_entry) + fn add_wal_entry(&mut self, wal_writer: &mut WalWriter) -> Result<()> { + wal_writer.add_entry(self.region.region_id, self.next_entry_id, &self.wal_entry)?; + // We only call this method one time, but we still bump next entry id for consistency. + self.next_entry_id += 1; + Ok(()) } /// Sets error and marks all write operations are failed. @@ -259,5 +262,11 @@ impl RegionWriteCtx { notify.err = Some(Arc::new(e)); } } + + // Updates region sequence and entry id. 
Since we store last sequence and entry id in the region, we need + // to decrease `next_sequence` and `next_entry_id` by 1. + self.region + .version_control + .set_sequence_and_entry_id(self.next_sequence - 1, self.next_entry_id - 1); } } From d14fa4372fcf343d4d8ea3dda065437ea1ee66d4 Mon Sep 17 00:00:00 2001 From: evenyag Date: Fri, 25 Aug 2023 16:53:22 +0800 Subject: [PATCH 16/19] test: test query --- src/mito2/src/engine/tests.rs | 47 ++++++++++++++++++++++++++++++++++- 1 file changed, 46 insertions(+), 1 deletion(-) diff --git a/src/mito2/src/engine/tests.rs b/src/mito2/src/engine/tests.rs index ff3fc590c95c..2beb97874018 100644 --- a/src/mito2/src/engine/tests.rs +++ b/src/mito2/src/engine/tests.rs @@ -19,6 +19,7 @@ use std::collections::HashMap; use api::helper::ColumnDataTypeWrapper; use api::v1::value::ValueData; use api::v1::{Row, Rows}; +use common_recordbatch::RecordBatches; use store_api::metadata::ColumnMetadata; use store_api::region_request::{RegionCloseRequest, RegionOpenRequest, RegionPutRequest}; use store_api::storage::RegionId; @@ -246,7 +247,7 @@ fn build_rows(num_rows: usize) -> Vec { value_data: Some(ValueData::F64Value(i as f64)), }, api::v1::Value { - value_data: Some(ValueData::TsMillisecondValue(i as i64)), + value_data: Some(ValueData::TsMillisecondValue(i as i64 * 1000)), }, ], }) @@ -285,3 +286,47 @@ async fn test_write_to_region() { }; assert_eq!(num_rows, rows_inserted); } + +// TODO(yingwen): build_rows() only generates one point for each series. We need to add tests +// for series with multiple points and other cases. 
+#[tokio::test] +async fn test_write_query_region() { + let env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + + let column_schemas = request + .column_metadatas + .iter() + .map(column_metadata_to_column_schema) + .collect::>(); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let rows = Rows { + schema: column_schemas, + rows: build_rows(3), + }; + engine + .handle_request(region_id, RegionRequest::Put(RegionPutRequest { rows })) + .await + .unwrap(); + + let request = ScanRequest::default(); + let scanner = engine.handle_query(region_id, request).unwrap(); + let stream = scanner.scan().await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 0 | 0.0 | 1970-01-01T00:00:00 | +| 1 | 1.0 | 1970-01-01T00:00:01 | +| 2 | 2.0 | 1970-01-01T00:00:02 | ++-------+---------+---------------------+"; + assert_eq!(expected, batches.pretty_print().unwrap()); +} From a21ba246b3ed9ecceec444876919d64b45197db5 Mon Sep 17 00:00:00 2001 From: evenyag Date: Fri, 25 Aug 2023 16:55:42 +0800 Subject: [PATCH 17/19] feat: address CR comment --- src/mito2/src/engine.rs | 3 +-- src/mito2/src/error.rs | 7 +++++-- src/mito2/src/lib.rs | 1 + src/mito2/src/memtable/version.rs | 2 +- src/mito2/src/read.rs | 3 +-- src/mito2/src/read/scan_region.rs | 4 ++-- 6 files changed, 11 insertions(+), 9 deletions(-) diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 8fa2f1eea436..1b9c938b72ce 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -28,8 +28,7 @@ use store_api::storage::{RegionId, ScanRequest}; use crate::config::MitoConfig; use crate::error::{RecvSnafu, RegionNotFoundSnafu, Result}; -use crate::read::scan_region::ScanRegion; -use 
crate::read::Scanner; +use crate::read::scan_region::{ScanRegion, Scanner}; use crate::request::{RegionTask, RequestBody}; use crate::worker::WorkerGroup; diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 0ba01bae9ccb..1361aa170876 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -372,8 +372,11 @@ pub enum Error { #[snafu(display("Invalid scheduler state, location: {}", location))] InvalidSchedulerState { location: Location }, - #[snafu(display("Failed to stop scheduler, source: {}", source))] - StopScheduler { source: JoinError }, + #[snafu(display("Failed to stop scheduler, location: {}, source: {}", location, source))] + StopScheduler { + source: JoinError, + location: Location, + }, #[snafu(display( "Failed to build scan predicate, location: {}, source: {}", diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs index ab058c05b267..fcb6556ebc87 100644 --- a/src/mito2/src/lib.rs +++ b/src/mito2/src/lib.rs @@ -29,6 +29,7 @@ pub mod error; pub mod manifest; #[allow(dead_code)] pub mod memtable; +#[allow(dead_code)] pub mod read; #[allow(dead_code)] mod region; diff --git a/src/mito2/src/memtable/version.rs b/src/mito2/src/memtable/version.rs index 51a38dd9b978..53df94d19195 100644 --- a/src/mito2/src/memtable/version.rs +++ b/src/mito2/src/memtable/version.rs @@ -24,7 +24,7 @@ pub(crate) struct MemtableVersion { /// Mutable memtable. pub(crate) mutable: MemtableRef, /// Immutable memtables. 
- pub(crate) immutables: Vec, + immutables: Vec, } pub(crate) type MemtableVersionRef = Arc; diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index 1f3948c2d8e5..67e26f34a8b2 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -16,7 +16,7 @@ pub mod merge; pub(crate) mod scan_region; -pub mod seq_scan; +pub(crate) mod seq_scan; pub(crate) mod stream; use std::sync::Arc; @@ -40,7 +40,6 @@ use crate::error::{ ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, InvalidBatchSnafu, Result, }; use crate::memtable::BoxedBatchIterator; -pub use crate::read::scan_region::Scanner; /// Storage internal representation of a batch of rows /// for a primary key (time series). diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 28a3c7359714..5d277913f102 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -29,7 +29,7 @@ use crate::region::version::VersionRef; use crate::sst::file::FileHandle; /// A scanner scans a region and returns a [SendableRecordBatchStream]. -pub enum Scanner { +pub(crate) enum Scanner { /// Sequential scan. Seq(SeqScan), // TODO(yingwen): Support windowed scan and chained scan. @@ -37,7 +37,7 @@ pub enum Scanner { impl Scanner { /// Returns a [SendableRecordBatchStream] to retrieve scan results. 
- pub async fn scan(&self) -> Result { + pub(crate) async fn scan(&self) -> Result { match self { Scanner::Seq(seq_scan) => seq_scan.build().await, } From f0a2a50fa8fd3c019595188324bbdc549cd293e0 Mon Sep 17 00:00:00 2001 From: evenyag Date: Fri, 25 Aug 2023 17:49:31 +0800 Subject: [PATCH 18/19] chore: address CR comments --- src/common/recordbatch/src/lib.rs | 21 +++++++-- src/mito2/src/read.rs | 2 +- .../src/read/{stream.rs => projection.rs} | 43 ++----------------- src/mito2/src/read/scan_region.rs | 2 +- src/mito2/src/read/seq_scan.rs | 8 ++-- 5 files changed, 27 insertions(+), 49 deletions(-) rename src/mito2/src/read/{stream.rs => projection.rs} (84%) diff --git a/src/common/recordbatch/src/lib.rs b/src/common/recordbatch/src/lib.rs index 60415d95e1b2..9730b4041f73 100644 --- a/src/common/recordbatch/src/lib.rs +++ b/src/common/recordbatch/src/lib.rs @@ -202,13 +202,26 @@ impl Stream for SimpleRecordBatchStream { } /// Adapt a [Stream] of [RecordBatch] to a [RecordBatchStream]. -pub struct RecordBatchStreamAdaptor { +pub struct RecordBatchStreamAdaptor { pub schema: SchemaRef, - pub stream: Pin> + Send>>, + pub stream: S, pub output_ordering: Option>, } -impl RecordBatchStream for RecordBatchStreamAdaptor { +impl RecordBatchStreamAdaptor { + /// Creates a RecordBatchStreamAdaptor without output ordering requirement. 
+ pub fn new(schema: SchemaRef, stream: S) -> RecordBatchStreamAdaptor { + RecordBatchStreamAdaptor { + schema, + stream, + output_ordering: None, + } + } +} + +impl> + Unpin> RecordBatchStream + for RecordBatchStreamAdaptor +{ fn schema(&self) -> SchemaRef { self.schema.clone() } @@ -218,7 +231,7 @@ impl RecordBatchStream for RecordBatchStreamAdaptor { } } -impl Stream for RecordBatchStreamAdaptor { +impl> + Unpin> Stream for RecordBatchStreamAdaptor { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index 67e26f34a8b2..fb1629f61f42 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -15,9 +15,9 @@ //! Common structs and utilities for reading data. pub mod merge; +pub(crate) mod projection; pub(crate) mod scan_region; pub(crate) mod seq_scan; -pub(crate) mod stream; use std::sync::Arc; diff --git a/src/mito2/src/read/stream.rs b/src/mito2/src/read/projection.rs similarity index 84% rename from src/mito2/src/read/stream.rs rename to src/mito2/src/read/projection.rs index 310311d3c916..4f61affdc2f4 100644 --- a/src/mito2/src/read/stream.rs +++ b/src/mito2/src/read/projection.rs @@ -12,21 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! Record batch stream. +//! Utilities for projection. 
-use std::pin::Pin; use std::sync::Arc; -use std::task::{Context, Poll}; use api::v1::SemanticType; use common_error::ext::BoxedError; use common_recordbatch::error::ExternalSnafu; -use common_recordbatch::{RecordBatch, RecordBatchStream}; +use common_recordbatch::RecordBatch; use datatypes::prelude::{ConcreteDataType, DataType}; use datatypes::schema::{Schema, SchemaRef}; use datatypes::value::ValueRef; use datatypes::vectors::VectorRef; -use futures::Stream; use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadata; use store_api::storage::ColumnId; @@ -35,39 +32,6 @@ use crate::error::{InvalidRequestSnafu, Result}; use crate::read::Batch; use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; -/// Record batch stream implementation. -pub(crate) struct StreamImpl { - /// [RecordBatch] stream. - stream: S, - /// Schema of returned record batches. - schema: SchemaRef, -} - -impl StreamImpl { - /// Returns a new stream from a record batch stream and its schema. - pub(crate) fn new(stream: S, schema: SchemaRef) -> StreamImpl { - StreamImpl { stream, schema } - } -} - -impl> + Unpin> Stream - for StreamImpl -{ - type Item = common_recordbatch::error::Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.stream).poll_next(cx) - } -} - -impl> + Unpin> RecordBatchStream - for StreamImpl -{ - fn schema(&self) -> SchemaRef { - self.schema.clone() - } -} - /// Handles projection and converts a projected [Batch] to a projected [RecordBatch]. pub(crate) struct ProjectionMapper { /// Maps column in [RecordBatch] to index in [Batch]. @@ -210,7 +174,8 @@ fn new_repeated_vector( .try_push_value_ref(value) .map_err(BoxedError::new) .context(ExternalSnafu)?; - // This requires an additional allocation. TODO(yingwen): Add a way to create repeated vector to data type. + // This requires an additional allocation. + // TODO(yingwen): Add a way to create repeated vector to data type. 
let base_vector = mutable_vector.to_vector(); Ok(base_vector.replicate(&[num_rows])) } diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 5d277913f102..a959e0ed39b3 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -23,8 +23,8 @@ use store_api::storage::ScanRequest; use table::predicate::{Predicate, TimeRangePredicateBuilder}; use crate::error::{BuildPredicateSnafu, Result}; +use crate::read::projection::ProjectionMapper; use crate::read::seq_scan::SeqScan; -use crate::read::stream::ProjectionMapper; use crate::region::version::VersionRef; use crate::sst::file::FileHandle; diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 2da30b288384..6ac8e37d3ec0 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use async_stream::try_stream; use common_error::ext::BoxedError; use common_recordbatch::error::ExternalSnafu; -use common_recordbatch::SendableRecordBatchStream; +use common_recordbatch::{RecordBatchStreamAdaptor, SendableRecordBatchStream}; use common_time::range::TimestampRange; use object_store::ObjectStore; use snafu::ResultExt; @@ -29,7 +29,7 @@ use table::predicate::Predicate; use crate::error::Result; use crate::memtable::MemtableRef; use crate::read::merge::MergeReaderBuilder; -use crate::read::stream::{ProjectionMapper, StreamImpl}; +use crate::read::projection::ProjectionMapper; use crate::read::BatchReader; use crate::sst::file::FileHandle; use crate::sst::parquet::reader::ParquetReaderBuilder; @@ -136,9 +136,9 @@ impl SeqScan { yield mapper.convert(&batch)?; } }; - let stream = Box::pin(StreamImpl::new( - Box::pin(stream), + let stream = Box::pin(RecordBatchStreamAdaptor::new( self.mapper.output_schema(), + Box::pin(stream), )); Ok(stream) From 8d8ccb9656857c0e826295234e4322b84ba4cae4 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Mon, 28 Aug 2023 14:49:36 +0800 Subject: [PATCH 19/19] 
chore: Update src/mito2/src/read/scan_region.rs Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com> --- src/mito2/src/read/scan_region.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index a959e0ed39b3..db6414bf8dd6 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -114,7 +114,7 @@ impl ScanRegion { self.seq_scan().map(Scanner::Seq) } - /// Scan sequentailly. + /// Scan sequentially. pub(crate) fn seq_scan(self) -> Result { let time_range = self.build_time_range_predicate();