Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mito): Skeleton for scanning a region #2230

Merged
merged 20 commits into from
Aug 28, 2023
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions src/common/recordbatch/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ mod test {
use snafu::IntoError;

use super::*;
use crate::error::Error;
use crate::RecordBatches;

#[tokio::test]
Expand Down Expand Up @@ -354,20 +355,24 @@ mod test {
.into_error(BoxedError::new(MockError::new(StatusCode::Unknown)))),
]));
let adapter = AsyncRecordBatchStreamAdapter::new(schema.clone(), poll_err_stream);
let result = RecordBatches::try_collect(Box::pin(adapter)).await;
assert_eq!(
result.unwrap_err().to_string(),
"External error, source: Unknown",
let err = RecordBatches::try_collect(Box::pin(adapter))
.await
.unwrap_err();
assert!(
matches!(err, Error::External { .. }),
"unexpected err {err}"
);

let failed_to_init_stream =
new_future_stream(Err(error::ExternalSnafu
.into_error(BoxedError::new(MockError::new(StatusCode::Internal)))));
let adapter = AsyncRecordBatchStreamAdapter::new(schema.clone(), failed_to_init_stream);
let result = RecordBatches::try_collect(Box::pin(adapter)).await;
assert_eq!(
result.unwrap_err().to_string(),
"External error, source: Internal",
let err = RecordBatches::try_collect(Box::pin(adapter))
.await
.unwrap_err();
assert!(
matches!(err, Error::External { .. }),
"unexpected err {err}"
);
}
}
2 changes: 1 addition & 1 deletion src/common/recordbatch/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub enum Error {
source: datatypes::error::Error,
},

#[snafu(display("External error, source: {}", source))]
#[snafu(display("External error, location: {}, source: {}", location, source))]
External {
location: Location,
source: BoxedError,
Expand Down
39 changes: 34 additions & 5 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ use std::sync::Arc;

use common_query::Output;
use object_store::ObjectStore;
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use store_api::logstore::LogStore;
use store_api::region_request::RegionRequest;
use store_api::storage::RegionId;
use store_api::storage::{RegionId, ScanRequest};

use crate::config::MitoConfig;
use crate::error::{RecvSnafu, Result};
use crate::error::{RecvSnafu, RegionNotFoundSnafu, Result};
use crate::read::scan_region::ScanRegion;
use crate::read::Scanner;
use crate::request::{RegionTask, RequestBody};
use crate::worker::WorkerGroup;

Expand Down Expand Up @@ -73,12 +75,19 @@ impl MitoEngine {
/// Returns true if the region with `region_id` exists.
pub fn is_region_exists(&self, region_id: RegionId) -> bool {
self.inner.workers.is_region_exists(region_id)
}

/// Handles the scan `request` and returns a [Scanner] for the `request`.
///
/// Delegates to the inner engine; fails if the region is not found or the
/// scanner cannot be built from the request.
fn handle_query(&self, region_id: RegionId, request: ScanRequest) -> Result<Scanner> {
self.inner.handle_query(region_id, request)
}
}

/// Inner struct of [MitoEngine].
struct EngineInner {
/// Region workers group.
workers: WorkerGroup,
/// Shared object store of all regions.
// Kept here (in addition to the clone given to the workers) so scans can
// read SSTs directly without going through a worker (see `handle_query`).
object_store: ObjectStore,
}

impl EngineInner {
Expand All @@ -89,7 +98,8 @@ impl EngineInner {
object_store: ObjectStore,
) -> EngineInner {
EngineInner {
workers: WorkerGroup::start(config, log_store, object_store),
workers: WorkerGroup::start(config, log_store, object_store.clone()),
object_store,
}
}

Expand All @@ -99,12 +109,31 @@ impl EngineInner {
}

// TODO(yingwen): return `Output` instead of `Result<()>`.
/// Handles a [RegionRequest] and returns its executed result.
async fn handle_request(&self, region_id: RegionId, request: RegionRequest) -> Result<()> {
    // Validate the request and translate it into the internal `RequestBody`.
    let body = RequestBody::try_from_region_request(region_id, request)?;
    // Package the body as a task and hand it off to a region worker.
    let (task, receiver) = RegionTask::from_request(region_id, body);
    self.workers.submit_to_worker(task).await?;

    // Wait for the worker to report the result of executing the task.
    receiver.await.context(RecvSnafu)?
}

/// Handles the scan `request` and returns a [Scanner] for the `request`.
fn handle_query(&self, region_id: RegionId, request: ScanRequest) -> Result<Scanner> {
    // Reads don't go through the region worker thread; look the region up
    // and build the scan directly.
    let region = self
        .workers
        .get_region(region_id)
        .context(RegionNotFoundSnafu { region_id })?;

    // Collect the scan targets from the current version of the region.
    let scan_region = ScanRegion::new(
        region.version(),
        region.region_dir.clone(),
        self.object_store.clone(),
        request,
    );
    scan_region.scanner()
}
}
14 changes: 11 additions & 3 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,12 +375,19 @@ pub enum Error {
#[snafu(display("Invalid flume sender, location: {}", location,))]
InvalidFlumeSender { location: Location },

#[snafu(display("Invalid scheduler state location: {}", location,))]
#[snafu(display("Invalid scheduler state, location: {}", location))]
InvalidSchedulerState { location: Location },

#[snafu(display("Failed to stop scheduler, source: {}", source))]
StopScheduler {
source: JoinError,
StopScheduler { source: JoinError },
evenyag marked this conversation as resolved.
Show resolved Hide resolved

#[snafu(display(
"Failed to build scan predicate, location: {}, source: {}",
location,
source
))]
BuildPredicate {
source: table::error::Error,
location: Location,
},
}
Expand Down Expand Up @@ -444,6 +451,7 @@ impl ErrorExt for Error {
InvalidFlumeSender { .. } => StatusCode::InvalidArguments,
InvalidSchedulerState { .. } => StatusCode::InvalidArguments,
StopScheduler { .. } => StatusCode::Internal,
BuildPredicate { source, .. } => source.status_code(),
}
}

