Skip to content

Commit

Permalink
feat(mito): Skeleton for scanning a region (GreptimeTeam#2230)
Browse files Browse the repository at this point in the history
* feat: define stream builder

* feat: scan region wip

* feat: create SeqScan in ScanRegion

* feat: scanner

* feat: engine handles scan request

* feat: map projection index to column id

* feat: Impl record batch stream

* refactor: change BatchConverter to ProjectionMapper

* feat: add column_ids to mapper

* feat: implement SeqScan::build()

* chore: fix typo

* docs: add mermaid for ScanRegion

* style: fix clippy

* test: fix record batch test

* fix: update sequence and entry id

* test: test query

* feat: address CR comment

* chore: address CR comments

* chore: Update src/mito2/src/read/scan_region.rs

Co-authored-by: Lei, HUANG <[email protected]>

---------

Co-authored-by: Lei, HUANG <[email protected]>
  • Loading branch information
2 people authored and paomian committed Oct 19, 2023
1 parent 35dc787 commit 193b539
Show file tree
Hide file tree
Showing 22 changed files with 719 additions and 38 deletions.
21 changes: 13 additions & 8 deletions src/common/recordbatch/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ mod test {
use snafu::IntoError;

use super::*;
use crate::error::Error;
use crate::RecordBatches;

#[tokio::test]
Expand Down Expand Up @@ -354,20 +355,24 @@ mod test {
.into_error(BoxedError::new(MockError::new(StatusCode::Unknown)))),
]));
let adapter = AsyncRecordBatchStreamAdapter::new(schema.clone(), poll_err_stream);
let result = RecordBatches::try_collect(Box::pin(adapter)).await;
assert_eq!(
result.unwrap_err().to_string(),
"External error, source: Unknown",
let err = RecordBatches::try_collect(Box::pin(adapter))
.await
.unwrap_err();
assert!(
matches!(err, Error::External { .. }),
"unexpected err {err}"
);

let failed_to_init_stream =
new_future_stream(Err(error::ExternalSnafu
.into_error(BoxedError::new(MockError::new(StatusCode::Internal)))));
let adapter = AsyncRecordBatchStreamAdapter::new(schema.clone(), failed_to_init_stream);
let result = RecordBatches::try_collect(Box::pin(adapter)).await;
assert_eq!(
result.unwrap_err().to_string(),
"External error, source: Internal",
let err = RecordBatches::try_collect(Box::pin(adapter))
.await
.unwrap_err();
assert!(
matches!(err, Error::External { .. }),
"unexpected err {err}"
);
}
}
2 changes: 1 addition & 1 deletion src/common/recordbatch/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub enum Error {
source: datatypes::error::Error,
},

