Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mito): Skeleton for scanning a region #2230

Merged
merged 20 commits into from
Aug 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions src/common/recordbatch/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ mod test {
use snafu::IntoError;

use super::*;
use crate::error::Error;
use crate::RecordBatches;

#[tokio::test]
Expand Down Expand Up @@ -354,20 +355,24 @@ mod test {
.into_error(BoxedError::new(MockError::new(StatusCode::Unknown)))),
]));
let adapter = AsyncRecordBatchStreamAdapter::new(schema.clone(), poll_err_stream);
let result = RecordBatches::try_collect(Box::pin(adapter)).await;
assert_eq!(
result.unwrap_err().to_string(),
"External error, source: Unknown",
let err = RecordBatches::try_collect(Box::pin(adapter))
.await
.unwrap_err();
assert!(
matches!(err, Error::External { .. }),
"unexpected err {err}"
);

let failed_to_init_stream =
new_future_stream(Err(error::ExternalSnafu
.into_error(BoxedError::new(MockError::new(StatusCode::Internal)))));
let adapter = AsyncRecordBatchStreamAdapter::new(schema.clone(), failed_to_init_stream);
let result = RecordBatches::try_collect(Box::pin(adapter)).await;
assert_eq!(
result.unwrap_err().to_string(),
"External error, source: Internal",
let err = RecordBatches::try_collect(Box::pin(adapter))
.await
.unwrap_err();
assert!(
matches!(err, Error::External { .. }),
"unexpected err {err}"
);
}
}
2 changes: 1 addition & 1 deletion src/common/recordbatch/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub enum Error {
source: datatypes::error::Error,
},

