Skip to content

Commit

Permalink
create data region
Browse files Browse the repository at this point in the history
Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia committed Nov 3, 2023
1 parent 41391a0 commit 376b1ff
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 17 deletions.
123 changes: 108 additions & 15 deletions src/metric-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use snafu::{ensure, ResultExt};
use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
use store_api::region_engine::{RegionEngine, RegionRole};
use store_api::region_request::{RegionCreateRequest, RegionRequest};
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::{RegionGroup, RegionId, ScanRequest};

use crate::error::{CreateMitoRegionSnafu, InternalColumnOccupiedSnafu, Result};
Expand All @@ -51,6 +52,7 @@ pub const METADATA_SCHEMA_VALUE_COLUMN_INDEX: usize = 2;

/// Column name of internal column `__metric_name` that stores the original metric name
pub const DATA_SCHEMA_METRIC_NAME_COLUMN_NAME: &str = "__metric_name";
pub const DATA_SCHEMA_TSID_COLUMN_NAME: &str = "__tsid";

pub const METADATA_REGION_SUBDIR: &str = "metadata";
pub const DATA_REGION_SUBDIR: &str = "data";
Expand Down Expand Up @@ -149,6 +151,7 @@ struct MetricEngineInner {
}

impl MetricEngineInner {
/// Initialize a metric region at given region id.
pub async fn create_region(
&self,
region_id: RegionId,
Expand All @@ -157,19 +160,10 @@ impl MetricEngineInner {
Self::verify_region_create_request(&request)?;

let (data_region_id, metadata_region_id) = Self::transform_region_id(region_id);
let create_data_region_request = self.create_request_for_data_region(&request);

// create metadata region
let create_metadata_region_request =
self.create_request_for_metadata_region(&request.region_dir);

// self.mito
// .handle_request(
// data_region_id,
// RegionRequest::Create(create_data_region_request),
// )
// .await
// .with_context(|_| CreateMitoRegionSnafu {
// region_type: DATA_REGION_SUBDIR,
// })?;
self.mito
.handle_request(
metadata_region_id,
Expand All @@ -180,6 +174,18 @@ impl MetricEngineInner {
region_type: METADATA_REGION_SUBDIR,
})?;

// create data region
let create_data_region_request = self.create_request_for_data_region(&request);
self.mito
.handle_request(
data_region_id,
RegionRequest::Create(create_data_region_request),
)
.await
.with_context(|_| CreateMitoRegionSnafu {
region_type: DATA_REGION_SUBDIR,
})?;

Ok(())
}

Expand All @@ -200,6 +206,12 @@ impl MetricEngineInner {
column: DATA_SCHEMA_METRIC_NAME_COLUMN_NAME,
}
);
ensure!(
!name_to_index.contains_key(DATA_SCHEMA_TSID_COLUMN_NAME),
InternalColumnOccupiedSnafu {
column: DATA_SCHEMA_TSID_COLUMN_NAME,
}
);

Ok(())
}
Expand Down Expand Up @@ -253,6 +265,7 @@ impl MetricEngineInner {
),
};

// concat region dir
let metadata_region_dir = join_dir(region_dir, METADATA_REGION_SUBDIR);

RegionCreateRequest {
Expand All @@ -268,7 +281,12 @@ impl MetricEngineInner {
}
}