#[snafu(display("External error, source: {}", source))]
#[snafu(display("External error, location: {}, source: {}", location, source))]
External {
location: Location,
source: BoxedError,
Expand Down
21 changes: 17 additions & 4 deletions src/common/recordbatch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,13 +202,26 @@ impl Stream for SimpleRecordBatchStream {
}

/// Adapt a [Stream] of [RecordBatch] to a [RecordBatchStream].
pub struct RecordBatchStreamAdaptor {
pub struct RecordBatchStreamAdaptor<S> {
pub schema: SchemaRef,
pub stream: Pin<Box<dyn Stream<Item = Result<RecordBatch>> + Send>>,
pub stream: S,
pub output_ordering: Option<Vec<OrderOption>>,
}

impl RecordBatchStream for RecordBatchStreamAdaptor {
impl<S> RecordBatchStreamAdaptor<S> {
/// Creates a RecordBatchStreamAdaptor without output ordering requirement.
pub fn new(schema: SchemaRef, stream: S) -> RecordBatchStreamAdaptor<S> {
RecordBatchStreamAdaptor {
schema,
stream,
output_ordering: None,
}
}
}

impl<S: Stream<Item = Result<RecordBatch>> + Unpin> RecordBatchStream
for RecordBatchStreamAdaptor<S>
{
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
Expand All @@ -218,7 +231,7 @@ impl RecordBatchStream for RecordBatchStreamAdaptor {
}
}

impl Stream for RecordBatchStreamAdaptor {
impl<S: Stream<Item = Result<RecordBatch>> + Unpin> Stream for RecordBatchStreamAdaptor<S> {
type Item = Result<RecordBatch>;

fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Expand Down
36 changes: 32 additions & 4 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ use std::sync::Arc;

use common_query::Output;
use object_store::ObjectStore;
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use store_api::logstore::LogStore;
use store_api::region_request::RegionRequest;
use store_api::storage::RegionId;
use store_api::storage::{RegionId, ScanRequest};

use crate::config::MitoConfig;
use crate::error::{RecvSnafu, Result};
use crate::error::{RecvSnafu, RegionNotFoundSnafu, Result};
use crate::read::scan_region::{ScanRegion, Scanner};
use crate::request::{RegionTask, RequestBody};
use crate::worker::WorkerGroup;

Expand Down Expand Up @@ -72,12 +73,19 @@ impl MitoEngine {
pub fn is_region_exists(&self, region_id: RegionId) -> bool {
self.inner.workers.is_region_exists(region_id)
}

/// Handles the scan `request` and returns a [Scanner] for the `request`.
///
/// This is a thin wrapper: both arguments are forwarded unchanged to
/// `handle_query` on `self.inner`, and its result is returned as is.
fn handle_query(&self, region_id: RegionId, request: ScanRequest) -> Result<Scanner> {
self.inner.handle_query(region_id, request)
}
}

/// Inner struct of [MitoEngine].
///
/// Holds the state that the engine's request and query handlers operate on.
struct EngineInner {
/// Region workers group.
///
/// Region requests are submitted to it, and regions are looked up through it
/// when handling a scan.
workers: WorkerGroup,
/// Shared object store of all regions.
///
/// A clone is handed to every `ScanRegion` built for a query.
object_store: ObjectStore,
}

impl EngineInner {
Expand All @@ -88,7 +96,8 @@ impl EngineInner {
object_store: ObjectStore,
) -> EngineInner {
EngineInner {
workers: WorkerGroup::start(config, log_store, object_store),
workers: WorkerGroup::start(config, log_store, object_store.clone()),
object_store,
}
}

Expand All @@ -99,10 +108,29 @@ impl EngineInner {

/// Converts `request` into a [RequestBody], submits it to the region workers and
/// returns the result of executing it.
async fn handle_request(&self, region_id: RegionId, request: RegionRequest) -> Result<Output> {
    // The conversion also validates the request; working with the inner
    // `RequestBody` is easier than with the raw `RegionRequest`.
    let request_body = RequestBody::try_from_region_request(region_id, request)?;
    let (task, result_rx) = RegionTask::from_request(region_id, request_body);
    self.workers.submit_to_worker(task).await?;

    // A failure to receive is reported through `RecvSnafu`; otherwise the
    // received result is returned unchanged.
    result_rx.await.context(RecvSnafu)?
}

/// Builds a [Scanner] that serves the scan `request` on the region `region_id`.
///
/// Fails with `RegionNotFound` when the workers do not know the region.
fn handle_query(&self, region_id: RegionId, request: ScanRequest) -> Result<Scanner> {
    // Reads bypass the region worker thread: the region is looked up directly
    // and scanned from the version it reports.
    let region = self
        .workers
        .get_region(region_id)
        .context(RegionNotFoundSnafu { region_id })?;

    ScanRegion::new(
        region.version(),
        region.region_dir.clone(),
        self.object_store.clone(),
        request,
    )
    .scanner()
}
}
47 changes: 46 additions & 1 deletion src/mito2/src/engine/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::collections::HashMap;
use api::helper::ColumnDataTypeWrapper;
use api::v1::value::ValueData;
use api::v1::{Row, Rows};
use common_recordbatch::RecordBatches;
use store_api::metadata::ColumnMetadata;
use store_api::region_request::{RegionCloseRequest, RegionOpenRequest, RegionPutRequest};
use store_api::storage::RegionId;
Expand Down Expand Up @@ -246,7 +247,7 @@ fn build_rows(num_rows: usize) -> Vec<Row> {
value_data: Some(ValueData::F64Value(i as f64)),
},
api::v1::Value {
value_data: Some(ValueData::TsMillisecondValue(i as i64)),
value_data: Some(ValueData::TsMillisecondValue(i as i64 * 1000)),
},
],
})
Expand Down Expand Up @@ -285,3 +286,47 @@ async fn test_write_to_region() {
};
assert_eq!(num_rows, rows_inserted);
}

// TODO(yingwen): build_rows() only generates one point for each series. We need to add tests
// for series with multiple points and other cases.
// End-to-end check of the write path followed by the query path: create a region,
// put three rows into it, scan it with a default request and compare the
// pretty-printed result with the expected table.
#[tokio::test]
async fn test_write_query_region() {
let env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();

// Derive the row schema from the create request before the request is consumed below.
let column_schemas = request
.column_metadatas
.iter()
.map(column_metadata_to_column_schema)
.collect::<Vec<_>>();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();

// Write three rows built by `build_rows`.
let rows = Rows {
schema: column_schemas,
rows: build_rows(3),
};
engine
.handle_request(region_id, RegionRequest::Put(RegionPutRequest { rows }))
.await
.unwrap();

// Scan with a default `ScanRequest` and collect the whole stream.
let request = ScanRequest::default();
let scanner = engine.handle_query(region_id, request).unwrap();
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
// The expected timestamps advance by one second per row.
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
}
15 changes: 13 additions & 2 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,14 +369,24 @@ pub enum Error {
#[snafu(display("Invalid flume sender, location: {}", location,))]
InvalidFlumeSender { location: Location },

#[snafu(display("Invalid scheduler state location: {}", location,))]
#[snafu(display("Invalid scheduler state, location: {}", location))]
InvalidSchedulerState { location: Location },

#[snafu(display("Failed to stop scheduler, source: {}", source))]
#[snafu(display("Failed to stop scheduler, location: {}, source: {}", location, source))]
StopScheduler {
source: JoinError,
location: Location,
},

#[snafu(display(
"Failed to build scan predicate, location: {}, source: {}",
location,
source
))]
BuildPredicate {
source: table::error::Error,
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -437,6 +447,7 @@ impl ErrorExt for Error {
InvalidFlumeSender { .. } => StatusCode::InvalidArguments,
InvalidSchedulerState { .. } => StatusCode::InvalidArguments,
StopScheduler { .. } => StatusCode::Internal,
BuildPredicate { source, .. } => source.status_code(),
}
}

Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub mod error;
pub mod manifest;
#[allow(dead_code)]
pub mod memtable;
#[allow(dead_code)]
pub mod read;
#[allow(dead_code)]
mod region;
Expand Down
11 changes: 7 additions & 4 deletions src/mito2/src/memtable/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::memtable::MemtableRef;
#[derive(Debug)]
pub(crate) struct MemtableVersion {
/// Mutable memtable.
mutable: MemtableRef,
pub(crate) mutable: MemtableRef,
/// Immutable memtables.
immutables: Vec<MemtableRef>,
}
Expand All @@ -38,8 +38,11 @@ impl MemtableVersion {
}
}

/// Returns the mutable memtable.
pub(crate) fn mutable(&self) -> &MemtableRef {
&self.mutable
/// Returns every memtable of this version: the mutable memtable first,
/// followed by the immutable ones in their stored order.
pub(crate) fn list_memtables(&self) -> Vec<MemtableRef> {
    std::iter::once(self.mutable.clone())
        .chain(self.immutables.iter().cloned())
        .collect()
}
}
3 changes: 3 additions & 0 deletions src/mito2/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
//! Common structs and utilities for reading data.
pub mod merge;
pub(crate) mod projection;
pub(crate) mod scan_region;
pub(crate) mod seq_scan;

use std::sync::Arc;

Expand Down
Loading

0 comments on commit 193b539

Please sign in to comment.