Expand Down
13 changes: 8 additions & 5 deletions src/mito2/src/memtable/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ use crate::memtable::MemtableRef;
#[derive(Debug)]
pub(crate) struct MemtableVersion {
/// Mutable memtable.
mutable: MemtableRef,
pub(crate) mutable: MemtableRef,
/// Immutable memtables.
immutables: Vec<MemtableRef>,
pub(crate) immutables: Vec<MemtableRef>,
evenyag marked this conversation as resolved.
Show resolved Hide resolved
}

pub(crate) type MemtableVersionRef = Arc<MemtableVersion>;
Expand All @@ -38,8 +38,11 @@ impl MemtableVersion {
}
}

/// Returns the mutable memtable.
pub(crate) fn mutable(&self) -> &MemtableRef {
&self.mutable
/// Lists mutable and immutable memtables.
///
/// The mutable memtable is always the first element of the returned list,
/// followed by the immutable memtables in their stored order.
pub(crate) fn list_memtables(&self) -> Vec<MemtableRef> {
    // Reserve room for the mutable memtable plus all immutables up front.
    let mut memtables = Vec::with_capacity(self.immutables.len() + 1);
    memtables.push(self.mutable.clone());
    memtables.extend_from_slice(&self.immutables);
    memtables
}
}
4 changes: 4 additions & 0 deletions src/mito2/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
//! Common structs and utilities for reading data.

pub mod merge;
pub(crate) mod scan_region;
pub mod seq_scan;
pub(crate) mod stream;

use std::sync::Arc;

Expand All @@ -37,6 +40,7 @@ use crate::error::{
ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, InvalidBatchSnafu, Result,
};
use crate::memtable::BoxedBatchIterator;
pub use crate::read::scan_region::Scanner;

/// Storage internal representation of a batch of rows
/// for a primary key (time series).
Expand Down
187 changes: 187 additions & 0 deletions src/mito2/src/read/scan_region.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Scans a region according to the scan request.

use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::debug;
use common_time::range::TimestampRange;
use object_store::ObjectStore;
use snafu::ResultExt;
use store_api::storage::ScanRequest;
use table::predicate::{Predicate, TimeRangePredicateBuilder};

use crate::error::{BuildPredicateSnafu, Result};
use crate::read::seq_scan::SeqScan;
use crate::read::stream::ProjectionMapper;
use crate::region::version::VersionRef;
use crate::sst::file::FileHandle;

/// A scanner scans a region and returns a [SendableRecordBatchStream].
pub enum Scanner {
/// Sequential scan (see [SeqScan]).
Seq(SeqScan),
// TODO(yingwen): Support windowed scan and chained scan.
}

impl Scanner {
/// Returns a [SendableRecordBatchStream] to retrieve scan results.
///
/// # Errors
/// Propagates any error from building the underlying scan stream.
pub async fn scan(&self) -> Result<SendableRecordBatchStream> {
match self {
// Sequential scan: delegate stream construction to [SeqScan].
Scanner::Seq(seq_scan) => seq_scan.build().await,
}
}
}

