Skip to content

Commit

Permalink
feat: handle scan request on metric engine (#2793)
Browse files Browse the repository at this point in the history
* basic impl

Signed-off-by: Ruihang Xia <[email protected]>

* read/write tests

Signed-off-by: Ruihang Xia <[email protected]>

* fix clippy lints

Signed-off-by: Ruihang Xia <[email protected]>

* fix compile error

Signed-off-by: Ruihang Xia <[email protected]>

* fix clippy lints

Signed-off-by: Ruihang Xia <[email protected]>

* apply review sugg

Signed-off-by: Ruihang Xia <[email protected]>

* do not filter out internal column

Signed-off-by: Ruihang Xia <[email protected]>

* fix clippy lints

Signed-off-by: Ruihang Xia <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia authored Nov 24, 2023
1 parent ff8ab67 commit 33566ea
Show file tree
Hide file tree
Showing 11 changed files with 560 additions and 59 deletions.
22 changes: 21 additions & 1 deletion src/metric-engine/src/data_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use store_api::region_engine::RegionEngine;
use store_api::region_request::{
AddColumn, AlterKind, RegionAlterRequest, RegionPutRequest, RegionRequest,
};
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::RegionId;

use crate::error::{
Expand Down Expand Up @@ -73,7 +74,13 @@ impl DataRegion {
let new_column_id_start = 1 + region_metadata
.column_metadatas
.iter()
.map(|c| c.column_id)
.filter_map(|c| {
if ReservedColumnId::is_reserved(c.column_id) {
None
} else {
Some(c.column_id)
}
})
.max()
.unwrap_or(0);

Expand Down Expand Up @@ -137,6 +144,19 @@ impl DataRegion {
.await
.context(MitoWriteOperationSnafu)
}

pub async fn physical_columns(
&self,
physical_region_id: RegionId,
) -> Result<Vec<ColumnMetadata>> {
let data_region_id = utils::to_data_region_id(physical_region_id);
let metadata = self
.mito
.get_metadata(data_region_id)
.await
.context(MitoReadOperationSnafu)?;
Ok(metadata.column_metadatas.clone())
}
}

#[cfg(test)]
Expand Down
26 changes: 12 additions & 14 deletions src/metric-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
mod alter;
mod create;
mod put;
mod read;
mod region_metadata;
mod state;

use std::sync::Arc;
Expand Down Expand Up @@ -74,7 +76,7 @@ use crate::metadata_region::MetadataRegion;
/// they support are different. List below:
///
/// | Operations | Logical Region | Physical Region |
/// | :--------: | :------------: | :-------------: |
/// | ---------- | -------------- | --------------- |
/// | Create | ✅ | ✅ |
/// | Drop | ✅ | ❌ |
/// | Write | ✅ | ❌ |
Expand Down Expand Up @@ -113,7 +115,7 @@ impl RegionEngine for MetricEngine {
&self,
region_id: RegionId,
request: RegionRequest,
) -> std::result::Result<Output, BoxedError> {
) -> Result<Output, BoxedError> {
let result = match request {
RegionRequest::Put(put) => self.inner.put_region(region_id, put).await,
RegionRequest::Delete(_) => todo!(),
Expand Down Expand Up @@ -143,15 +145,15 @@ impl RegionEngine for MetricEngine {
&self,
region_id: RegionId,
request: ScanRequest,
) -> std::result::Result<SendableRecordBatchStream, BoxedError> {
todo!()
) -> Result<SendableRecordBatchStream, BoxedError> {
self.inner
.read_region(region_id, request)
.await
.map_err(BoxedError::new)
}

/// Retrieves region's metadata.
async fn get_metadata(
&self,
region_id: RegionId,
) -> std::result::Result<RegionMetadataRef, BoxedError> {
async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError> {
todo!()
}

Expand All @@ -161,15 +163,11 @@ impl RegionEngine for MetricEngine {
}

/// Stops the engine
async fn stop(&self) -> std::result::Result<(), BoxedError> {
async fn stop(&self) -> Result<(), BoxedError> {
todo!()
}

fn set_writable(
&self,
region_id: RegionId,
writable: bool,
) -> std::result::Result<(), BoxedError> {
fn set_writable(&self, region_id: RegionId, writable: bool) -> Result<(), BoxedError> {
todo!()
}

Expand Down
2 changes: 1 addition & 1 deletion src/metric-engine/src/engine/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl MetricEngineInner {
) -> Result<()> {
let physical_region_id = {
let state = &self.state.read().await;
*state.logical_regions().get(&region_id).with_context(|| {
state.get_physical_region_id(region_id).with_context(|| {
error!("Trying to alter an nonexistent region {region_id}");
LogicalRegionNotFoundSnafu { region_id }
})?
Expand Down
68 changes: 68 additions & 0 deletions src/metric-engine/src/engine/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,10 @@ impl MetricEngineInner {
#[cfg(test)]
mod tests {

use common_recordbatch::RecordBatches;
use store_api::region_engine::RegionEngine;
use store_api::region_request::RegionRequest;
use store_api::storage::ScanRequest;

use super::*;
use crate::test_util::{self, TestEnv};
Expand All @@ -222,6 +224,72 @@ mod tests {
async fn test_write_logical_region() {
let env = TestEnv::new().await;
env.init_metric_region().await;

// prepare data
let schema = test_util::row_schema_with_tags(&["job"]);
let rows = test_util::build_rows(1, 5);
let request = RegionRequest::Put(RegionPutRequest {
rows: Rows { schema, rows },
});

// write data
let logical_region_id = env.default_logical_region_id();
let Output::AffectedRows(count) = env
.metric()
.handle_request(logical_region_id, request)
.await
.unwrap()
else {
panic!()
};
assert_eq!(count, 5);

// read data from physical region
let physical_region_id = env.default_physical_region_id();
let request = ScanRequest::default();
let stream = env
.metric()
.handle_query(physical_region_id, request)
.await
.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------------------------+----------------+------------+---------------------+-------+
| greptime_timestamp | greptime_value | __table_id | __tsid | job |
+-------------------------+----------------+------------+---------------------+-------+
| 1970-01-01T00:00:00 | 0.0 | 3 | 4844750677434873907 | tag_0 |
| 1970-01-01T00:00:00.001 | 1.0 | 3 | 4844750677434873907 | tag_0 |
| 1970-01-01T00:00:00.002 | 2.0 | 3 | 4844750677434873907 | tag_0 |
| 1970-01-01T00:00:00.003 | 3.0 | 3 | 4844750677434873907 | tag_0 |
| 1970-01-01T00:00:00.004 | 4.0 | 3 | 4844750677434873907 | tag_0 |
+-------------------------+----------------+------------+---------------------+-------+";
assert_eq!(expected, batches.pretty_print().unwrap(), "physical region");

// read data from logical region
let request = ScanRequest::default();
let stream = env
.metric()
.handle_query(logical_region_id, request)
.await
.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------------------------+----------------+-------+
| greptime_timestamp | greptime_value | job |
+-------------------------+----------------+-------+
| 1970-01-01T00:00:00 | 0.0 | tag_0 |
| 1970-01-01T00:00:00.001 | 1.0 | tag_0 |
| 1970-01-01T00:00:00.002 | 2.0 | tag_0 |
| 1970-01-01T00:00:00.003 | 3.0 | tag_0 |
| 1970-01-01T00:00:00.004 | 4.0 | tag_0 |
+-------------------------+----------------+-------+";
assert_eq!(expected, batches.pretty_print().unwrap(), "logical region");
}

#[tokio::test]
async fn test_write_logical_region_row_count() {
let env = TestEnv::new().await;
env.init_metric_region().await;
let engine = env.metric();

// add columns
Expand Down
Loading

0 comments on commit 33566ea

Please sign in to comment.