From 1fab7ab75a7e6558ce7bbb7a0e48ec13f5125317 Mon Sep 17 00:00:00 2001 From: JeremyHi Date: Fri, 26 Jan 2024 10:43:57 +0800 Subject: [PATCH] feat: batch create ddl (#3194) * feat: batch ddl to region request * feat: return table ids chore: by comment chore: remove wal_options chore: create logical tables lock key feat: get metadata in procedure * chore: by comment --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/common/meta/src/ddl.rs | 2 + .../meta/src/ddl/create_logical_tables.rs | 397 ++++++++++++++++++ src/common/meta/src/ddl/create_table.rs | 128 +----- .../meta/src/ddl/create_table_template.rs | 134 ++++++ src/common/meta/src/ddl/table_meta.rs | 9 + src/common/meta/src/ddl/utils.rs | 75 +++- src/common/meta/src/ddl_manager.rs | 131 +++++- src/common/meta/src/error.rs | 17 +- src/common/meta/src/key.rs | 135 +++++- src/common/meta/src/key/table_name.rs | 30 +- src/common/meta/src/key/table_route.rs | 6 +- src/common/meta/src/metrics.rs | 6 + src/common/meta/src/rpc/ddl.rs | 157 +++++-- src/common/meta/src/rpc/store.rs | 6 + src/meta-srv/src/procedure/tests.rs | 77 +++- src/operator/src/error.rs | 27 ++ src/operator/src/metrics.rs | 5 + src/operator/src/statement/ddl.rs | 125 +++++- src/store-api/src/region_request.rs | 230 ++++++---- 21 files changed, 1438 insertions(+), 263 deletions(-) create mode 100644 src/common/meta/src/ddl/create_logical_tables.rs create mode 100644 src/common/meta/src/ddl/create_table_template.rs diff --git a/Cargo.lock b/Cargo.lock index 28a25d1e75b6..1eeca6b5bf24 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3651,7 +3651,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=65b008f018395f8fa917a7d3c7883b82f309cb74#65b008f018395f8fa917a7d3c7883b82f309cb74" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=519b1d0757404c8ff1eeb2a68d29f5ade54a1752#519b1d0757404c8ff1eeb2a68d29f5ade54a1752" dependencies = [ "prost 0.12.3", "serde", diff --git a/Cargo.toml b/Cargo.toml index a63e0bedcca2..384c72e05a15 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -95,7 +95,7 @@ etcd-client = "0.12" fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "65b008f018395f8fa917a7d3c7883b82f309cb74" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "519b1d0757404c8ff1eeb2a68d29f5ade54a1752" } humantime-serde = "1.1" itertools = "0.10" lazy_static = "1.4" diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs index 05e076f8ae47..8ac4bb22b945 100644 --- a/src/common/meta/src/ddl.rs +++ b/src/common/meta/src/ddl.rs @@ -27,7 +27,9 @@ use crate::region_keeper::MemoryRegionKeeperRef; use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse}; pub mod alter_table; +pub mod create_logical_tables; pub mod create_table; +mod create_table_template; pub mod drop_table; pub mod table_meta; pub mod truncate_table; diff --git a/src/common/meta/src/ddl/create_logical_tables.rs b/src/common/meta/src/ddl/create_logical_tables.rs new file mode 100644 index 000000000000..978de69c1ba9 --- /dev/null +++ b/src/common/meta/src/ddl/create_logical_tables.rs @@ -0,0 +1,397 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashMap;
+
+use api::v1::region::region_request::Body as PbRegionRequest;
+use api::v1::region::{CreateRequests, RegionRequest, RegionRequestHeader};
+use api::v1::CreateTableExpr;
+use async_trait::async_trait;
+use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
+use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
+use common_telemetry::info;
+use common_telemetry::tracing_context::TracingContext;
+use futures_util::future::join_all;
+use serde::{Deserialize, Serialize};
+use snafu::{ensure, ResultExt};
+use store_api::storage::{RegionId, RegionNumber};
+use strum::AsRefStr;
+use table::metadata::{RawTableInfo, TableId};
+
+use crate::ddl::create_table_template::{build_template, CreateRequestBuilder};
+use crate::ddl::utils::{handle_operate_region_error, handle_retry_error, region_storage_path};
+use crate::ddl::DdlContext;
+use crate::error::{Result, TableAlreadyExistsSnafu};
+use crate::key::table_name::TableNameKey;
+use crate::key::table_route::TableRouteValue;
+use crate::lock_key::{TableLock, TableNameLock};
+use crate::peer::Peer;
+use crate::rpc::ddl::CreateTableTask;
+use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute};
+use crate::{metrics, ClusterId};
+
+pub struct CreateLogicalTablesProcedure {
+    pub context: DdlContext,
+    pub creator: TablesCreator,
+}
+
+impl CreateLogicalTablesProcedure {
+    pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateLogicalTables";
+
+    pub fn new(
+        cluster_id: ClusterId,
+        tasks: Vec<CreateTableTask>,
+        physical_table_id: TableId,
+        context: DdlContext,
+    ) -> Self {
+        let creator = TablesCreator::new(cluster_id, tasks, physical_table_id);
+        Self { context, creator }
+    }
+
+    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
+        let data = serde_json::from_str(json).context(FromJsonSnafu)?;
+        let creator = TablesCreator { data };
+        Ok(Self { context, creator })
+    }
+
+    async fn on_prepare(&mut self) -> Result<Status> {
+        let manager = &self.context.table_metadata_manager;
+
+        // Sets physical region numbers.
+        let physical_table_id = self.creator.data.physical_table_id();
+        let physical_region_numbers = manager
+            .table_route_manager()
+            .get_physical_table_route(physical_table_id)
+            .await
+            .map(|(_, route)| TableRouteValue::Physical(route).region_numbers())?;
+        self.creator
+            .data
+            .set_physical_region_numbers(physical_region_numbers);
+
+        // Checks which of the tables already exist.
+        let table_name_keys = self
+            .creator
+            .data
+            .all_create_table_exprs()
+            .iter()
+            .map(|expr| TableNameKey::new(&expr.catalog_name, &expr.schema_name, &expr.table_name))
+            .collect::<Vec<_>>();
+        let already_exists_tables_ids = manager
+            .table_name_manager()
+            .batch_get(table_name_keys)
+            .await?
+            .iter()
+            .map(|x| x.map(|x| x.table_id()))
+            .collect::<Vec<_>>();
+
+        // Sets the table ids that already exist.
+        self.creator
+            .data
+            .set_table_ids_already_exists(already_exists_tables_ids);
+
+        // If all tables do not exist, we can create them directly.
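+        // From here, preparation takes one of three paths: every table is new (go to
+        // `DatanodeCreateRegions`), some exist with `create_if_not_exists` set (filter
+        // them out first), or all of them already exist (skip to `CreateMetadata`).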
+        if self.creator.data.is_all_tables_not_exists() {
+            self.creator.data.state = CreateTablesState::DatanodeCreateRegions;
+            return Ok(Status::executing(true));
+        }
+
+        // Filter out the tables that already exist.
+        let tasks = &self.creator.data.tasks;
+        let mut filtered_tasks = Vec::with_capacity(tasks.len());
+        for (task, table_id) in tasks
+            .iter()
+            .zip(self.creator.data.table_ids_already_exists().iter())
+        {
+            if table_id.is_some() {
+                // If a table already exists, we just ignore it.
+                ensure!(
+                    task.create_table.create_if_not_exists,
+                    TableAlreadyExistsSnafu {
+                        table_name: task.create_table.table_name.to_string(),
+                    }
+                );
+                continue;
+            }
+            filtered_tasks.push(task.clone());
+        }
+
+        // Resets tasks.
+        self.creator.data.tasks = filtered_tasks;
+        if self.creator.data.tasks.is_empty() {
+            // If all tables already exist, we can skip the `DatanodeCreateRegions` stage.
+            self.creator.data.state = CreateTablesState::CreateMetadata;
+            return Ok(Status::executing(true));
+        }
+
+        self.creator.data.state = CreateTablesState::DatanodeCreateRegions;
+        Ok(Status::executing(true))
+    }
+
+    pub async fn on_datanode_create_regions(&mut self) -> Result<Status> {
+        let physical_table_id = self.creator.data.physical_table_id();
+        let (_, physical_table_route) = self
+            .context
+            .table_metadata_manager
+            .table_route_manager()
+            .get_physical_table_route(physical_table_id)
+            .await?;
+        let region_routes = &physical_table_route.region_routes;
+
+        self.create_regions(region_routes).await
+    }
+
+    pub async fn on_create_metadata(&self) -> Result<Status> {
+        let manager = &self.context.table_metadata_manager;
+
+        let physical_table_id = self.creator.data.physical_table_id();
+        let tables_data = self.creator.data.all_tables_data();
+        let num_tables = tables_data.len();
+
+        if num_tables > 0 {
+            manager.create_logic_tables_metadata(tables_data).await?;
+        }
+
+        info!("Created metadata for {num_tables} logical tables on physical table {physical_table_id}");
+
+        Ok(Status::done_with_output(self.creator.data.real_table_ids()))
+    }
+
+    fn create_region_request_builder(
+        &self,
+        physical_table_id: TableId,
+        task: &CreateTableTask,
+    ) -> Result<CreateRequestBuilder> {
+        let create_expr = &task.create_table;
+        let template = build_template(create_expr)?;
+        Ok(CreateRequestBuilder::new(template, Some(physical_table_id)))
+    }
+
+    fn one_datanode_region_requests(
+        &self,
+        datanode: &Peer,
+        region_routes: &[RegionRoute],
+    ) -> Result<CreateRequests> {
+        let create_tables_data = &self.creator.data;
+        let tasks = &create_tables_data.tasks;
+        let physical_table_id = create_tables_data.physical_table_id();
+        let regions = find_leader_regions(region_routes, datanode);
+        let mut requests = Vec::with_capacity(tasks.len() * regions.len());
+
+        for task in tasks {
+            let create_table_expr = &task.create_table;
+            let catalog = &create_table_expr.catalog_name;
+            let schema = &create_table_expr.schema_name;
+            let logical_table_id = task.table_info.ident.table_id;
+            let storage_path = region_storage_path(catalog, schema);
+            let request_builder = self.create_region_request_builder(physical_table_id, task)?;
+
+            for region_number in &regions {
+                let region_id = RegionId::new(logical_table_id, *region_number);
+                let create_region_request =
+                    request_builder.build_one(region_id, storage_path.clone(), &HashMap::new())?;
+                requests.push(create_region_request);
+            }
+        }
+
+        Ok(CreateRequests { requests })
+    }
+
+    async fn create_regions(&mut self, region_routes: &[RegionRoute]) -> Result<Status> {
+        let leaders = find_leaders(region_routes);
+        let mut create_region_tasks = Vec::with_capacity(leaders.len());
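+        // One batched `CreateRequests` body is built per leader datanode below, so the
+        // RPC count scales with the number of datanodes rather than tables x regions.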
+
+        for datanode in leaders {
+            let requester = self.context.datanode_manager.datanode(&datanode).await;
+            let creates = self.one_datanode_region_requests(&datanode, region_routes)?;
+            let request = RegionRequest {
+                header: Some(RegionRequestHeader {
+                    tracing_context: TracingContext::from_current_span().to_w3c(),
+                    ..Default::default()
+                }),
+                body: Some(PbRegionRequest::Creates(creates)),
+            };
+            create_region_tasks.push(async move {
+                if let Err(err) = requester.handle(request).await {
+                    return Err(handle_operate_region_error(datanode)(err));
+                }
+                Ok(())
+            });
+        }
+
+        join_all(create_region_tasks)
+            .await
+            .into_iter()
+            .collect::<Result<Vec<_>>>()?;
+
+        self.creator.data.state = CreateTablesState::CreateMetadata;
+
+        // Ensures that a procedure resumed after a crash restarts from the `DatanodeCreateRegions` stage.
+        Ok(Status::executing(false))
+    }
+}
+
+#[async_trait]
+impl Procedure for CreateLogicalTablesProcedure {
+    fn type_name(&self) -> &str {
+        Self::TYPE_NAME
+    }
+
+    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
+        let state = &self.creator.data.state;
+
+        let _timer = metrics::METRIC_META_PROCEDURE_CREATE_TABLES
+            .with_label_values(&[state.as_ref()])
+            .start_timer();
+
+        match state {
+            CreateTablesState::Prepare => self.on_prepare().await,
+            CreateTablesState::DatanodeCreateRegions => self.on_datanode_create_regions().await,
+            CreateTablesState::CreateMetadata => self.on_create_metadata().await,
+        }
+        .map_err(handle_retry_error)
+    }
+
+    fn dump(&self) -> ProcedureResult<String> {
+        serde_json::to_string(&self.creator.data).context(ToJsonSnafu)
+    }
+
+    fn lock_key(&self) -> LockKey {
+        let mut lock_key = Vec::with_capacity(1 + self.creator.data.tasks.len());
+        lock_key.push(TableLock::Write(self.creator.data.physical_table_id()).into());
+        for task in &self.creator.data.tasks {
+            lock_key.push(
+                TableNameLock::new(
+                    &task.create_table.catalog_name,
+                    &task.create_table.schema_name,
+                    &task.create_table.table_name,
+                )
+                .into(),
+            );
+        }
+        LockKey::new(lock_key)
+    }
+}
+
+pub struct TablesCreator {
+    /// The serializable data.
+    pub data: CreateTablesData,
+}
+
+impl TablesCreator {
+    pub fn new(
+        cluster_id: ClusterId,
+        tasks: Vec<CreateTableTask>,
+        physical_table_id: TableId,
+    ) -> Self {
+        let table_ids_from_tasks = tasks
+            .iter()
+            .map(|task| task.table_info.ident.table_id)
+            .collect::<Vec<_>>();
+        let len = table_ids_from_tasks.len();
+        Self {
+            data: CreateTablesData {
+                cluster_id,
+                state: CreateTablesState::Prepare,
+                tasks,
+                table_ids_from_tasks,
+                table_ids_already_exists: vec![None; len],
+                physical_table_id,
+                physical_region_numbers: vec![],
+            },
+        }
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct CreateTablesData {
+    cluster_id: ClusterId,
+    state: CreateTablesState,
+    tasks: Vec<CreateTableTask>,
+    table_ids_from_tasks: Vec<TableId>,
+    // Because the table ids are allocated before the procedure holds the distributed lock,
+    // we must recheck whether each table exists when creating it. If a table does exist,
+    // its allocated id is replaced with the existing one.
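+    // For example, if the tasks pre-allocated ids [1025, 1026] but the second table
+    // already exists with id 42, the final ids are [1025, 42] (see `real_table_ids` below).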
+    table_ids_already_exists: Vec<Option<TableId>>,
+    physical_table_id: TableId,
+    physical_region_numbers: Vec<RegionNumber>,
+}
+
+impl CreateTablesData {
+    pub fn state(&self) -> &CreateTablesState {
+        &self.state
+    }
+
+    fn physical_table_id(&self) -> TableId {
+        self.physical_table_id
+    }
+
+    fn set_physical_region_numbers(&mut self, physical_region_numbers: Vec<RegionNumber>) {
+        self.physical_region_numbers = physical_region_numbers;
+    }
+
+    fn set_table_ids_already_exists(&mut self, table_ids_already_exists: Vec<Option<TableId>>) {
+        self.table_ids_already_exists = table_ids_already_exists;
+    }
+
+    fn table_ids_already_exists(&self) -> &[Option<TableId>] {
+        &self.table_ids_already_exists
+    }
+
+    fn is_all_tables_not_exists(&self) -> bool {
+        self.table_ids_already_exists.iter().all(Option::is_none)
+    }
+
+    pub fn real_table_ids(&self) -> Vec<TableId> {
+        self.table_ids_from_tasks
+            .iter()
+            .zip(self.table_ids_already_exists.iter())
+            .map(|(table_id_from_task, table_id_already_exists)| {
+                table_id_already_exists.unwrap_or(*table_id_from_task)
+            })
+            .collect::<Vec<_>>()
+    }
+
+    fn all_create_table_exprs(&self) -> Vec<&CreateTableExpr> {
+        self.tasks
+            .iter()
+            .map(|task| &task.create_table)
+            .collect::<Vec<_>>()
+    }
+
+    fn all_tables_data(&self) -> Vec<(RawTableInfo, TableRouteValue)> {
+        self.tasks
+            .iter()
+            .map(|task| {
+                let table_info = task.table_info.clone();
+                let region_ids = self
+                    .physical_region_numbers
+                    .iter()
+                    .map(|region_number| RegionId::new(table_info.ident.table_id, *region_number))
+                    .collect();
+                let table_route = TableRouteValue::logical(self.physical_table_id, region_ids);
+                (table_info, table_route)
+            })
+            .collect::<Vec<_>>()
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr)]
+pub enum CreateTablesState {
+    /// Prepares to create the tables
+    Prepare,
+    /// Creates regions on the Datanode
+    DatanodeCreateRegions,
+    /// Creates metadata
+    CreateMetadata,
+}
diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs
index 848e9f2d8eed..c1c45c30911f 100644
--- a/src/common/meta/src/ddl/create_table.rs
+++ b/src/common/meta/src/ddl/create_table.rs
@@ -15,10 +15,7 @@
 use std::collections::HashMap;
 
 use api::v1::region::region_request::Body as PbRegionRequest;
-use api::v1::region::{
-    CreateRequest as PbCreateRegionRequest, RegionColumnDef, RegionRequest, RegionRequestHeader,
-};
-use api::v1::{ColumnDef, SemanticType};
+use api::v1::region::{RegionRequest, RegionRequestHeader};
 use async_trait::async_trait;
 use common_error::ext::BoxedError;
 use common_procedure::error::{
@@ -30,25 +27,24 @@ use common_telemetry::tracing_context::TracingContext;
 use futures::future::join_all;
 use serde::{Deserialize, Serialize};
 use snafu::{ensure, OptionExt, ResultExt};
-use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
 use store_api::storage::{RegionId, RegionNumber};
 use strum::AsRefStr;
 use table::metadata::{RawTableInfo, TableId};
 use table::table_reference::TableReference;
 
+use crate::ddl::create_table_template::{build_template, CreateRequestBuilder};
 use crate::ddl::utils::{handle_operate_region_error, handle_retry_error, region_storage_path};
 use crate::ddl::DdlContext;
 use crate::error::{self, Result, TableRouteNotFoundSnafu};
 use crate::key::table_name::TableNameKey;
 use crate::key::table_route::TableRouteValue;
 use crate::lock_key::TableNameLock;
-use crate::metrics;
 use crate::region_keeper::OperatingRegionGuard;
 use crate::rpc::ddl::CreateTableTask;
 use crate::rpc::router::{
     find_leader_regions, find_leaders, operating_leader_regions, RegionRoute,
 };
-use crate::wal_options_allocator::prepare_wal_options;
+use crate::{metrics, ClusterId};
 
 pub struct CreateTableProcedure {
     pub context: DdlContext,
@@ -59,7 +55,7 @@ impl CreateTableProcedure {
     pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateTable";
 
     pub fn new(
-        cluster_id: u64,
+        cluster_id: ClusterId,
         task: CreateTableTask,
         table_route: TableRouteValue,
         region_wal_options: HashMap<RegionNumber, String>,
@@ -117,7 +113,7 @@ impl CreateTableProcedure {
 
         if let Some(value) = table_name_value {
             ensure!(
-                self.creator.data.task.create_table.create_if_not_exists,
+                expr.create_if_not_exists,
                 error::TableAlreadyExistsSnafu {
                     table_name: self.creator.data.table_ref().to_string(),
                 }
@@ -137,67 +133,8 @@ impl CreateTableProcedure {
         physical_table_id: Option<TableId>,
     ) -> Result<CreateRequestBuilder> {
         let create_table_expr = &self.creator.data.task.create_table;
-
-        let column_defs = create_table_expr
-            .column_defs
-            .iter()
-            .enumerate()
-            .map(|(i, c)| {
-                let semantic_type = if create_table_expr.time_index == c.name {
-                    SemanticType::Timestamp
-                } else if create_table_expr.primary_keys.contains(&c.name) {
-                    SemanticType::Tag
-                } else {
-                    SemanticType::Field
-                };
-
-                RegionColumnDef {
-                    column_def: Some(ColumnDef {
-                        name: c.name.clone(),
-                        data_type: c.data_type,
-                        is_nullable: c.is_nullable,
-                        default_constraint: c.default_constraint.clone(),
-                        semantic_type: semantic_type as i32,
-                        comment: String::new(),
-                        datatype_extension: c.datatype_extension.clone(),
-                    }),
-                    column_id: i as u32,
-                }
-            })
-            .collect::<Vec<_>>();
-
-        let primary_key = create_table_expr
-            .primary_keys
-            .iter()
-            .map(|key| {
-                column_defs
-                    .iter()
-                    .find_map(|c| {
-                        c.column_def.as_ref().and_then(|x| {
-                            if &x.name == key {
-                                Some(c.column_id)
-                            } else {
-                                None
-                            }
-                        })
-                    })
-                    .context(error::PrimaryKeyNotFoundSnafu { key })
-            })
-            .collect::<Result<Vec<_>>>()?;
-
-        let template = PbCreateRegionRequest {
-            region_id: 0,
-            engine: create_table_expr.engine.to_string(),
-            column_defs,
-            primary_key,
-            path: String::new(),
-            options: create_table_expr.table_options.clone(),
-        };
-
-        Ok(CreateRequestBuilder {
-            template,
-            physical_table_id,
-        })
+        let template = build_template(create_table_expr)?;
+        Ok(CreateRequestBuilder::new(template, physical_table_id))
     }
 
     pub async fn on_datanode_create_regions(&mut self) -> Result<Status> {
@@ -261,9 +198,11 @@ impl CreateTableProcedure {
         let mut requests = Vec::with_capacity(regions.len());
         for region_number in regions {
             let region_id = RegionId::new(self.table_id(), region_number);
-            let create_region_request = request_builder
-                .build_one(region_id, storage_path.clone(), region_wal_options)
-                .await?;
+            let create_region_request = request_builder.build_one(
+                region_id,
+                storage_path.clone(),
+                region_wal_options,
+            )?;
             requests.push(PbRegionRequest::Create(create_region_request));
         }
 
@@ -426,7 +365,7 @@ pub struct CreateTableData {
     pub task: CreateTableTask,
     table_route: TableRouteValue,
     pub region_wal_options: HashMap<RegionNumber, String>,
-    pub cluster_id: u64,
+    pub cluster_id: ClusterId,
 }
 
 impl CreateTableData {
@@ -434,44 +373,3 @@ impl CreateTableData {
         self.task.table_ref()
     }
 }
-
-/// Builder for [PbCreateRegionRequest].
-pub struct CreateRequestBuilder {
-    template: PbCreateRegionRequest,
-    /// Optional. Only for metric engine.
-    physical_table_id: Option<TableId>,
-}
-
-impl CreateRequestBuilder {
-    pub fn template(&self) -> &PbCreateRegionRequest {
-        &self.template
-    }
-
-    async fn build_one(
-        &self,
-        region_id: RegionId,
-        storage_path: String,
-        region_wal_options: &HashMap<RegionNumber, String>,
-    ) -> Result<PbCreateRegionRequest> {
-        let mut request = self.template.clone();
-
-        request.region_id = region_id.as_u64();
-        request.path = storage_path;
-        // Stores the encoded wal options into the request options.
-        prepare_wal_options(&mut request.options, region_id, region_wal_options);
-
-        if let Some(physical_table_id) = self.physical_table_id {
-            // Logical table has the same region numbers with physical table, and they have a one-to-one mapping.
-            // For example, region 0 of logical table must resides with region 0 of physical table. So here we can
-            // simply concat the physical table id and the logical region number to get the physical region id.
-            let physical_region_id = RegionId::new(physical_table_id, region_id.region_number());
-
-            request.options.insert(
-                LOGICAL_TABLE_METADATA_KEY.to_string(),
-                physical_region_id.as_u64().to_string(),
-            );
-        }
-
-        Ok(request)
-    }
-}
diff --git a/src/common/meta/src/ddl/create_table_template.rs b/src/common/meta/src/ddl/create_table_template.rs
new file mode 100644
index 000000000000..ee9165269d14
--- /dev/null
+++ b/src/common/meta/src/ddl/create_table_template.rs
@@ -0,0 +1,134 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
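+//! Shared template builder for per-region `CreateRequest`s, used by both the
+//! single-table create procedure and the batched logical-tables procedure.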
+
+use std::collections::HashMap;
+
+use api::v1::region::{CreateRequest, RegionColumnDef};
+use api::v1::{ColumnDef, CreateTableExpr, SemanticType};
+use snafu::OptionExt;
+use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
+use store_api::storage::{RegionId, RegionNumber};
+use table::metadata::TableId;
+
+use crate::error;
+use crate::error::Result;
+use crate::wal_options_allocator::prepare_wal_options;
+
+pub(crate) fn build_template(create_table_expr: &CreateTableExpr) -> Result<CreateRequest> {
+    let column_defs = create_table_expr
+        .column_defs
+        .iter()
+        .enumerate()
+        .map(|(i, c)| {
+            let semantic_type = if create_table_expr.time_index == c.name {
+                SemanticType::Timestamp
+            } else if create_table_expr.primary_keys.contains(&c.name) {
+                SemanticType::Tag
+            } else {
+                SemanticType::Field
+            };
+
+            RegionColumnDef {
+                column_def: Some(ColumnDef {
+                    name: c.name.clone(),
+                    data_type: c.data_type,
+                    is_nullable: c.is_nullable,
+                    default_constraint: c.default_constraint.clone(),
+                    semantic_type: semantic_type as i32,
+                    comment: String::new(),
+                    datatype_extension: c.datatype_extension.clone(),
+                }),
+                column_id: i as u32,
+            }
+        })
+        .collect::<Vec<_>>();
+
+    let primary_key = create_table_expr
+        .primary_keys
+        .iter()
+        .map(|key| {
+            column_defs
+                .iter()
+                .find_map(|c| {
+                    c.column_def.as_ref().and_then(|x| {
+                        if &x.name == key {
+                            Some(c.column_id)
+                        } else {
+                            None
+                        }
+                    })
+                })
+                .context(error::PrimaryKeyNotFoundSnafu { key })
+        })
+        .collect::<Result<Vec<_>>>()?;
+
+    let template = CreateRequest {
+        region_id: 0,
+        engine: create_table_expr.engine.to_string(),
+        column_defs,
+        primary_key,
+        path: String::new(),
+        options: create_table_expr.table_options.clone(),
+    };
+
+    Ok(template)
+}
+
+/// Builder for [CreateRequest].
+pub struct CreateRequestBuilder {
+    template: CreateRequest,
+    /// Optional. Only for metric engine.
+    physical_table_id: Option<TableId>,
+}
+
+impl CreateRequestBuilder {
+    pub(crate) fn new(template: CreateRequest, physical_table_id: Option<TableId>) -> Self {
+        Self {
+            template,
+            physical_table_id,
+        }
+    }
+
+    pub fn template(&self) -> &CreateRequest {
+        &self.template
+    }
+
+    pub(crate) fn build_one(
+        &self,
+        region_id: RegionId,
+        storage_path: String,
+        region_wal_options: &HashMap<RegionNumber, String>,
+    ) -> Result<CreateRequest> {
+        let mut request = self.template.clone();
+
+        request.region_id = region_id.as_u64();
+        request.path = storage_path;
+        // Stores the encoded wal options into the request options.
+        prepare_wal_options(&mut request.options, region_id, region_wal_options);
+
+        if let Some(physical_table_id) = self.physical_table_id {
+            // A logical table has the same region numbers as its physical table, with a one-to-one mapping.
+            // For example, region 0 of a logical table must reside with region 0 of its physical table, so we can
+            // simply combine the physical table id and the logical region number to get the physical region id.
+            let physical_region_id = RegionId::new(physical_table_id, region_id.region_number());
+
+            request.options.insert(
+                LOGICAL_TABLE_METADATA_KEY.to_string(),
+                physical_region_id.as_u64().to_string(),
+            );
+        }
+
+        Ok(request)
+    }
+}
diff --git a/src/common/meta/src/ddl/table_meta.rs b/src/common/meta/src/ddl/table_meta.rs
index ed256f85a585..5ade45c22f6b 100644
--- a/src/common/meta/src/ddl/table_meta.rs
+++ b/src/common/meta/src/ddl/table_meta.rs
@@ -197,6 +197,15 @@ impl TableMetadataAllocator {
             region_wal_options,
         })
     }
+
+    /// Allocates a table id for each task and stores it back into the task's table info.
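+    /// The allocated ids are provisional: `CreateLogicalTablesProcedure::on_prepare`
+    /// may later replace an id with that of an already existing table.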
+ pub async fn set_table_ids_on_logic_create(&self, tasks: &mut [CreateTableTask]) -> Result<()> { + for task in tasks { + let table_id = self.allocate_table_id(task).await?; + task.table_info.ident.table_id = table_id; + } + Ok(()) + } } pub type PeerAllocatorRef = Arc; diff --git a/src/common/meta/src/ddl/utils.rs b/src/common/meta/src/ddl/utils.rs index 63f669fcd20b..f93feb9b6778 100644 --- a/src/common/meta/src/ddl/utils.rs +++ b/src/common/meta/src/ddl/utils.rs @@ -12,21 +12,29 @@ // See the License for the specific language governing permissions and // limitations under the License. +use common_catalog::consts::METRIC_ENGINE; use common_error::ext::BoxedError; use common_procedure::error::Error as ProcedureError; -use snafu::{location, Location}; +use snafu::{ensure, location, Location, OptionExt}; +use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY; +use table::metadata::TableId; -use crate::error::{self, Error}; +use crate::error::{ + EmptyCreateTableTasksSnafu, Error, Result, TableNotFoundSnafu, UnsupportedSnafu, +}; +use crate::key::table_name::TableNameKey; +use crate::key::TableMetadataManagerRef; use crate::peer::Peer; +use crate::rpc::ddl::CreateTableTask; -pub fn handle_operate_region_error(datanode: Peer) -> impl FnOnce(crate::error::Error) -> Error { +pub fn handle_operate_region_error(datanode: Peer) -> impl FnOnce(Error) -> Error { move |err| { - if matches!(err, crate::error::Error::RetryLater { .. }) { - error::Error::RetryLater { + if matches!(err, Error::RetryLater { .. }) { + Error::RetryLater { source: BoxedError::new(err), } } else { - error::Error::OperateDatanode { + Error::OperateDatanode { location: location!(), peer: datanode, source: BoxedError::new(err), @@ -47,3 +55,58 @@ pub fn handle_retry_error(e: Error) -> ProcedureError { pub fn region_storage_path(catalog: &str, schema: &str) -> String { format!("{}/{}", catalog, schema) } + +pub async fn check_and_get_physical_table_id( + table_metadata_manager: &TableMetadataManagerRef, + tasks: &[CreateTableTask], +) -> Result { + let mut physical_table_name = None; + for task in tasks { + ensure!( + task.create_table.engine == METRIC_ENGINE, + UnsupportedSnafu { + operation: format!("create table with engine {}", task.create_table.engine) + } + ); + let current_physical_table_name = task + .create_table + .table_options + .get(LOGICAL_TABLE_METADATA_KEY) + .context(UnsupportedSnafu { + operation: format!( + "create table without table options {}", + LOGICAL_TABLE_METADATA_KEY, + ), + })?; + let current_physical_table_name = TableNameKey::new( + &task.create_table.catalog_name, + &task.create_table.schema_name, + current_physical_table_name, + ); + + physical_table_name = match physical_table_name { + Some(name) => { + ensure!( + name == current_physical_table_name, + UnsupportedSnafu { + operation: format!( + "create table with different physical table name {} and {}", + name, current_physical_table_name + ) + } + ); + Some(name) + } + None => Some(current_physical_table_name), + }; + } + let physical_table_name = physical_table_name.context(EmptyCreateTableTasksSnafu)?; + table_metadata_manager + .table_name_manager() + .get(physical_table_name) + .await? 
+ .context(TableNotFoundSnafu { + table_name: physical_table_name.to_string(), + }) + .map(|table| table.table_id()) +} diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index d7c74ae51988..d741e109d97f 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -18,35 +18,41 @@ use std::sync::Arc; use common_procedure::{watcher, Output, ProcedureId, ProcedureManagerRef, ProcedureWithId}; use common_telemetry::tracing_context::{FutureExt, TracingContext}; use common_telemetry::{info, tracing}; -use snafu::{OptionExt, ResultExt}; +use snafu::{ensure, OptionExt, ResultExt}; use store_api::storage::{RegionNumber, TableId}; use crate::cache_invalidator::CacheInvalidatorRef; use crate::datanode_manager::DatanodeManagerRef; use crate::ddl::alter_table::AlterTableProcedure; +use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure; use crate::ddl::create_table::CreateTableProcedure; use crate::ddl::drop_table::DropTableProcedure; use crate::ddl::table_meta::TableMetadataAllocator; use crate::ddl::truncate_table::TruncateTableProcedure; use crate::ddl::{ - DdlContext, DdlTaskExecutor, ExecutorContext, TableMetadata, TableMetadataAllocatorContext, + utils, DdlContext, DdlTaskExecutor, ExecutorContext, TableMetadata, + TableMetadataAllocatorContext, }; use crate::error::{ - self, RegisterProcedureLoaderSnafu, Result, SubmitProcedureSnafu, TableNotFoundSnafu, - WaitProcedureSnafu, + self, EmptyCreateTableTasksSnafu, ProcedureOutputSnafu, RegisterProcedureLoaderSnafu, Result, + SubmitProcedureSnafu, TableNotFoundSnafu, WaitProcedureSnafu, }; use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; use crate::key::table_route::TableRouteValue; use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; use crate::region_keeper::MemoryRegionKeeperRef; -use crate::rpc::ddl::DdlTask::{AlterTable, CreateTable, DropTable, TruncateTable}; +use crate::rpc::ddl::DdlTask::{ + AlterLogicalTables, AlterTable, CreateLogicalTables, CreateTable, DropLogicalTables, DropTable, + TruncateTable, +}; use crate::rpc::ddl::{ AlterTableTask, CreateTableTask, DropTableTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask, }; use crate::rpc::router::RegionRoute; use crate::table_name::TableName; +use crate::ClusterId; pub type DdlManagerRef = Arc; @@ -114,6 +120,20 @@ impl DdlManager { let context = self.create_context(); + self.procedure_manager + .register_loader( + CreateLogicalTablesProcedure::TYPE_NAME, + Box::new(move |json| { + let context = context.clone(); + CreateLogicalTablesProcedure::from_json(json, context).map(|p| Box::new(p) as _) + }), + ) + .context(RegisterProcedureLoaderSnafu { + type_name: CreateLogicalTablesProcedure::TYPE_NAME, + })?; + + let context = self.create_context(); + self.procedure_manager .register_loader( DropTableProcedure::TYPE_NAME, @@ -159,7 +179,7 @@ impl DdlManager { /// Submits and executes an alter table task. pub async fn submit_alter_table_task( &self, - cluster_id: u64, + cluster_id: ClusterId, alter_table_task: AlterTableTask, table_info_value: DeserializedValueWithBytes, physical_table_info: Option<(TableId, TableName)>, @@ -183,7 +203,7 @@ impl DdlManager { /// Submits and executes a create table task. 
pub async fn submit_create_table_task( &self, - cluster_id: u64, + cluster_id: ClusterId, create_table_task: CreateTableTask, table_route: TableRouteValue, region_wal_options: HashMap, @@ -203,11 +223,33 @@ impl DdlManager { self.submit_procedure(procedure_with_id).await } + #[tracing::instrument(skip_all)] + /// Submits and executes a create table task. + pub async fn submit_create_logical_table_tasks( + &self, + cluster_id: ClusterId, + create_table_tasks: Vec, + physical_table_id: TableId, + ) -> Result<(ProcedureId, Option)> { + let context = self.create_context(); + + let procedure = CreateLogicalTablesProcedure::new( + cluster_id, + create_table_tasks, + physical_table_id, + context, + ); + + let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); + + self.submit_procedure(procedure_with_id).await + } + #[tracing::instrument(skip_all)] /// Submits and executes a drop table task. pub async fn submit_drop_table_task( &self, - cluster_id: u64, + cluster_id: ClusterId, drop_table_task: DropTableTask, table_info_value: DeserializedValueWithBytes, table_route_value: DeserializedValueWithBytes, @@ -231,7 +273,7 @@ impl DdlManager { /// Submits and executes a truncate table task. pub async fn submit_truncate_table_task( &self, - cluster_id: u64, + cluster_id: ClusterId, truncate_table_task: TruncateTableTask, table_info_value: DeserializedValueWithBytes, region_routes: Vec, @@ -272,7 +314,7 @@ impl DdlManager { async fn handle_truncate_table_task( ddl_manager: &DdlManager, - cluster_id: u64, + cluster_id: ClusterId, truncate_table_task: TruncateTableTask, ) -> Result { let table_id = truncate_table_task.table_id; @@ -310,7 +352,7 @@ async fn handle_truncate_table_task( async fn handle_alter_table_task( ddl_manager: &DdlManager, - cluster_id: u64, + cluster_id: ClusterId, alter_table_task: AlterTableTask, ) -> Result { let table_ref = alter_table_task.table_ref(); @@ -385,7 +427,7 @@ async fn handle_alter_table_task( async fn handle_drop_table_task( ddl_manager: &DdlManager, - cluster_id: u64, + cluster_id: ClusterId, drop_table_task: DropTableTask, ) -> Result { let table_id = drop_table_task.table_id; @@ -427,7 +469,7 @@ async fn handle_drop_table_task( async fn handle_create_table_task( ddl_manager: &DdlManager, - cluster_id: u64, + cluster_id: ClusterId, mut create_table_task: CreateTableTask, ) -> Result { let table_meta = ddl_manager @@ -454,14 +496,66 @@ async fn handle_create_table_task( region_wal_options, ) .await?; - let output = output.context(error::ProcedureOutputSnafu)?; - let table_id = *(output.downcast_ref::().unwrap()); + let procedure_id = id.to_string(); + let output = output.context(ProcedureOutputSnafu { + procedure_id: &procedure_id, + err_msg: "empty output", + })?; + let table_id = *(output.downcast_ref::().context(ProcedureOutputSnafu { + procedure_id: &procedure_id, + err_msg: "downcast to `u32`", + })?); info!("Table: {table_id} is created via procedure_id {id:?}"); Ok(SubmitDdlTaskResponse { - key: id.to_string().into(), + key: procedure_id.into(), table_id: Some(table_id), + ..Default::default() + }) +} + +async fn handle_create_logical_table_tasks( + ddl_manager: &DdlManager, + cluster_id: ClusterId, + mut create_table_tasks: Vec, +) -> Result { + ensure!(!create_table_tasks.is_empty(), EmptyCreateTableTasksSnafu); + let physical_table_id = utils::check_and_get_physical_table_id( + &ddl_manager.table_metadata_manager, + &create_table_tasks, + ) + .await?; + // Sets table_ids on create_table_tasks + ddl_manager + .table_metadata_allocator + 
.set_table_ids_on_logic_create(&mut create_table_tasks)
+        .await?;
+    let num_logical_tables = create_table_tasks.len();
+
+    let (id, output) = ddl_manager
+        .submit_create_logical_table_tasks(cluster_id, create_table_tasks, physical_table_id)
+        .await?;
+
+    info!("{num_logical_tables} logical tables on physical table: {physical_table_id:?} are created via procedure_id {id:?}");
+
+    let procedure_id = id.to_string();
+    let output = output.context(ProcedureOutputSnafu {
+        procedure_id: &procedure_id,
+        err_msg: "empty output",
+    })?;
+    let table_ids = output
+        .downcast_ref::<Vec<TableId>>()
+        .context(ProcedureOutputSnafu {
+            procedure_id: &procedure_id,
+            err_msg: "downcast to `Vec<TableId>`",
+        })?
+        .clone();
+
+    Ok(SubmitDdlTaskResponse {
+        key: procedure_id.into(),
+        table_ids,
+        ..Default::default()
+    })
+}
 
@@ -494,6 +588,11 @@ impl DdlTaskExecutor for DdlManager {
             TruncateTable(truncate_table_task) => {
                 handle_truncate_table_task(self, cluster_id, truncate_table_task).await
             }
+            CreateLogicalTables(create_table_tasks) => {
+                handle_create_logical_table_tasks(self, cluster_id, create_table_tasks).await
+            }
+            DropLogicalTables(_) => todo!(),
+            AlterLogicalTables(_) => todo!(),
         }
     }
     .trace(span)
diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs
index 246a09711d9a..9bddb5c927f8 100644
--- a/src/common/meta/src/error.rs
+++ b/src/common/meta/src/error.rs
@@ -112,15 +112,21 @@ pub enum Error {
         source: common_procedure::Error,
     },
 
+    #[snafu(display(
+        "Failed to get procedure output, procedure id: {procedure_id}, error: {err_msg}"
+    ))]
+    ProcedureOutput {
+        procedure_id: String,
+        err_msg: String,
+        location: Location,
+    },
+
     #[snafu(display("Failed to convert RawTableInfo into TableInfo"))]
     ConvertRawTableInfo {
         location: Location,
         source: datatypes::Error,
     },
 
-    #[snafu(display("Failed to get procedure output"))]
-    ProcedureOutput { location: Location },
-
     #[snafu(display("Primary key '{key}' not found when creating region request"))]
     PrimaryKeyNotFound { key: String, location: Location },
 
@@ -357,6 +363,9 @@ pub enum Error {
 
     #[snafu(display("Unexpected table route type: {}", err_msg))]
     UnexpectedLogicalRouteTable { location: Location, err_msg: String },
+
+    #[snafu(display("The tasks of create tables cannot be empty"))]
+    EmptyCreateTableTasks { location: Location },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -426,7 +435,7 @@ impl ErrorExt for Error {
             InvalidCatalogValue { source, .. } => source.status_code(),
             ConvertAlterTableRequest { source, .. } => source.status_code(),
 
-            InvalidNumTopics { .. } => StatusCode::InvalidArguments,
+            InvalidNumTopics { .. } | EmptyCreateTableTasks { .. } => StatusCode::InvalidArguments,
         }
     }
 
diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs
index 2e9a47a1e45e..702061c0b0a1 100644
--- a/src/common/meta/src/key.rs
+++ b/src/common/meta/src/key.rs
@@ -84,7 +84,7 @@ use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue};
 use self::table_route::{TableRouteManager, TableRouteValue};
 use crate::ddl::utils::region_storage_path;
 use crate::error::{self, Result, SerdeJsonSnafu};
-use crate::kv_backend::txn::Txn;
+use crate::kv_backend::txn::{Txn, TxnOpResponse};
 use crate::kv_backend::KvBackendRef;
 use crate::rpc::router::{region_distribution, RegionRoute, RegionStatus};
 use crate::DatanodeId;
@@ -457,6 +457,86 @@ impl TableMetadataManager {
         Ok(())
     }
 
+    /// Creates metadata for multiple logical tables and returns an error if different metadata already exists.
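+    /// The name, info, and route keys of every table are merged into a single
+    /// transaction, so creation is all-or-nothing across the whole batch.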
+    pub async fn create_logic_tables_metadata(
+        &self,
+        tables_data: Vec<(RawTableInfo, TableRouteValue)>,
+    ) -> Result<()> {
+        let len = tables_data.len();
+        let mut txns = Vec::with_capacity(3 * len);
+        struct OnFailure<F1, R1, F2, R2>
+        where
+            F1: FnOnce(&Vec<TxnOpResponse>) -> R1,
+            F2: FnOnce(&Vec<TxnOpResponse>) -> R2,
+        {
+            table_info_value: TableInfoValue,
+            on_create_table_info_failure: F1,
+            table_route_value: TableRouteValue,
+            on_create_table_route_failure: F2,
+        }
+        let mut on_failures = Vec::with_capacity(len);
+        for (mut table_info, table_route_value) in tables_data {
+            table_info.meta.region_numbers = table_route_value.region_numbers();
+            let table_id = table_info.ident.table_id;
+
+            // Creates the table name.
+            let table_name = TableNameKey::new(
+                &table_info.catalog_name,
+                &table_info.schema_name,
+                &table_info.name,
+            );
+            let create_table_name_txn = self
+                .table_name_manager()
+                .build_create_txn(&table_name, table_id)?;
+            txns.push(create_table_name_txn);
+
+            // Creates the table info.
+            let table_info_value = TableInfoValue::new(table_info);
+            let (create_table_info_txn, on_create_table_info_failure) =
+                self.table_info_manager()
+                    .build_create_txn(table_id, &table_info_value)?;
+            txns.push(create_table_info_txn);
+
+            let (create_table_route_txn, on_create_table_route_failure) = self
+                .table_route_manager()
+                .build_create_txn(table_id, &table_route_value)?;
+            txns.push(create_table_route_txn);
+
+            on_failures.push(OnFailure {
+                table_info_value,
+                on_create_table_info_failure,
+                table_route_value,
+                on_create_table_route_failure,
+            });
+        }
+
+        let txn = Txn::merge_all(txns);
+        let r = self.kv_backend.txn(txn).await?;
+
+        // Checks whether the metadata was already created.
+        if !r.succeeded {
+            for on_failure in on_failures {
+                let remote_table_info = (on_failure.on_create_table_info_failure)(&r.responses)?
+                    .context(error::UnexpectedSnafu {
+                        err_msg: "Reads the empty table info during the create table metadata",
+                    })?
+                    .into_inner();
+
+                let remote_table_route = (on_failure.on_create_table_route_failure)(&r.responses)?
+                    .context(error::UnexpectedSnafu {
+                        err_msg: "Reads the empty table route during the create table metadata",
+                    })?
+                    .into_inner();
+
+                let op_name = "the creating logical tables metadata";
+                ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
+                ensure_values!(remote_table_route, on_failure.table_route_value, op_name);
+            }
+        }
+
+        Ok(())
+    }
+
     /// Deletes metadata for table.
     /// The caller MUST ensure it has the exclusive access to `TableNameKey`.
     pub async fn delete_table_metadata(
@@ -907,6 +987,59 @@ mod tests {
         );
     }
 
+    #[tokio::test]
+    async fn test_create_logic_tables_metadata() {
+        let mem_kv = Arc::new(MemoryKvBackend::default());
+        let table_metadata_manager = TableMetadataManager::new(mem_kv);
+        let region_route = new_test_region_route();
+        let region_routes = vec![region_route.clone()];
+        let table_info: RawTableInfo =
+            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
+        let table_id = table_info.ident.table_id;
+        let table_route_value = TableRouteValue::physical(region_routes.clone());
+
+        let tables_data = vec![(table_info.clone(), table_route_value.clone())];
+        // Creates the metadata.
+        table_metadata_manager
+            .create_logic_tables_metadata(tables_data.clone())
+            .await
+            .unwrap();
+
+        // Creating the same metadata again is idempotent and should succeed.
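+        // The merged transaction fails its compare step here, but the on-failure
+        // callbacks read back identical values, so `ensure_values!` accepts the retry.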
+        assert!(table_metadata_manager
+            .create_logic_tables_metadata(tables_data)
+            .await
+            .is_ok());
+
+        let mut modified_region_routes = region_routes.clone();
+        modified_region_routes.push(new_region_route(2, 3));
+        let modified_table_route_value = TableRouteValue::physical(modified_region_routes.clone());
+        let modified_tables_data = vec![(table_info.clone(), modified_table_route_value)];
+        // If different remote metadata already exists, it should return an error.
+        assert!(table_metadata_manager
+            .create_logic_tables_metadata(modified_tables_data)
+            .await
+            .is_err());
+
+        let (remote_table_info, remote_table_route) = table_metadata_manager
+            .get_full_table_info(table_id)
+            .await
+            .unwrap();
+
+        assert_eq!(
+            remote_table_info.unwrap().into_inner().table_info,
+            table_info
+        );
+        assert_eq!(
+            remote_table_route
+                .unwrap()
+                .into_inner()
+                .region_routes()
+                .unwrap(),
+            &region_routes
+        );
+    }
+
     #[tokio::test]
     async fn test_delete_table_metadata() {
         let mem_kv = Arc::new(MemoryKvBackend::default());
diff --git a/src/common/meta/src/key/table_name.rs b/src/common/meta/src/key/table_name.rs
index e86561cbc986..eb8ca7a8c015 100644
--- a/src/common/meta/src/key/table_name.rs
+++ b/src/common/meta/src/key/table_name.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashMap;
 use std::sync::Arc;
 
 use futures_util::stream::BoxStream;
@@ -26,11 +27,11 @@ use crate::kv_backend::memory::MemoryKvBackend;
 use crate::kv_backend::txn::{Txn, TxnOp};
 use crate::kv_backend::KvBackendRef;
 use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
-use crate::rpc::store::RangeRequest;
+use crate::rpc::store::{BatchGetRequest, RangeRequest};
 use crate::rpc::KeyValue;
 use crate::table_name::TableName;
 
-#[derive(Debug, Clone, Copy)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub struct TableNameKey<'a> {
     pub catalog: &'a str,
     pub schema: &'a str,
@@ -220,6 +221,31 @@ impl TableNameManager {
             .transpose()
     }
 
+    pub async fn batch_get(
+        &self,
+        keys: Vec<TableNameKey<'_>>,
+    ) -> Result<Vec<Option<TableNameValue>>> {
+        let raw_keys = keys
+            .into_iter()
+            .map(|key| key.as_raw_key())
+            .collect::<Vec<_>>();
+        let req = BatchGetRequest::new().with_keys(raw_keys.clone());
+        let res = self.kv_backend.batch_get(req).await?;
+        let kvs = res
+            .kvs
+            .into_iter()
+            .map(|kv| (kv.key, kv.value))
+            .collect::<HashMap<_, _>>();
+        let mut array = vec![None; raw_keys.len()];
+        for (i, key) in raw_keys.into_iter().enumerate() {
+            let v = kvs.get(&key);
+            array[i] = v
+                .map(|v| TableNameValue::try_from_raw_value(v))
+                .transpose()?;
+        }
+        Ok(array)
+    }
+
     pub async fn exists(&self, key: TableNameKey<'_>) -> Result<bool> {
         let raw_key = key.as_raw_key();
         self.kv_backend.exists(&raw_key).await
diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs
index 9c9270f22d84..270707d945c2 100644
--- a/src/common/meta/src/key/table_route.rs
+++ b/src/common/meta/src/key/table_route.rs
@@ -64,6 +64,10 @@ impl TableRouteValue {
         Self::Physical(PhysicalTableRouteValue::new(region_routes))
     }
 
+    pub fn logical(physical_table_id: TableId, region_ids: Vec<RegionId>) -> Self {
+        Self::Logical(LogicalTableRouteValue::new(physical_table_id, region_ids))
+    }
+
     /// Returns a new version [TableRouteValue] with `region_routes`.
     pub fn update(&self, region_routes: Vec<RegionRoute>) -> Result<Self> {
         ensure!(
@@ -231,7 +235,7 @@ impl TableRouteManager {
     }
 
     /// Builds a create table route transaction. it expected the `__table_route/{table_id}` wasn't occupied.
- pub(crate) fn build_create_txn( + pub fn build_create_txn( &self, table_id: TableId, table_route_value: &TableRouteValue, diff --git a/src/common/meta/src/metrics.rs b/src/common/meta/src/metrics.rs index f6979eb198a8..4e810195bb2b 100644 --- a/src/common/meta/src/metrics.rs +++ b/src/common/meta/src/metrics.rs @@ -39,6 +39,12 @@ lazy_static! { &["step"] ) .unwrap(); + pub static ref METRIC_META_PROCEDURE_CREATE_TABLES: HistogramVec = register_histogram_vec!( + "greptime_meta_procedure_create_tables", + "meta procedure create tables", + &["step"] + ) + .unwrap(); pub static ref METRIC_META_PROCEDURE_DROP_TABLE: HistogramVec = register_histogram_vec!( "greptime_meta_procedure_drop_table", "meta procedure drop table", diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index 2cc7ff8a0c43..f4c455a122d8 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -16,8 +16,10 @@ use std::result; use api::v1::meta::submit_ddl_task_request::Task; use api::v1::meta::{ - AlterTableTask as PbAlterTableTask, CreateTableTask as PbCreateTableTask, - DropTableTask as PbDropTableTask, Partition, SubmitDdlTaskRequest as PbSubmitDdlTaskRequest, + AlterTableTask as PbAlterTableTask, AlterTableTasks as PbAlterTableTasks, + CreateTableTask as PbCreateTableTask, CreateTableTasks as PbCreateTableTasks, + DropTableTask as PbDropTableTask, DropTableTasks as PbDropTableTasks, Partition, + SubmitDdlTaskRequest as PbSubmitDdlTaskRequest, SubmitDdlTaskResponse as PbSubmitDdlTaskResponse, TruncateTableTask as PbTruncateTableTask, }; use api::v1::{AlterExpr, CreateTableExpr, DropTableExpr, TruncateTableExpr}; @@ -38,6 +40,9 @@ pub enum DdlTask { DropTable(DropTableTask), AlterTable(AlterTableTask), TruncateTable(TruncateTableTask), + CreateLogicalTables(Vec), + DropLogicalTables(Vec), + AlterLogicalTables(Vec), } impl DdlTask { @@ -49,6 +54,15 @@ impl DdlTask { DdlTask::CreateTable(CreateTableTask::new(expr, partitions, table_info)) } + pub fn new_create_logical_tables(table_data: Vec<(CreateTableExpr, RawTableInfo)>) -> Self { + DdlTask::CreateLogicalTables( + table_data + .into_iter() + .map(|(expr, table_info)| CreateTableTask::new(expr, Vec::new(), table_info)) + .collect(), + ) + } + pub fn new_drop_table( catalog: String, schema: String, @@ -96,6 +110,33 @@ impl TryFrom for DdlTask { Task::TruncateTableTask(truncate_table) => { Ok(DdlTask::TruncateTable(truncate_table.try_into()?)) } + Task::CreateTableTasks(create_tables) => { + let tasks = create_tables + .tasks + .into_iter() + .map(|task| task.try_into()) + .collect::>>()?; + + Ok(DdlTask::CreateLogicalTables(tasks)) + } + Task::DropTableTasks(drop_tables) => { + let tasks = drop_tables + .tasks + .into_iter() + .map(|task| task.try_into()) + .collect::>>()?; + + Ok(DdlTask::DropLogicalTables(tasks)) + } + Task::AlterTableTasks(alter_tables) => { + let tasks = alter_tables + .tasks + .into_iter() + .map(|task| task.try_into()) + .collect::>>()?; + + Ok(DdlTask::AlterLogicalTables(tasks)) + } } } } @@ -110,31 +151,34 @@ impl TryFrom for PbSubmitDdlTaskRequest { fn try_from(request: SubmitDdlTaskRequest) -> Result { let task = match request.task { - DdlTask::CreateTable(task) => Task::CreateTableTask(PbCreateTableTask { - table_info: serde_json::to_vec(&task.table_info).context(error::SerdeJsonSnafu)?, - create_table: Some(task.create_table), - partitions: task.partitions, - }), - DdlTask::DropTable(task) => Task::DropTableTask(PbDropTableTask { - drop_table: Some(DropTableExpr { - catalog_name: task.catalog, - 
schema_name: task.schema, - table_name: task.table, - table_id: Some(api::v1::TableId { id: task.table_id }), - drop_if_exists: task.drop_if_exists, - }), - }), - DdlTask::AlterTable(task) => Task::AlterTableTask(PbAlterTableTask { - alter_table: Some(task.alter_table), - }), - DdlTask::TruncateTable(task) => Task::TruncateTableTask(PbTruncateTableTask { - truncate_table: Some(TruncateTableExpr { - catalog_name: task.catalog, - schema_name: task.schema, - table_name: task.table, - table_id: Some(api::v1::TableId { id: task.table_id }), - }), - }), + DdlTask::CreateTable(task) => Task::CreateTableTask(task.try_into()?), + DdlTask::DropTable(task) => Task::DropTableTask(task.try_into()?), + DdlTask::AlterTable(task) => Task::AlterTableTask(task.try_into()?), + DdlTask::TruncateTable(task) => Task::TruncateTableTask(task.try_into()?), + DdlTask::CreateLogicalTables(tasks) => { + let tasks = tasks + .into_iter() + .map(|task| task.try_into()) + .collect::>>()?; + + Task::CreateTableTasks(PbCreateTableTasks { tasks }) + } + DdlTask::DropLogicalTables(tasks) => { + let tasks = tasks + .into_iter() + .map(|task| task.try_into()) + .collect::>>()?; + + Task::DropTableTasks(PbDropTableTasks { tasks }) + } + DdlTask::AlterLogicalTables(tasks) => { + let tasks = tasks + .into_iter() + .map(|task| task.try_into()) + .collect::>>()?; + + Task::AlterTableTasks(PbAlterTableTasks { tasks }) + } }; Ok(Self { @@ -147,7 +191,11 @@ impl TryFrom for PbSubmitDdlTaskRequest { #[derive(Debug, Default)] pub struct SubmitDdlTaskResponse { pub key: Vec, + // For create physical table + // TODO(jeremy): remove it? pub table_id: Option, + // For create multi logical tables + pub table_ids: Vec, } impl TryFrom for SubmitDdlTaskResponse { @@ -155,9 +203,11 @@ impl TryFrom for SubmitDdlTaskResponse { fn try_from(resp: PbSubmitDdlTaskResponse) -> Result { let table_id = resp.table_id.map(|t| t.id); + let table_ids = resp.table_ids.iter().map(|t| t.id).collect(); Ok(Self { key: resp.key, table_id, + table_ids, }) } } @@ -225,6 +275,22 @@ impl TryFrom for DropTableTask { } } +impl TryFrom for PbDropTableTask { + type Error = error::Error; + + fn try_from(task: DropTableTask) -> Result { + Ok(PbDropTableTask { + drop_table: Some(DropTableExpr { + catalog_name: task.catalog, + schema_name: task.schema, + table_name: task.table, + table_id: Some(api::v1::TableId { id: task.table_id }), + drop_if_exists: task.drop_if_exists, + }), + }) + } +} + #[derive(Debug, PartialEq, Clone)] pub struct CreateTableTask { pub create_table: CreateTableExpr, @@ -248,6 +314,18 @@ impl TryFrom for CreateTableTask { } } +impl TryFrom for PbCreateTableTask { + type Error = error::Error; + + fn try_from(task: CreateTableTask) -> Result { + Ok(PbCreateTableTask { + table_info: serde_json::to_vec(&task.table_info).context(error::SerdeJsonSnafu)?, + create_table: Some(task.create_table), + partitions: task.partitions, + }) + } +} + impl CreateTableTask { pub fn new( expr: CreateTableExpr, @@ -357,6 +435,16 @@ impl TryFrom for AlterTableTask { } } +impl TryFrom for PbAlterTableTask { + type Error = error::Error; + + fn try_from(task: AlterTableTask) -> Result { + Ok(PbAlterTableTask { + alter_table: Some(task.alter_table), + }) + } +} + impl Serialize for AlterTableTask { fn serialize(&self, serializer: S) -> result::Result where @@ -438,6 +526,21 @@ impl TryFrom for TruncateTableTask { } } +impl TryFrom for PbTruncateTableTask { + type Error = error::Error; + + fn try_from(task: TruncateTableTask) -> Result { + Ok(PbTruncateTableTask { + 
truncate_table: Some(TruncateTableExpr { + catalog_name: task.catalog, + schema_name: task.schema, + table_name: task.table, + table_id: Some(api::v1::TableId { id: task.table_id }), + }), + }) + } +} + #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/src/common/meta/src/rpc/store.rs b/src/common/meta/src/rpc/store.rs index e4873945ab59..3156a8b29639 100644 --- a/src/common/meta/src/rpc/store.rs +++ b/src/common/meta/src/rpc/store.rs @@ -335,6 +335,12 @@ impl BatchGetRequest { Self { keys: vec![] } } + #[inline] + pub fn with_keys(mut self, keys: Vec>) -> Self { + self.keys = keys; + self + } + #[inline] pub fn add_key(mut self, key: impl Into>) -> Self { self.keys.push(key.into()); diff --git a/src/meta-srv/src/procedure/tests.rs b/src/meta-srv/src/procedure/tests.rs index 8c69faf2d2e1..827c8e866271 100644 --- a/src/meta-srv/src/procedure/tests.rs +++ b/src/meta-srv/src/procedure/tests.rs @@ -27,6 +27,7 @@ use client::client_manager::DatanodeClients; use common_catalog::consts::MITO2_ENGINE; use common_meta::datanode_manager::DatanodeManagerRef; use common_meta::ddl::alter_table::AlterTableProcedure; +use common_meta::ddl::create_logical_tables::{CreateLogicalTablesProcedure, CreateTablesState}; use common_meta::ddl::create_table::*; use common_meta::ddl::drop_table::DropTableProcedure; use common_meta::key::table_info::TableInfoValue; @@ -40,11 +41,11 @@ use store_api::storage::RegionId; use crate::procedure::utils::mock::EchoRegionServer; use crate::procedure::utils::test_data; -fn create_table_task() -> CreateTableTask { +fn create_table_task(table_name: Option<&str>) -> CreateTableTask { let create_table_expr = CreateTableExpr { catalog_name: "my_catalog".to_string(), schema_name: "my_schema".to_string(), - table_name: "my_table".to_string(), + table_name: table_name.unwrap_or("my_table").to_string(), desc: "blabla".to_string(), column_defs: vec![ PbColumnDef { @@ -99,7 +100,7 @@ fn create_table_task() -> CreateTableTask { fn test_region_request_builder() { let procedure = CreateTableProcedure::new( 1, - create_table_task(), + create_table_task(None), TableRouteValue::physical(test_data::new_region_routes()), HashMap::default(), test_data::new_ddl_context(Arc::new(DatanodeClients::default())), @@ -190,7 +191,7 @@ async fn test_on_datanode_create_regions() { let mut procedure = CreateTableProcedure::new( 1, - create_table_task(), + create_table_task(None), TableRouteValue::physical(region_routes), HashMap::default(), test_data::new_ddl_context(datanode_manager), @@ -230,6 +231,74 @@ async fn test_on_datanode_create_regions() { assert!(expected_created_regions.lock().unwrap().is_empty()); } +#[tokio::test] +async fn test_on_datanode_create_logical_regions() { + let (region_server, mut rx) = EchoRegionServer::new(); + let region_routes = test_data::new_region_routes(); + let datanode_manager = new_datanode_manager(®ion_server, ®ion_routes).await; + let physical_table_route = TableRouteValue::physical(region_routes); + let physical_table_id = 111; + + let task1 = create_table_task(Some("my_table1")); + let task2 = create_table_task(Some("my_table2")); + let task3 = create_table_task(Some("my_table3")); + + let ctx = test_data::new_ddl_context(datanode_manager); + let kv_backend = ctx.table_metadata_manager.kv_backend(); + let physical_route_txn = ctx + .table_metadata_manager + .table_route_manager() + .build_create_txn(physical_table_id, &physical_table_route) + .unwrap() + .0; + let _ = kv_backend.txn(physical_route_txn).await.unwrap(); + let mut procedure = + 
CreateLogicalTablesProcedure::new(1, vec![task1, task2, task3], physical_table_id, ctx); + + let expected_created_regions = Arc::new(Mutex::new(HashMap::from([(1, 3), (2, 3), (3, 3)]))); + + let handle = tokio::spawn({ + let expected_created_regions = expected_created_regions.clone(); + let mut max_recv = expected_created_regions.lock().unwrap().len() * 3; + async move { + while let Some(PbRegionRequest::Creates(requests)) = rx.recv().await { + for request in requests.requests { + let region_number = RegionId::from_u64(request.region_id).region_number(); + + let mut map = expected_created_regions.lock().unwrap(); + let v = map.get_mut(®ion_number).unwrap(); + *v -= 1; + if *v == 0 { + map.remove(®ion_number); + } + + max_recv -= 1; + if max_recv == 0 { + break; + } + } + if max_recv == 0 { + break; + } + } + } + }); + + let status = procedure.on_datanode_create_regions().await.unwrap(); + assert!(matches!(status, Status::Executing { persist: false })); + assert!(matches!( + procedure.creator.data.state(), + &CreateTablesState::CreateMetadata + )); + + handle.await.unwrap(); + + assert!(expected_created_regions.lock().unwrap().is_empty()); + + let status = procedure.on_create_metadata().await.unwrap(); + assert!(status.is_done()); +} + #[tokio::test] async fn test_on_datanode_drop_regions() { let drop_table_task = DropTableTask { diff --git a/src/operator/src/error.rs b/src/operator/src/error.rs index e9b8501b5631..0a3eb5c71d90 100644 --- a/src/operator/src/error.rs +++ b/src/operator/src/error.rs @@ -492,6 +492,27 @@ pub enum Error { table_name: String, location: Location, }, + + #[snafu(display( + "Do not support creating tables in multiple catalogs: {}", + catalog_names + ))] + CreateTableWithMultiCatalogs { + catalog_names: String, + location: Location, + }, + + #[snafu(display("Do not support creating tables in multiple schemas: {}", schema_names))] + CreateTableWithMultiSchemas { + schema_names: String, + location: Location, + }, + + #[snafu(display("Empty creating table expr"))] + EmptyCreateTableExpr { location: Location }, + + #[snafu(display("Failed to create logical tables: {}", reason))] + CreateLogicalTables { reason: String, location: Location }, } pub type Result = std::result::Result; @@ -608,6 +629,12 @@ impl ErrorExt for Error { } Error::ColumnDefaultValue { source, .. } => source.status_code(), + + Error::CreateTableWithMultiCatalogs { .. } + | Error::CreateTableWithMultiSchemas { .. } + | Error::EmptyCreateTableExpr { .. } => StatusCode::InvalidArguments, + + Error::CreateLogicalTables { .. } => StatusCode::Unexpected, } } diff --git a/src/operator/src/metrics.rs b/src/operator/src/metrics.rs index 52d0d39d4d50..4069332b2c6a 100644 --- a/src/operator/src/metrics.rs +++ b/src/operator/src/metrics.rs @@ -21,6 +21,11 @@ lazy_static! { "table operator create table" ) .unwrap(); + pub static ref DIST_CREATE_TABLES: Histogram = register_histogram!( + "greptime_table_operator_create_tables", + "table operator create table" + ) + .unwrap(); pub static ref DIST_INGEST_ROW_COUNT: IntCounter = register_int_counter!( "greptime_table_operator_ingest_rows", "table operator ingest rows" diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index acca55af7d21..0827d44dbd9d 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
diff --git a/src/operator/src/error.rs b/src/operator/src/error.rs
index e9b8501b5631..0a3eb5c71d90 100644
--- a/src/operator/src/error.rs
+++ b/src/operator/src/error.rs
@@ -492,6 +492,27 @@ pub enum Error {
         table_name: String,
         location: Location,
     },
+
+    #[snafu(display(
+        "Do not support creating tables in multiple catalogs: {}",
+        catalog_names
+    ))]
+    CreateTableWithMultiCatalogs {
+        catalog_names: String,
+        location: Location,
+    },
+
+    #[snafu(display("Do not support creating tables in multiple schemas: {}", schema_names))]
+    CreateTableWithMultiSchemas {
+        schema_names: String,
+        location: Location,
+    },
+
+    #[snafu(display("Empty creating table expr"))]
+    EmptyCreateTableExpr { location: Location },
+
+    #[snafu(display("Failed to create logical tables: {}", reason))]
+    CreateLogicalTables { reason: String, location: Location },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -608,6 +629,12 @@ impl ErrorExt for Error {
             }
 
             Error::ColumnDefaultValue { source, .. } => source.status_code(),
+
+            Error::CreateTableWithMultiCatalogs { .. }
+            | Error::CreateTableWithMultiSchemas { .. }
+            | Error::EmptyCreateTableExpr { .. } => StatusCode::InvalidArguments,
+
+            Error::CreateLogicalTables { .. } => StatusCode::Unexpected,
         }
     }
diff --git a/src/operator/src/metrics.rs b/src/operator/src/metrics.rs
index 52d0d39d4d50..4069332b2c6a 100644
--- a/src/operator/src/metrics.rs
+++ b/src/operator/src/metrics.rs
@@ -21,6 +21,11 @@ lazy_static! {
         "table operator create table"
     )
     .unwrap();
+    pub static ref DIST_CREATE_TABLES: Histogram = register_histogram!(
+        "greptime_table_operator_create_tables",
+        "table operator create tables"
+    )
+    .unwrap();
     pub static ref DIST_INGEST_ROW_COUNT: IntCounter = register_int_counter!(
         "greptime_table_operator_ingest_rows",
         "table operator ingest rows"
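Note: `DIST_CREATE_TABLES` follows the usual prometheus-crate timer pattern. A minimal sketch, using an illustrative metric name rather than the one registered above:

    use prometheus::register_histogram;

    fn main() {
        let h = register_histogram!("demo_create_tables_seconds", "demo help text").unwrap();
        let timer = h.start_timer();
        // ... do the timed work here ...
        timer.observe_duration(); // records the elapsed seconds into the histogram
    }

Dropping a `HistogramTimer` also records the observation, which is why the operator code below can simply bind it to `_timer` and let the end of scope stop the measurement.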
diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs
index acca55af7d21..0827d44dbd9d 100644
--- a/src/operator/src/statement/ddl.rs
+++ b/src/operator/src/statement/ddl.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 
 use api::helper::ColumnDataTypeWrapper;
@@ -50,8 +50,10 @@ use table::TableRef;
 use super::StatementExecutor;
 use crate::error::{
     self, AlterExprToRequestSnafu, CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu,
-    DeserializePartitionSnafu, InvalidPartitionColumnsSnafu, InvalidTableNameSnafu, ParseSqlSnafu,
-    Result, SchemaNotFoundSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
+    CreateLogicalTablesSnafu, CreateTableInfoSnafu, CreateTableWithMultiCatalogsSnafu,
+    CreateTableWithMultiSchemasSnafu, DeserializePartitionSnafu, EmptyCreateTableExprSnafu,
+    InvalidPartitionColumnsSnafu, InvalidTableNameSnafu, ParseSqlSnafu, Result,
+    SchemaNotFoundSnafu, TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
     UnrecognizedTableOptionSnafu,
 };
 use crate::expr_factory;
@@ -113,12 +115,12 @@ impl StatementExecutor {
             &create_table.table_name,
         )
         .await
-        .context(error::CatalogSnafu)?
+        .context(CatalogSnafu)?
         {
             return if create_table.create_if_not_exists {
                 Ok(table)
             } else {
-                error::TableAlreadyExistsSnafu {
+                TableAlreadyExistsSnafu {
                     table: format_full_table_name(
                         &create_table.catalog_name,
                         &create_table.schema_name,
@@ -149,7 +151,7 @@ impl StatementExecutor {
         let mut table_info = create_table_info(create_table, partition_cols, schema_opts)?;
 
         let resp = self
-            .create_table_procedure(create_table, partitions, table_info.clone())
+            .create_table_procedure(create_table.clone(), partitions, table_info.clone())
             .await?;
 
         let table_id = resp.table_id.context(error::UnexpectedSnafu {
@@ -167,6 +169,99 @@ impl StatementExecutor {
         Ok(table)
     }
 
+    #[tracing::instrument(skip_all)]
+    pub async fn create_logical_tables(
+        &self,
+        create_table_exprs: &[CreateTableExpr],
+    ) -> Result<Vec<TableRef>> {
+        let _timer = crate::metrics::DIST_CREATE_TABLES.start_timer();
+        ensure!(!create_table_exprs.is_empty(), EmptyCreateTableExprSnafu);
+        ensure!(
+            create_table_exprs
+                .windows(2)
+                .all(|expr| expr[0].catalog_name == expr[1].catalog_name),
+            CreateTableWithMultiCatalogsSnafu {
+                catalog_names: create_table_exprs
+                    .iter()
+                    .map(|x| x.catalog_name.as_str())
+                    .collect::<HashSet<_>>()
+                    .into_iter()
+                    .collect::<Vec<_>>()
+                    .join(",")
+                    .to_string()
+            }
+        );
+        let catalog_name = create_table_exprs[0].catalog_name.to_string();
+
+        ensure!(
+            create_table_exprs
+                .windows(2)
+                .all(|expr| expr[0].schema_name == expr[1].schema_name),
+            CreateTableWithMultiSchemasSnafu {
+                schema_names: create_table_exprs
+                    .iter()
+                    .map(|x| x.schema_name.as_str())
+                    .collect::<HashSet<_>>()
+                    .into_iter()
+                    .collect::<Vec<_>>()
+                    .join(",")
+                    .to_string()
+            }
+        );
+        let schema_name = create_table_exprs[0].schema_name.to_string();
+
+        // Check table names
+        for create_table in create_table_exprs {
+            ensure!(
+                NAME_PATTERN_REG.is_match(&create_table.table_name),
+                InvalidTableNameSnafu {
+                    table_name: create_table.table_name.clone(),
+                }
+            );
+        }
+
+        let schema = self
+            .table_metadata_manager
+            .schema_manager()
+            .get(SchemaNameKey::new(&catalog_name, &schema_name))
+            .await
+            .context(TableMetadataManagerSnafu)?
+            .context(SchemaNotFoundSnafu {
+                schema_info: &schema_name,
+            })?;
+
+        let mut raw_tables_info = create_table_exprs
+            .iter()
+            .map(|create| create_table_info(create, vec![], schema.clone()))
+            .collect::<Result<Vec<_>>>()?;
+        let tables_data = create_table_exprs
+            .iter()
+            .cloned()
+            .zip(raw_tables_info.iter().cloned())
+            .collect::<Vec<_>>();
+
+        let resp = self.create_logical_tables_procedure(tables_data).await?;
+
+        let table_ids = resp.table_ids;
+        ensure!(table_ids.len() == raw_tables_info.len(), CreateLogicalTablesSnafu {
+            reason: format!("The number of tables is inconsistent with the expected number to be created, expected: {}, actual: {}", raw_tables_info.len(), table_ids.len())
+        });
+        info!("Successfully created logical tables: {:?}", table_ids);
+
+        for (i, table_info) in raw_tables_info.iter_mut().enumerate() {
+            table_info.ident.table_id = table_ids[i];
+        }
+        let tables_info = raw_tables_info
+            .into_iter()
+            .map(|x| x.try_into().context(CreateTableInfoSnafu))
+            .collect::<Result<Vec<_>>>()?;
+
+        Ok(tables_info
+            .into_iter()
+            .map(|x| DistTable::table(Arc::new(x)))
+            .collect())
+    }
+
     #[tracing::instrument(skip_all)]
     pub async fn drop_table(&self, table_name: TableName, drop_if_exists: bool) -> Result<TableRef> {
         if let Some(table) = self
@@ -331,14 +426,28 @@ impl StatementExecutor {
 
     async fn create_table_procedure(
         &self,
-        create_table: &CreateTableExpr,
+        create_table: CreateTableExpr,
         partitions: Vec<Partition>,
         table_info: RawTableInfo,
     ) -> Result<SubmitDdlTaskResponse> {
         let partitions = partitions.into_iter().map(Into::into).collect();
 
         let request = SubmitDdlTaskRequest {
-            task: DdlTask::new_create_table(create_table.clone(), partitions, table_info),
+            task: DdlTask::new_create_table(create_table, partitions, table_info),
+        };
+
+        self.ddl_executor
+            .submit_ddl_task(&ExecutorContext::default(), request)
+            .await
+            .context(error::ExecuteDdlSnafu)
+    }
+
+    async fn create_logical_tables_procedure(
+        &self,
+        tables_data: Vec<(CreateTableExpr, RawTableInfo)>,
+    ) -> Result<SubmitDdlTaskResponse> {
+        let request = SubmitDdlTaskRequest {
+            task: DdlTask::new_create_logical_tables(tables_data),
         };
 
         self.ddl_executor
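Note: the `windows(2)` homogeneity checks above are a compact way to assert that one field is identical across a slice. A standalone sketch of the idiom:

    fn main() {
        // If every adjacent pair agrees, all elements agree.
        let catalogs = ["greptime", "greptime", "greptime"];
        assert!(catalogs.windows(2).all(|w| w[0] == w[1]));

        // Any mismatch breaks some adjacent pair.
        let mixed = ["greptime", "other"];
        assert!(!mixed.windows(2).all(|w| w[0] == w[1]));
    }

The check is vacuously true for a single element, and the empty case never reaches it because `create_logical_tables` rejects an empty expression list first.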
diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs
index f3bfc4def661..d41b885f9465 100644
--- a/src/store-api/src/region_request.rs
+++ b/src/store-api/src/region_request.rs
@@ -16,7 +16,11 @@ use std::collections::HashMap;
 use std::fmt::{self};
 
 use api::v1::add_column_location::LocationType;
-use api::v1::region::{alter_request, region_request, AlterRequest};
+use api::v1::region::{
+    alter_request, region_request, AlterRequest, AlterRequests, CloseRequest, CompactRequest,
+    CreateRequest, CreateRequests, DeleteRequests, DropRequest, DropRequests, FlushRequest,
+    InsertRequests, OpenRequest, TruncateRequest,
+};
 use api::v1::{self, Rows, SemanticType};
 use snafu::{ensure, OptionExt};
 use strum::IntoStaticStr;
@@ -69,82 +73,19 @@ impl RegionRequest {
     /// Inserts/Deletes request might become multiple requests. Others are one-to-one.
     pub fn try_from_request_body(body: region_request::Body) -> Result<Vec<(RegionId, Self)>> {
         match body {
-            region_request::Body::Inserts(inserts) => Ok(inserts
-                .requests
-                .into_iter()
-                .filter_map(|r| {
-                    let region_id = r.region_id.into();
-                    r.rows
-                        .map(|rows| (region_id, Self::Put(RegionPutRequest { rows })))
-                })
-                .collect()),
-            region_request::Body::Deletes(deletes) => Ok(deletes
-                .requests
-                .into_iter()
-                .filter_map(|r| {
-                    let region_id = r.region_id.into();
-                    r.rows
-                        .map(|rows| (region_id, Self::Delete(RegionDeleteRequest { rows })))
-                })
-                .collect()),
-            region_request::Body::Create(create) => {
-                let column_metadatas = create
-                    .column_defs
-                    .into_iter()
-                    .map(ColumnMetadata::try_from_column_def)
-                    .collect::<Result<Vec<_>>>()?;
-                let region_id = create.region_id.into();
-                let region_dir = region_dir(&create.path, region_id);
-                Ok(vec![(
-                    region_id,
-                    Self::Create(RegionCreateRequest {
-                        engine: create.engine,
-                        column_metadatas,
-                        primary_key: create.primary_key,
-                        options: create.options,
-                        region_dir,
-                    }),
-                )])
-            }
-            region_request::Body::Drop(drop) => Ok(vec![(
-                drop.region_id.into(),
-                Self::Drop(RegionDropRequest {}),
-            )]),
-            region_request::Body::Open(open) => {
-                let region_id = open.region_id.into();
-                let region_dir = region_dir(&open.path, region_id);
-                Ok(vec![(
-                    region_id,
-                    Self::Open(RegionOpenRequest {
-                        engine: open.engine,
-                        region_dir,
-                        options: open.options,
-                        skip_wal_replay: false,
-                    }),
-                )])
-            }
-            region_request::Body::Close(close) => Ok(vec![(
-                close.region_id.into(),
-                Self::Close(RegionCloseRequest {}),
-            )]),
-            region_request::Body::Alter(alter) => Ok(vec![(
-                alter.region_id.into(),
-                Self::Alter(RegionAlterRequest::try_from(alter)?),
-            )]),
-            region_request::Body::Flush(flush) => Ok(vec![(
-                flush.region_id.into(),
-                Self::Flush(RegionFlushRequest {
-                    row_group_size: None,
-                }),
-            )]),
-            region_request::Body::Compact(compact) => Ok(vec![(
-                compact.region_id.into(),
-                Self::Compact(RegionCompactRequest {}),
-            )]),
-            region_request::Body::Truncate(truncate) => Ok(vec![(
-                truncate.region_id.into(),
-                Self::Truncate(RegionTruncateRequest {}),
-            )]),
+            region_request::Body::Inserts(inserts) => make_region_puts(inserts),
+            region_request::Body::Deletes(deletes) => make_region_deletes(deletes),
+            region_request::Body::Create(create) => make_region_create(create),
+            region_request::Body::Drop(drop) => make_region_drop(drop),
+            region_request::Body::Open(open) => make_region_open(open),
+            region_request::Body::Close(close) => make_region_close(close),
+            region_request::Body::Alter(alter) => make_region_alter(alter),
+            region_request::Body::Flush(flush) => make_region_flush(flush),
+            region_request::Body::Compact(compact) => make_region_compact(compact),
+            region_request::Body::Truncate(truncate) => make_region_truncate(truncate),
+            region_request::Body::Creates(creates) => make_region_creates(creates),
+            region_request::Body::Drops(drops) => make_region_drops(drops),
+            region_request::Body::Alters(alters) => make_region_alters(alters),
         }
     }
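Note: the refactored dispatch keeps a simple contract: singular bodies map to exactly one `(RegionId, RegionRequest)` pair, while the new batched bodies (`Creates`, `Drops`, `Alters`) flatten to one pair per inner request. A toy model of that contract, with illustrative types only, not the real protobuf API:

    #[derive(Debug)]
    enum Body {
        Create(u64),
        Creates(Vec<u64>),
    }

    fn flatten(body: Body) -> Vec<(u64, &'static str)> {
        match body {
            Body::Create(id) => vec![(id, "create")],
            Body::Creates(ids) => {
                // Reuse the singular path per element, just as
                // make_region_creates below reuses make_region_create.
                let mut out = Vec::with_capacity(ids.len());
                for id in ids {
                    out.extend(flatten(Body::Create(id)));
                }
                out
            }
        }
    }

    fn main() {
        assert_eq!(flatten(Body::Creates(vec![1, 2, 3])).len(), 3);
    }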
@@ -154,6 +95,141 @@ impl RegionRequest {
     }
 }
 
+fn make_region_puts(inserts: InsertRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
+    let requests = inserts
+        .requests
+        .into_iter()
+        .filter_map(|r| {
+            let region_id = r.region_id.into();
+            r.rows
+                .map(|rows| (region_id, RegionRequest::Put(RegionPutRequest { rows })))
+        })
+        .collect();
+    Ok(requests)
+}
+
+fn make_region_deletes(deletes: DeleteRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
+    let requests = deletes
+        .requests
+        .into_iter()
+        .filter_map(|r| {
+            let region_id = r.region_id.into();
+            r.rows.map(|rows| {
+                (
+                    region_id,
+                    RegionRequest::Delete(RegionDeleteRequest { rows }),
+                )
+            })
+        })
+        .collect();
+    Ok(requests)
+}
+
+fn make_region_create(create: CreateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
+    let column_metadatas = create
+        .column_defs
+        .into_iter()
+        .map(ColumnMetadata::try_from_column_def)
+        .collect::<Result<Vec<_>>>()?;
+    let region_id = create.region_id.into();
+    let region_dir = region_dir(&create.path, region_id);
+    Ok(vec![(
+        region_id,
+        RegionRequest::Create(RegionCreateRequest {
+            engine: create.engine,
+            column_metadatas,
+            primary_key: create.primary_key,
+            options: create.options,
+            region_dir,
+        }),
+    )])
+}
+
+fn make_region_creates(creates: CreateRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
+    let mut requests = Vec::with_capacity(creates.requests.len());
+    for create in creates.requests {
+        requests.extend(make_region_create(create)?);
+    }
+    Ok(requests)
+}
+
+fn make_region_drop(drop: DropRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
+    let region_id = drop.region_id.into();
+    Ok(vec![(region_id, RegionRequest::Drop(RegionDropRequest {}))])
+}
+
+fn make_region_drops(drops: DropRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
+    let mut requests = Vec::with_capacity(drops.requests.len());
+    for drop in drops.requests {
+        requests.extend(make_region_drop(drop)?);
+    }
+    Ok(requests)
+}
+
+fn make_region_open(open: OpenRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
+    let region_id = open.region_id.into();
+    let region_dir = region_dir(&open.path, region_id);
+    Ok(vec![(
+        region_id,
+        RegionRequest::Open(RegionOpenRequest {
+            engine: open.engine,
+            region_dir,
+            options: open.options,
+            skip_wal_replay: false,
+        }),
+    )])
+}
+
+fn make_region_close(close: CloseRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
+    let region_id = close.region_id.into();
+    Ok(vec![(
+        region_id,
+        RegionRequest::Close(RegionCloseRequest {}),
+    )])
+}
+
+fn make_region_alter(alter: AlterRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
+    let region_id = alter.region_id.into();
+    Ok(vec![(
+        region_id,
+        RegionRequest::Alter(RegionAlterRequest::try_from(alter)?),
+    )])
+}
+
+fn make_region_alters(alters: AlterRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
+    let mut requests = Vec::with_capacity(alters.requests.len());
+    for alter in alters.requests {
+        requests.extend(make_region_alter(alter)?);
+    }
+    Ok(requests)
+}
+
+fn make_region_flush(flush: FlushRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
+    let region_id = flush.region_id.into();
+    Ok(vec![(
+        region_id,
+        RegionRequest::Flush(RegionFlushRequest {
+            row_group_size: None,
+        }),
+    )])
+}
+
+fn make_region_compact(compact: CompactRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
+    let region_id = compact.region_id.into();
+    Ok(vec![(
+        region_id,
+        RegionRequest::Compact(RegionCompactRequest {}),
+    )])
+}
+
+fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
+    let region_id = truncate.region_id.into();
+    Ok(vec![(
+        region_id,
+        RegionRequest::Truncate(RegionTruncateRequest {}),
+    )])
+}
+
 /// Request to put data into a region.
 #[derive(Debug)]
 pub struct RegionPutRequest {