#[snafu(display("External error, source: {}", source))]
#[snafu(display("External error, location: {}, source: {}", location, source))]
External {
location: Location,
source: BoxedError,
Expand Down
21 changes: 17 additions & 4 deletions src/common/recordbatch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,13 +202,26 @@ impl Stream for SimpleRecordBatchStream {
}

/// Adapt a [Stream] of [RecordBatch] to a [RecordBatchStream].
pub struct RecordBatchStreamAdaptor {
pub struct RecordBatchStreamAdaptor<S> {
pub schema: SchemaRef,
pub stream: Pin<Box<dyn Stream<Item = Result<RecordBatch>> + Send>>,
pub stream: S,
pub output_ordering: Option<Vec<OrderOption>>,
}

impl RecordBatchStream for RecordBatchStreamAdaptor {
impl<S> RecordBatchStreamAdaptor<S> {
/// Creates a RecordBatchStreamAdaptor without output ordering requirement.
pub fn new(schema: SchemaRef, stream: S) -> RecordBatchStreamAdaptor<S> {
RecordBatchStreamAdaptor {
schema,
stream,
output_ordering: None,
}
}
}

impl<S: Stream<Item = Result<RecordBatch>> + Unpin> RecordBatchStream
for RecordBatchStreamAdaptor<S>
{
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
Expand All @@ -218,7 +231,7 @@ impl RecordBatchStream for RecordBatchStreamAdaptor {
}
}

impl Stream for RecordBatchStreamAdaptor {
impl<S: Stream<Item = Result<RecordBatch>> + Unpin> Stream for RecordBatchStreamAdaptor<S> {
type Item = Result<RecordBatch>;

fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Expand Down
36 changes: 32 additions & 4 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ use std::sync::Arc;

use common_query::Output;
use object_store::ObjectStore;
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use store_api::logstore::LogStore;
use store_api::region_request::RegionRequest;
use store_api::storage::RegionId;
use store_api::storage::{RegionId, ScanRequest};

use crate::config::MitoConfig;
use crate::error::{RecvSnafu, Result};
use crate::error::{RecvSnafu, RegionNotFoundSnafu, Result};
use crate::read::scan_region::{ScanRegion, Scanner};
use crate::request::{RegionTask, RequestBody};
use crate::worker::WorkerGroup;

Expand Down Expand Up @@ -72,12 +73,19 @@ impl MitoEngine {
pub fn is_region_exists(&self, region_id: RegionId) -> bool {
self.inner.workers.is_region_exists(region_id)
}

/// Handles the scan `request` and returns a [Scanner] for the `request`.
fn handle_query(&self, region_id: RegionId, request: ScanRequest) -> Result<Scanner> {
self.inner.handle_query(region_id, request)
}
}

/// Inner struct of [MitoEngine].
struct EngineInner {
/// Region workers group.
workers: WorkerGroup,
/// Shared object store of all regions.
object_store: ObjectStore,
}

impl EngineInner {
Expand All @@ -88,7 +96,8 @@ impl EngineInner {
object_store: ObjectStore,
) -> EngineInner {
EngineInner {
workers: WorkerGroup::start(config, log_store, object_store),
workers: WorkerGroup::start(config, log_store, object_store.clone()),
object_store,
}
}

Expand All @@ -99,10 +108,29 @@ impl EngineInner {

/// Handles [RequestBody] and return its executed result.
async fn handle_request(&self, region_id: RegionId, request: RegionRequest) -> Result<Output> {
// We validate and then convert the `request` into an inner `RequestBody` for ease of handling.
let body = RequestBody::try_from_region_request(region_id, request)?;
let (request, receiver) = RegionTask::from_request(region_id, body);
self.workers.submit_to_worker(request).await?;

receiver.await.context(RecvSnafu)?
}

/// Handles the scan `request` and returns a [Scanner] for the `request`.
fn handle_query(&self, region_id: RegionId, request: ScanRequest) -> Result<Scanner> {
// Reading a region doesn't need to go through the region worker thread.
let region = self
.workers
.get_region(region_id)
.context(RegionNotFoundSnafu { region_id })?;
let version = region.version();
let scan_region = ScanRegion::new(
version,
region.region_dir.clone(),
self.object_store.clone(),
request,
);

scan_region.scanner()
}
}
47 changes: 46 additions & 1 deletion src/mito2/src/engine/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::collections::HashMap;
use api::helper::ColumnDataTypeWrapper;
use api::v1::value::ValueData;
use api::v1::{Row, Rows};
use common_recordbatch::RecordBatches;
use store_api::metadata::ColumnMetadata;
use store_api::region_request::{RegionCloseRequest, RegionOpenRequest, RegionPutRequest};
use store_api::storage::RegionId;
Expand Down Expand Up @@ -246,7 +247,7 @@ fn build_rows(num_rows: usize) -> Vec<Row> {
value_data: Some(ValueData::F64Value(i as f64)),
},
api::v1::Value {
value_data: Some(ValueData::TsMillisecondValue(i as i64)),
value_data: Some(ValueData::TsMillisecondValue(i as i64 * 1000)),
},
],
})
Expand Down Expand Up @@ -285,3 +286,47 @@ async fn test_write_to_region() {
};
assert_eq!(num_rows, rows_inserted);
}

// TODO(yingwen): build_rows() only generate one point for each series. We need to add tests
// for series with multiple points and other cases.
#[tokio::test]
async fn test_write_query_region() {
let env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();

let column_schemas = request
.column_metadatas
.iter()
.map(column_metadata_to_column_schema)
.collect::<Vec<_>>();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();

let rows = Rows {
schema: column_schemas,
rows: build_rows(3),
};
engine
.handle_request(region_id, RegionRequest::Put(RegionPutRequest { rows }))
.await
.unwrap();

let request = ScanRequest::default();
let scanner = engine.handle_query(region_id, request).unwrap();
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
}
15 changes: 13 additions & 2 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,14 +369,24 @@ pub enum Error {
#[snafu(display("Invalid flume sender, location: {}", location,))]
InvalidFlumeSender { location: Location },

#[snafu(display("Invalid scheduler state location: {}", location,))]
#[snafu(display("Invalid scheduler state, location: {}", location))]
InvalidSchedulerState { location: Location },

#[snafu(display("Failed to stop scheduler, source: {}", source))]
#[snafu(display("Failed to stop scheduler, location: {}, source: {}", location, source))]
StopScheduler {
source: JoinError,
location: Location,
},

#[snafu(display(
"Failed to build scan predicate, location: {}, source: {}",
location,
source
))]
BuildPredicate {
source: table::error::Error,
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -437,6 +447,7 @@ impl ErrorExt for Error {
InvalidFlumeSender { .. } => StatusCode::InvalidArguments,
InvalidSchedulerState { .. } => StatusCode::InvalidArguments,
StopScheduler { .. } => StatusCode::Internal,
BuildPredicate { source, .. } => source.status_code(),
}
}

Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub mod error;
pub mod manifest;
#[allow(dead_code)]
pub mod memtable;
#[allow(dead_code)]
pub mod read;
#[allow(dead_code)]
mod region;
Expand Down
11 changes: 7 additions & 4 deletions src/mito2/src/memtable/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::memtable::MemtableRef;
#[derive(Debug)]
pub(crate) struct MemtableVersion {
/// Mutable memtable.
mutable: MemtableRef,
pub(crate) mutable: MemtableRef,
/// Immutable memtables.
immutables: Vec<MemtableRef>,
}
Expand All @@ -38,8 +38,11 @@ impl MemtableVersion {
}
}

/// Returns the mutable memtable.
pub(crate) fn mutable(&self) -> &MemtableRef {
&self.mutable
/// Lists mutable and immutable memtables.
pub(crate) fn list_memtables(&self) -> Vec<MemtableRef> {
evenyag marked this conversation as resolved.
Show resolved Hide resolved
let mut memtables = Vec::with_capacity(self.immutables.len() + 1);
memtables.push(self.mutable.clone());
memtables.extend_from_slice(&self.immutables);
memtables
}
}
3 changes: 3 additions & 0 deletions src/mito2/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
//! Common structs and utilities for reading data.

pub mod merge;
pub(crate) mod projection;
pub(crate) mod scan_region;
pub(crate) mod seq_scan;

use std::sync::Arc;

Expand Down
Loading