// todo: register "tag columns" to metadata
/// Convert [RegionCreateRequest] for data region.
///
/// All tag columns in the original request will be converted to value columns.
/// Those columns real semantic type is stored in metadat region.
///
/// This will also add internal columns to the request.
pub fn create_request_for_data_region(
&self,
request: &RegionCreateRequest,
Expand All @@ -278,9 +296,39 @@ impl MetricEngineInner {
// concat region dir
data_region_request.region_dir = join_dir(&request.region_dir, DATA_REGION_SUBDIR);

// todo: change semantic type and primary key

// todo: add internal column
// convert semantic type
data_region_request
.column_metadatas
.iter_mut()
.for_each(|metadata| {
if metadata.semantic_type == SemanticType::Tag {
metadata.semantic_type = SemanticType::Field;
}
});

// add internal columns
let metric_name_col = ColumnMetadata {
column_id: ReservedColumnId::metric_name(),
semantic_type: SemanticType::Tag,
column_schema: ColumnSchema::new(
DATA_SCHEMA_METRIC_NAME_COLUMN_NAME,
ConcreteDataType::string_datatype(),
false,
),
};
let tsid_col = ColumnMetadata {
column_id: ReservedColumnId::tsid(),
semantic_type: SemanticType::Tag,
column_schema: ColumnSchema::new(
DATA_SCHEMA_TSID_COLUMN_NAME,
ConcreteDataType::int64_datatype(),
false,
),
};
data_region_request.column_metadatas.push(metric_name_col);
data_region_request.column_metadatas.push(tsid_col);
data_region_request.primary_key =
vec![ReservedColumnId::metric_name(), ReservedColumnId::tsid()];

data_region_request
}
Expand All @@ -289,6 +337,7 @@ impl MetricEngineInner {
#[cfg(test)]
mod tests {
use super::*;
use crate::test_util::TestEnv;

#[test]
fn test_verify_region_create_request() {
Expand Down Expand Up @@ -356,4 +405,48 @@ mod tests {
let result = MetricEngineInner::verify_region_create_request(&request);
assert!(result.is_ok());
}

#[tokio::test]
async fn test_create_request_for_data_region() {
let request = RegionCreateRequest {
engine: METRIC_ENGINE_NAME.to_string(),
column_metadatas: vec![
ColumnMetadata {
column_id: 0,
semantic_type: SemanticType::Timestamp,
column_schema: ColumnSchema::new(
"timestamp",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
},
ColumnMetadata {
column_id: 1,
semantic_type: SemanticType::Tag,
column_schema: ColumnSchema::new(
"tag",
ConcreteDataType::string_datatype(),
false,
),
},
],
primary_key: vec![0],
options: HashMap::new(),
region_dir: "test_dir".to_string(),
};

let env = TestEnv::new().await;
let engine = MetricEngineInner { mito: env.mito() };
let data_region_request = engine.create_request_for_data_region(&request);

assert_eq!(
data_region_request.region_dir,
"/test_dir/data/".to_string()
);
assert_eq!(data_region_request.column_metadatas.len(), 4);
assert_eq!(
data_region_request.primary_key,
vec![ReservedColumnId::metric_name(), ReservedColumnId::tsid()]
);
}
}
23 changes: 21 additions & 2 deletions src/metric-engine/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@
use std::collections::HashMap;

use api::v1::SemanticType;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use mito2::config::MitoConfig;
use mito2::engine::MitoEngine;
use mito2::test_util::TestEnv as MitoTestEnv;
use object_store::util::join_dir;
use store_api::metadata::ColumnMetadata;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{RegionCreateRequest, RegionRequest};
use store_api::storage::RegionId;
Expand Down Expand Up @@ -66,7 +70,15 @@ impl TestEnv {
let region_id = self.default_region_id();
let region_create_request = RegionCreateRequest {
engine: METRIC_ENGINE_NAME.to_string(),
column_metadatas: vec![],
column_metadatas: vec![ColumnMetadata {
column_id: 0,
semantic_type: SemanticType::Timestamp,
column_schema: ColumnSchema::new(
"greptime_timestamp",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
}],
primary_key: vec![],
options: HashMap::new(),
region_dir: "test_metric_region".to_string(),
Expand All @@ -93,7 +105,7 @@ impl TestEnv {
mod test {

use super::*;
use crate::engine::METADATA_REGION_SUBDIR;
use crate::engine::{DATA_REGION_SUBDIR, METADATA_REGION_SUBDIR};
use crate::utils::{self, to_metadata_region_id};

#[tokio::test]
Expand All @@ -110,8 +122,15 @@ mod test {
let exist = tokio::fs::try_exists(metadata_region_dir).await.unwrap();
assert!(exist);

// assert data region's dir
let data_region_dir = join_dir(&region_dir, DATA_REGION_SUBDIR);
let exist = tokio::fs::try_exists(data_region_dir).await.unwrap();
assert!(exist);

// check mito engine
let metadata_region_id = utils::to_metadata_region_id(region_id);
let _ = env.mito().get_metadata(metadata_region_id).await.unwrap();
let data_region_id = utils::to_data_region_id(region_id);
let _ = env.mito().get_metadata(data_region_id).await.unwrap();
}
}
16 changes: 16 additions & 0 deletions src/store-api/src/storage/consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ enum ReservedColumnType {
Version = 0,
Sequence,
OpType,
Tsid,
MetricName,
}

/// Column id reserved by the engine.
Expand Down Expand Up @@ -66,6 +68,20 @@ impl ReservedColumnId {
pub const fn op_type() -> ColumnId {
Self::BASE | ReservedColumnType::OpType as ColumnId
}

/// Id for storing TSID column.
///
/// Used by: metric engine
pub const fn tsid() -> ColumnId {
Self::BASE | ReservedColumnType::Tsid as ColumnId
}

/// Id for storing metric name column.
///
/// Used by: metric engine
pub const fn metric_name() -> ColumnId {
Self::BASE | ReservedColumnType::MetricName as ColumnId
}
}

// -----------------------------------------------------------------------------
Expand Down

0 comments on commit 376b1ff

Please sign in to comment.