#[cfg_attr(doc, aquamarine::aquamarine)]
/// Helper to scans a region by [ScanRequest].
///
/// [ScanRegion] collects SSTs and memtables to scan without actually reading them. It
/// creates a [Scanner] to actually scan these targets in [Scanner::scan()].
///
/// ```mermaid
/// classDiagram
/// class ScanRegion {
/// -VersionRef version
/// -ScanRequest request
/// ~scanner() Scanner
/// ~seq_scan() SeqScan
/// }
/// class Scanner {
/// <<enumeration>>
/// SeqScan
/// +scan() SendableRecordBatchStream
/// }
/// class SeqScan {
/// -ProjectionMapper mapper
/// -Option~TimeRange~ time_range
/// -Option~Predicate~ predicate
/// -Vec~MemtableRef~ memtables
/// -Vec~FileHandle~ files
/// +build() SendableRecordBatchStream
/// }
/// class ProjectionMapper {
/// ~output_schema() SchemaRef
/// ~convert(Batch) RecordBatch
/// }
/// ScanRegion -- Scanner
/// ScanRegion o-- ScanRequest
/// Scanner o-- SeqScan
/// Scanner -- SendableRecordBatchStream
/// SeqScan o-- ProjectionMapper
/// SeqScan -- SendableRecordBatchStream
/// ```
pub(crate) struct ScanRegion {
/// Version of the region at scan.
///
/// Provides the memtables and SST metadata the scan reads (see `seq_scan`).
version: VersionRef,
/// Directory of SST files.
file_dir: String,
/// Object store that stores SST files.
object_store: ObjectStore,
/// Scan request.
///
/// Carries the filters and the optional projection applied to the scan.
request: ScanRequest,
}

impl ScanRegion {
/// Creates a [ScanRegion].
pub(crate) fn new(
version: VersionRef,
file_dir: String,
object_store: ObjectStore,
request: ScanRequest,
) -> ScanRegion {
ScanRegion {
version,
file_dir,
object_store,
request,
}
}

/// Returns a [Scanner] to scan the region.
pub(crate) fn scanner(self) -> Result<Scanner> {
    // Only sequential scan is implemented for now.
    Ok(Scanner::Seq(self.seq_scan()?))
}

/// Scan sequentailly.
evenyag marked this conversation as resolved.
Show resolved Hide resolved
pub(crate) fn seq_scan(self) -> Result<SeqScan> {
let time_range = self.build_time_range_predicate();

let ssts = &self.version.ssts;
let mut total_ssts = 0;
let mut files = Vec::new();
for level in ssts.levels() {
total_ssts += level.files.len();

for file in level.files.values() {
// Finds SST files in range.
if file_in_range(file, &time_range) {
files.push(file.clone());
}
}
}

let memtables = self.version.memtables.list_memtables();

debug!(
"Seq scan region {}, memtables: {}, ssts_to_read: {}, total_ssts: {}",
self.version.metadata.region_id,
memtables.len(),
files.len(),
total_ssts
);

let predicate = Predicate::try_new(
self.request.filters.clone(),
self.version.metadata.schema.clone(),
)
.context(BuildPredicateSnafu)?;
let mapper = match &self.request.projection {
Some(p) => ProjectionMapper::new(&self.version.metadata, p.iter().copied())?,
None => ProjectionMapper::all(&self.version.metadata)?,
};

let seq_scan = SeqScan::new(self.file_dir, self.object_store, mapper, self.request)
.with_time_range(Some(time_range))
.with_predicate(Some(predicate))
.with_memtables(memtables)
.with_files(files);

Ok(seq_scan)
}

/// Build time range predicate from filters.
fn build_time_range_predicate(&self) -> TimestampRange {
let time_index = self.version.metadata.time_index_column();
let unit = time_index
.column_schema
.data_type
.as_timestamp()
.expect("Time index must have timestamp-compatible type")
.unit();
TimeRangePredicateBuilder::new(&time_index.column_schema.name, unit, &self.request.filters)
waynexia marked this conversation as resolved.
Show resolved Hide resolved
.build()
}
}

/// Returns true if the time range of a SST `file` matches the `predicate`.
fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
    // An unbounded predicate accepts every file.
    if *predicate == TimestampRange::min_to_max() {
        return true;
    }
    // Both ends of an SST's time range are inclusive.
    let (start, end) = file.time_range();
    TimestampRange::new_inclusive(Some(start), Some(end)).intersects(predicate)
}
Loading