diff --git a/Cargo.lock b/Cargo.lock index b128e2cf9220..836d461a8de1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4760,6 +4760,8 @@ dependencies = [ "common-macro", "common-query", "common-recordbatch", + "common-telemetry", + "common-test-util", "common-time", "datatypes", "mito2", @@ -4767,6 +4769,7 @@ dependencies = [ "serde_json", "snafu", "store-api", + "tokio", ] [[package]] diff --git a/src/metric-engine/Cargo.toml b/src/metric-engine/Cargo.toml index 0ff5493a714a..46c76fd79b3a 100644 --- a/src/metric-engine/Cargo.toml +++ b/src/metric-engine/Cargo.toml @@ -12,10 +12,16 @@ common-error.workspace = true common-macro.workspace = true common-query.workspace = true common-recordbatch.workspace = true +common-telemetry.workspace = true common-time.workspace = true datatypes.workspace = true -mito2 = { workspace = true, features = ["test"] } +mito2.workspace = true object-store.workspace = true serde_json.workspace = true snafu.workspace = true store-api.workspace = true +tokio.workspace = true + +[dev-dependencies] +common-test-util.workspace = true +mito2 = { workspace = true, features = ["test"] } diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index e331d9684686..456ded4d4bfe 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -48,6 +48,8 @@ const METADATA_SCHEMA_VALUE_COLUMN_NAME: &str = "val"; const METADATA_REGION_SUBDIR: &str = "metadata"; const DATA_REGION_SUBDIR: &str = "data"; +pub const METRIC_ENGINE_NAME: &str = "metric"; + pub struct MetricEngine { inner: Arc, } @@ -56,7 +58,7 @@ pub struct MetricEngine { impl RegionEngine for MetricEngine { /// Name of this engine fn name(&self) -> &str { - "metric" + METRIC_ENGINE_NAME } /// Handles request to the region. @@ -67,10 +69,14 @@ impl RegionEngine for MetricEngine { region_id: RegionId, request: RegionRequest, ) -> std::result::Result { - match request { + let result = match request { RegionRequest::Put(_) => todo!(), RegionRequest::Delete(_) => todo!(), - RegionRequest::Create(create) => todo!(), + RegionRequest::Create(create) => self + .inner + .create_region(region_id, create) + .await + .map(|_| Output::AffectedRows(0)), RegionRequest::Drop(_) => todo!(), RegionRequest::Open(_) => todo!(), RegionRequest::Close(_) => todo!(), @@ -78,8 +84,9 @@ impl RegionEngine for MetricEngine { RegionRequest::Flush(_) => todo!(), RegionRequest::Compact(_) => todo!(), RegionRequest::Truncate(_) => todo!(), - } - todo!() + }; + + result.map_err(BoxedError::new) } /// Handles substrait query and return a stream of record batches @@ -139,15 +146,15 @@ impl MetricEngineInner { let create_metadata_region_request = self.create_request_for_metadata_region(&request.region_dir); - self.mito - .handle_request( - data_region_id, - RegionRequest::Create(create_data_region_request), - ) - .await - .with_context(|_| CreateMitoRegionSnafu { - region_type: DATA_REGION_SUBDIR, - })?; + // self.mito + // .handle_request( + // data_region_id, + // RegionRequest::Create(create_data_region_request), + // ) + // .await + // .with_context(|_| CreateMitoRegionSnafu { + // region_type: DATA_REGION_SUBDIR, + // })?; self.mito .handle_request( metadata_region_id, @@ -194,7 +201,7 @@ impl MetricEngineInner { semantic_type: SemanticType::Timestamp, column_schema: ColumnSchema::new( METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME, - ConcreteDataType::time_millisecond_datatype(), + ConcreteDataType::timestamp_millisecond_datatype(), false, ) .with_default_constraint(Some(datatypes::schema::ColumnDefaultConstraint::Value( @@ -255,3 +262,56 @@ impl MetricEngineInner { data_region_request } } + +#[cfg(test)] +mod test { + use std::time::Duration; + + use common_telemetry::info; + + use super::*; + use crate::test_util::TestEnv; + + #[tokio::test] + async fn create_metadata_region() { + common_telemetry::init_default_ut_logging(); + + let env = TestEnv::new().await; + let mito = env.mito(); + let engine = MetricEngine { + inner: Arc::new(MetricEngineInner { mito }), + }; + let engine_dir = env.data_home(); + info!("[DEBUG] {engine_dir}"); + let region_dir = join_dir(&engine_dir, "test_metric_region"); + + let region_id = RegionId::new(1, 2); + let region_create_request = RegionCreateRequest { + engine: METRIC_ENGINE_NAME.to_string(), + column_metadatas: vec![], + primary_key: vec![], + options: HashMap::new(), + region_dir: "test_metric_region".to_string(), + }; + + // create the region + engine + .handle_request(region_id, RegionRequest::Create(region_create_request)) + .await + .unwrap(); + + let mut ls_result = tokio::fs::read_dir(&engine_dir).await.unwrap(); + while let Some(dir) = ls_result.next_entry().await.unwrap() { + info!("[DEBUG] {dir:?}"); + } + + // assert metadata region's dir + let metadata_region_dir = join_dir(®ion_dir, METADATA_REGION_SUBDIR); + let exist = tokio::fs::try_exists(region_dir).await.unwrap(); + assert!(exist); + + // check mito engine + let metadata_region_id = utils::to_metadata_region_id(region_id); + let result = env.mito().get_metadata(metadata_region_id).await.unwrap(); + } +} diff --git a/src/metric-engine/src/lib.rs b/src/metric-engine/src/lib.rs index c91908c1f64f..14d9aa22bb6c 100644 --- a/src/metric-engine/src/lib.rs +++ b/src/metric-engine/src/lib.rs @@ -17,4 +17,6 @@ pub mod engine; pub mod error; #[allow(unused)] mod metadata_region; +#[cfg(test)] +mod test_util; mod utils; diff --git a/src/metric-engine/src/test_util.rs b/src/metric-engine/src/test_util.rs new file mode 100644 index 000000000000..313a2ceba881 --- /dev/null +++ b/src/metric-engine/src/test_util.rs @@ -0,0 +1,50 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Utilities for testing. + +use mito2::config::MitoConfig; +use mito2::engine::MitoEngine; +use mito2::test_util::TestEnv as MitoTestEnv; +use object_store::util::join_dir; + +/// Env to test metric engine. +pub struct TestEnv { + mito_env: MitoTestEnv, + mito: MitoEngine, +} + +impl TestEnv { + /// Returns a new env with empty prefix for test. + pub async fn new() -> Self { + Self::with_prefix("").await + } + + /// Returns a new env with specific `prefix` for test. + pub async fn with_prefix(prefix: &str) -> Self { + let mut mito_env = MitoTestEnv::with_prefix(prefix); + let mito = mito_env.create_engine(MitoConfig::default()).await; + Self { mito_env, mito } + } + + pub fn data_home(&self) -> String { + let env_root = self.mito_env.data_home().to_string_lossy().to_string(); + join_dir(&env_root, "data") + } + + /// Returns a reference to the engine. + pub fn mito(&self) -> MitoEngine { + self.mito.clone() + } +} diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index b87fd12039c6..7d49bb2348ad 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -20,6 +20,7 @@ pub mod scheduler_util; pub mod version_util; use std::collections::HashMap; +use std::path::Path; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; @@ -119,6 +120,10 @@ impl TestEnv { .map(|manager| manager.default_object_store().clone()) } + pub fn data_home(&self) -> &Path { + self.data_home.path() + } + /// Creates a new engine with specific config under this env. pub async fn create_engine(&mut self, config: MitoConfig) -> MitoEngine { let (log_store, object_store_manager) = self.create_log_and_object_store_manager().await;