diff --git a/Cargo.lock b/Cargo.lock index fdfabd0a6ec3..c61ea145875e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2634,6 +2634,7 @@ dependencies = [ "lazy_static", "log-store", "meta-client", + "metric-engine", "mito2", "object-store", "pin-project", diff --git a/Cargo.toml b/Cargo.toml index 10f5a99a7b31..7b2e23c7e95e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -169,6 +169,7 @@ frontend = { path = "src/frontend" } log-store = { path = "src/log-store" } meta-client = { path = "src/meta-client" } meta-srv = { path = "src/meta-srv" } +metric-engine = { path = "src/metric-engine" } mito2 = { path = "src/mito2" } object-store = { path = "src/object-store" } operator = { path = "src/operator" } diff --git a/src/common/catalog/src/consts.rs b/src/common/catalog/src/consts.rs index 1de1e263fae4..f50bcf5f9ad9 100644 --- a/src/common/catalog/src/consts.rs +++ b/src/common/catalog/src/consts.rs @@ -45,6 +45,7 @@ pub const INFORMATION_SCHEMA_COLUMN_STATISTICS_TABLE_ID: u32 = 7; pub const MITO_ENGINE: &str = "mito"; pub const MITO2_ENGINE: &str = "mito2"; +pub const METRIC_ENGINE: &str = "metric"; pub fn default_engine() -> &'static str { MITO_ENGINE diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs index 062f4a7a5984..7bcf0fad7f4f 100644 --- a/src/common/meta/src/ddl/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -16,8 +16,9 @@ use api::v1::region::region_request::Body as PbRegionRequest; use api::v1::region::{ CreateRequest as PbCreateRegionRequest, RegionColumnDef, RegionRequest, RegionRequestHeader, }; -use api::v1::{ColumnDef, SemanticType}; +use api::v1::{ColumnDef, CreateTableExpr, SemanticType}; use async_trait::async_trait; +use common_catalog::consts::METRIC_ENGINE; use common_error::ext::BoxedError; use common_procedure::error::{ ExternalSnafu, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu, @@ -28,6 +29,7 @@ use common_telemetry::tracing_context::TracingContext; use futures::future::join_all; use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; +use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY; use store_api::storage::RegionId; use strum::AsRefStr; use table::engine::TableReference; @@ -35,7 +37,7 @@ use table::metadata::{RawTableInfo, TableId}; use crate::ddl::utils::{handle_operate_region_error, handle_retry_error, region_storage_path}; use crate::ddl::DdlContext; -use crate::error::{self, Result}; +use crate::error::{self, Result, TableInfoNotFoundSnafu}; use crate::key::table_name::TableNameKey; use crate::metrics; use crate::region_keeper::OperatingRegionGuard; @@ -122,7 +124,7 @@ impl CreateTableProcedure { Ok(Status::executing(true)) } - pub fn create_region_request_template(&self) -> Result { + pub fn new_region_request_builder(&self) -> Result { let create_table_expr = &self.creator.data.task.create_table; let column_defs = create_table_expr @@ -172,14 +174,17 @@ impl CreateTableProcedure { }) .collect::>()?; - Ok(PbCreateRegionRequest { + let template = PbCreateRegionRequest { region_id: 0, engine: create_table_expr.engine.to_string(), column_defs, primary_key, path: String::new(), options: create_table_expr.table_options.clone(), - }) + }; + + let builder = CreateRequestBuilder::new_template(self.context.clone(), template); + Ok(builder) } pub async fn on_datanode_create_regions(&mut self) -> Result { @@ -194,7 +199,7 @@ impl CreateTableProcedure { let schema = &create_table_expr.schema_name; let storage_path = region_storage_path(catalog, schema); - let request_template = self.create_region_request_template()?; + let mut request_builder = self.new_region_request_builder()?; let leaders = find_leaders(region_routes); let mut create_region_tasks = Vec::with_capacity(leaders.len()); @@ -203,17 +208,20 @@ impl CreateTableProcedure { let requester = self.context.datanode_manager.datanode(&datanode).await; let regions = find_leader_regions(region_routes, &datanode); - let requests = regions - .iter() - .map(|region_number| { - let region_id = RegionId::new(self.table_id(), *region_number); - - let mut create_region_request = request_template.clone(); - create_region_request.region_id = region_id.as_u64(); - create_region_request.path = storage_path.clone(); - PbRegionRequest::Create(create_region_request) - }) - .collect::>(); + let mut requests = Vec::with_capacity(regions.len()); + for region_number in regions { + let region_id = RegionId::new(self.table_id(), region_number); + + let create_region_request = request_builder + .build_one( + &self.creator.data.task.create_table, + region_id, + storage_path.clone(), + ) + .await?; + + requests.push(PbRegionRequest::Create(create_region_request)); + } for request in requests { let request = RegionRequest { @@ -371,3 +379,82 @@ impl CreateTableData { self.task.table_ref() } } + +/// Builder for [PbCreateRegionRequest]. +pub struct CreateRequestBuilder { + context: DdlContext, + template: PbCreateRegionRequest, + /// Optional. Only for metric engine. + physical_table_id: Option, +} + +impl CreateRequestBuilder { + fn new_template(context: DdlContext, template: PbCreateRegionRequest) -> Self { + Self { + context, + template, + physical_table_id: None, + } + } + + pub fn template(&self) -> &PbCreateRegionRequest { + &self.template + } + + async fn build_one( + &mut self, + create_expr: &CreateTableExpr, + region_id: RegionId, + storage_path: String, + ) -> Result { + let mut request = self.template.clone(); + + request.region_id = region_id.as_u64(); + request.path = storage_path; + + if self.template.engine == METRIC_ENGINE { + self.metric_engine_hook(create_expr, region_id, &mut request) + .await?; + } + + Ok(request) + } + + async fn metric_engine_hook( + &mut self, + create_expr: &CreateTableExpr, + region_id: RegionId, + request: &mut PbCreateRegionRequest, + ) -> Result<()> { + if let Some(physical_table_name) = request.options.get(LOGICAL_TABLE_METADATA_KEY) { + let table_id = if let Some(table_id) = self.physical_table_id { + table_id + } else { + let table_name_manager = self.context.table_metadata_manager.table_name_manager(); + let table_name_key = TableNameKey::new( + &create_expr.catalog_name, + &create_expr.schema_name, + physical_table_name, + ); + let table_id = table_name_manager + .get(table_name_key) + .await? + .context(TableInfoNotFoundSnafu { + table_name: physical_table_name, + })? + .table_id(); + self.physical_table_id = Some(table_id); + table_id + }; + // Concat physical table's table id and corresponding region number to get + // the physical region id. + let physical_region_id = RegionId::new(table_id, region_id.region_number()); + request.options.insert( + LOGICAL_TABLE_METADATA_KEY.to_string(), + physical_region_id.as_u64().to_string(), + ); + } + + Ok(()) + } +} diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 55dd941b63ce..74faf064f9b7 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -46,6 +46,12 @@ pub enum Error { #[snafu(display("Invalid result with a txn response: {}", err_msg))] InvalidTxnResult { err_msg: String, location: Location }, + #[snafu(display("Invalid engine type: {}", engine_type))] + InvalidEngineType { + engine_type: String, + location: Location, + }, + #[snafu(display("Failed to connect to Etcd"))] ConnectEtcd { #[snafu(source)] @@ -322,7 +328,9 @@ impl ErrorExt for Error { | RenameTable { .. } | Unsupported { .. } => StatusCode::Internal, - PrimaryKeyNotFound { .. } | &EmptyKey { .. } => StatusCode::InvalidArguments, + PrimaryKeyNotFound { .. } | EmptyKey { .. } | InvalidEngineType { .. } => { + StatusCode::InvalidArguments + } TableNotFound { .. } => StatusCode::TableNotFound, TableAlreadyExists { .. } => StatusCode::TableAlreadyExists, diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index 4a51fd3a8a64..b5fe76a08d4b 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -45,6 +45,7 @@ hyper = { version = "0.14", features = ["full"] } lazy_static.workspace = true log-store.workspace = true meta-client.workspace = true +metric-engine.workspace = true mito2.workspace = true object-store.workspace = true pin-project = "1.0" diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 8910f444e328..0646771459ec 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -34,6 +34,7 @@ use futures_util::future::try_join_all; use futures_util::StreamExt; use log_store::raft_engine::log_store::RaftEngineLogStore; use meta_client::client::MetaClient; +use metric_engine::engine::MetricEngine; use mito2::engine::MitoEngine; use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef}; use object_store::util::normalize_dir; @@ -471,12 +472,14 @@ impl DatanodeBuilder { for engine in &opts.region_engine { match engine { RegionEngineConfig::Mito(config) => { - let engine: MitoEngine = MitoEngine::new( + let mito_engine: MitoEngine = MitoEngine::new( config.clone(), log_store.clone(), object_store_manager.clone(), ); - engines.push(Arc::new(engine) as _); + let metric_engine = MetricEngine::new(mito_engine.clone()); + engines.push(Arc::new(mito_engine) as _); + engines.push(Arc::new(metric_engine) as _); } RegionEngineConfig::File(config) => { let engine = FileRegionEngine::new( diff --git a/src/meta-srv/src/procedure/tests.rs b/src/meta-srv/src/procedure/tests.rs index cb3ccc0b32a3..9f513e3662d2 100644 --- a/src/meta-srv/src/procedure/tests.rs +++ b/src/meta-srv/src/procedure/tests.rs @@ -96,7 +96,7 @@ fn create_table_task() -> CreateTableTask { } #[test] -fn test_create_region_request_template() { +fn test_region_request_builder() { let procedure = CreateTableProcedure::new( 1, create_table_task(), @@ -104,7 +104,7 @@ fn test_create_region_request_template() { test_data::new_ddl_context(Arc::new(DatanodeClients::default())), ); - let template = procedure.create_region_request_template().unwrap(); + let template = procedure.new_region_request_builder().unwrap(); let expected = PbCreateRegionRequest { region_id: 0, @@ -163,7 +163,7 @@ fn test_create_region_request_template() { path: String::new(), options: HashMap::new(), }; - assert_eq!(template, expected); + assert_eq!(template.template(), &expected); } async fn new_datanode_manager( diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 65436ef0d328..35dd0ee65571 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -27,16 +27,19 @@ use common_query::Output; use common_recordbatch::SendableRecordBatchStream; use mito2::engine::MitoEngine; use store_api::metadata::RegionMetadataRef; +use store_api::metric_engine_consts::METRIC_ENGINE_NAME; use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse}; use store_api::region_request::{AffectedRows, RegionRequest}; use store_api::storage::{RegionId, ScanRequest}; use tokio::sync::RwLock; use self::state::MetricEngineState; -use crate::consts::METRIC_ENGINE_NAME; use crate::data_region::DataRegion; use crate::metadata_region::MetadataRegion; +/// Fixed random state for generating tsid +pub(crate) const RANDOM_STATE: ahash::RandomState = ahash::RandomState::with_seeds(1, 2, 3, 4); + #[cfg_attr(doc, aquamarine::aquamarine)] /// # Metric Engine /// diff --git a/src/metric-engine/src/engine/create.rs b/src/metric-engine/src/engine/create.rs index ba4873abab2d..7313106d17a8 100644 --- a/src/metric-engine/src/engine/create.rs +++ b/src/metric-engine/src/engine/create.rs @@ -24,18 +24,18 @@ use mito2::engine::MITO_ENGINE_NAME; use object_store::util::join_dir; use snafu::{ensure, OptionExt, ResultExt}; use store_api::metadata::ColumnMetadata; -use store_api::region_engine::RegionEngine; -use store_api::region_request::{AffectedRows, RegionCreateRequest, RegionRequest}; -use store_api::storage::consts::ReservedColumnId; -use store_api::storage::RegionId; - -use crate::consts::{ +use store_api::metric_engine_consts::{ DATA_REGION_SUBDIR, DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, LOGICAL_TABLE_METADATA_KEY, METADATA_REGION_SUBDIR, METADATA_SCHEMA_KEY_COLUMN_INDEX, METADATA_SCHEMA_KEY_COLUMN_NAME, METADATA_SCHEMA_TIMESTAMP_COLUMN_INDEX, METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME, METADATA_SCHEMA_VALUE_COLUMN_INDEX, METADATA_SCHEMA_VALUE_COLUMN_NAME, PHYSICAL_TABLE_METADATA_KEY, }; +use store_api::region_engine::RegionEngine; +use store_api::region_request::{AffectedRows, RegionCreateRequest, RegionRequest}; +use store_api::storage::consts::ReservedColumnId; +use store_api::storage::RegionId; + use crate::engine::MetricEngineInner; use crate::error::{ ConflictRegionOptionSnafu, CreateMitoRegionSnafu, InternalColumnOccupiedSnafu, @@ -412,8 +412,9 @@ impl MetricEngineInner { #[cfg(test)] mod test { + use store_api::metric_engine_consts::METRIC_ENGINE_NAME; + use super::*; - use crate::consts::METRIC_ENGINE_NAME; use crate::engine::MetricEngine; use crate::test_util::TestEnv; diff --git a/src/metric-engine/src/engine/put.rs b/src/metric-engine/src/engine/put.rs index 6cbc8c7b3ca1..1354b8635bde 100644 --- a/src/metric-engine/src/engine/put.rs +++ b/src/metric-engine/src/engine/put.rs @@ -19,11 +19,13 @@ use api::v1::value::ValueData; use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType}; use common_telemetry::{error, info}; use snafu::OptionExt; +use store_api::metric_engine_consts::{ + DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, +}; use store_api::region_request::{AffectedRows, RegionPutRequest}; use store_api::storage::{RegionId, TableId}; -use crate::consts::{DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, RANDOM_STATE}; -use crate::engine::MetricEngineInner; +use crate::engine::{MetricEngineInner, RANDOM_STATE}; use crate::error::{ ColumnNotFoundSnafu, ForbiddenPhysicalAlterSnafu, LogicalRegionNotFoundSnafu, Result, }; diff --git a/src/metric-engine/src/engine/read.rs b/src/metric-engine/src/engine/read.rs index 7d7026ba9d28..6377e1cf1f1b 100644 --- a/src/metric-engine/src/engine/read.rs +++ b/src/metric-engine/src/engine/read.rs @@ -17,11 +17,11 @@ use common_recordbatch::SendableRecordBatchStream; use common_telemetry::{error, info, tracing}; use datafusion::logical_expr; use snafu::{OptionExt, ResultExt}; +use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME; use store_api::region_engine::RegionEngine; use store_api::storage::consts::ReservedColumnId; use store_api::storage::{RegionId, ScanRequest}; -use crate::consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME; use crate::engine::MetricEngineInner; use crate::error::{LogicalRegionNotFoundSnafu, MitoReadOperationSnafu, Result}; use crate::utils; diff --git a/src/metric-engine/src/lib.rs b/src/metric-engine/src/lib.rs index 56c8317baad4..d2440aecd3b2 100644 --- a/src/metric-engine/src/lib.rs +++ b/src/metric-engine/src/lib.rs @@ -50,7 +50,6 @@ //! └─────────────────────┘ //! ``` -pub mod consts; mod data_region; #[allow(unused)] pub mod engine; diff --git a/src/metric-engine/src/metadata_region.rs b/src/metric-engine/src/metadata_region.rs index e65a4526e690..c6e48b718f87 100644 --- a/src/metric-engine/src/metadata_region.rs +++ b/src/metric-engine/src/metadata_region.rs @@ -22,15 +22,15 @@ use common_recordbatch::util::collect; use datafusion::prelude::{col, lit}; use mito2::engine::MitoEngine; use snafu::ResultExt; -use store_api::region_engine::RegionEngine; -use store_api::region_request::RegionPutRequest; -use store_api::storage::{RegionId, ScanRequest}; - -use crate::consts::{ +use store_api::metric_engine_consts::{ METADATA_SCHEMA_KEY_COLUMN_INDEX, METADATA_SCHEMA_KEY_COLUMN_NAME, METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME, METADATA_SCHEMA_VALUE_COLUMN_INDEX, METADATA_SCHEMA_VALUE_COLUMN_NAME, }; +use store_api::region_engine::RegionEngine; +use store_api::region_request::RegionPutRequest; +use store_api::storage::{RegionId, ScanRequest}; + use crate::error::{ CollectRecordBatchStreamSnafu, DecodeColumnValueSnafu, DeserializeSemanticTypeSnafu, MitoReadOperationSnafu, MitoWriteOperationSnafu, ParseRegionIdSnafu, RegionAlreadyExistsSnafu, diff --git a/src/metric-engine/src/test_util.rs b/src/metric-engine/src/test_util.rs index 664c35f8367d..4d325753f795 100644 --- a/src/metric-engine/src/test_util.rs +++ b/src/metric-engine/src/test_util.rs @@ -23,13 +23,15 @@ use mito2::engine::MitoEngine; use mito2::test_util::TestEnv as MitoTestEnv; use object_store::util::join_dir; use store_api::metadata::ColumnMetadata; +use store_api::metric_engine_consts::{ + LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY, +}; use store_api::region_engine::RegionEngine; use store_api::region_request::{ AddColumn, AlterKind, RegionAlterRequest, RegionCreateRequest, RegionRequest, }; use store_api::storage::RegionId; -use crate::consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY}; use crate::data_region::DataRegion; use crate::engine::MetricEngine; use crate::metadata_region::MetadataRegion; @@ -281,8 +283,9 @@ pub fn build_rows(num_tags: usize, num_rows: usize) -> Vec { #[cfg(test)] mod test { + use store_api::metric_engine_consts::{DATA_REGION_SUBDIR, METADATA_REGION_SUBDIR}; + use super::*; - use crate::consts::{DATA_REGION_SUBDIR, METADATA_REGION_SUBDIR}; use crate::utils::{self, to_metadata_region_id}; #[tokio::test] diff --git a/src/metric-engine/src/utils.rs b/src/metric-engine/src/utils.rs index 69b316ee36b8..183ba2eaa369 100644 --- a/src/metric-engine/src/utils.rs +++ b/src/metric-engine/src/utils.rs @@ -12,10 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +use store_api::metric_engine_consts::{METRIC_DATA_REGION_GROUP, METRIC_METADATA_REGION_GROUP}; use store_api::storage::RegionId; -use crate::consts::{METRIC_DATA_REGION_GROUP, METRIC_METADATA_REGION_GROUP}; - /// Change the given [RegionId]'s region group to [METRIC_METADATA_REGION_GROUP]. pub fn to_metadata_region_id(region_id: RegionId) -> RegionId { let table_id = region_id.table_id(); diff --git a/src/store-api/src/lib.rs b/src/store-api/src/lib.rs index d70379459821..ffda779d335f 100644 --- a/src/store-api/src/lib.rs +++ b/src/store-api/src/lib.rs @@ -19,6 +19,7 @@ pub mod data_source; pub mod logstore; pub mod manifest; pub mod metadata; +pub mod metric_engine_consts; pub mod path_utils; pub mod region_engine; pub mod region_request; diff --git a/src/metric-engine/src/consts.rs b/src/store-api/src/metric_engine_consts.rs similarity index 93% rename from src/metric-engine/src/consts.rs rename to src/store-api/src/metric_engine_consts.rs index d84833eca838..d28e4d538aa3 100644 --- a/src/metric-engine/src/consts.rs +++ b/src/store-api/src/metric_engine_consts.rs @@ -14,7 +14,7 @@ //! Constants used in metric engine. -use store_api::storage::RegionGroup; +use crate::storage::RegionGroup; /// region group value for data region inside a metric region pub const METRIC_DATA_REGION_GROUP: RegionGroup = 0; @@ -66,6 +66,3 @@ pub const PHYSICAL_TABLE_METADATA_KEY: &str = "physical_metric_table"; /// ``` /// And this key will be translated to corresponding physical **REGION** id in metasrv. pub const LOGICAL_TABLE_METADATA_KEY: &str = "on_physical_table"; - -/// Fixed random state for generating tsid -pub(crate) const RANDOM_STATE: ahash::RandomState = ahash::RandomState::with_seeds(1, 2, 3, 4); diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index c3ffceec63e8..2351ae8fa069 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -25,6 +25,7 @@ use common_time::range::TimestampRange; use datatypes::prelude::VectorRef; use datatypes::schema::{ColumnSchema, RawSchema}; use serde::{Deserialize, Serialize}; +use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, PHYSICAL_TABLE_METADATA_KEY}; use store_api::storage::RegionNumber; use crate::engine::TableReference; @@ -342,6 +343,8 @@ pub fn valid_table_option(key: &str) -> bool { | TTL_KEY | REGIONS_KEY | STORAGE_KEY + | PHYSICAL_TABLE_METADATA_KEY + | LOGICAL_TABLE_METADATA_KEY ) | is_supported_in_s3(key) }