diff --git a/Cargo.lock b/Cargo.lock index 24d33a60e098..36bca2a89a2c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3781,7 +3781,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=06f6297ff3cab578a1589741b504342fbad70453#06f6297ff3cab578a1589741b504342fbad70453" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=1bd2398b686e5ac6c1eef6daf615867ce27f75c1#1bd2398b686e5ac6c1eef6daf615867ce27f75c1" dependencies = [ "prost 0.12.3", "serde", diff --git a/Cargo.toml b/Cargo.toml index 290a459bfa2f..70899255cd67 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -104,7 +104,7 @@ etcd-client = "0.12" fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "06f6297ff3cab578a1589741b504342fbad70453" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "1bd2398b686e5ac6c1eef6daf615867ce27f75c1" } humantime = "2.1" humantime-serde = "1.1" itertools = "0.10" diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs index 4b9a89603cee..c763d3390828 100644 --- a/src/common/meta/src/ddl.rs +++ b/src/common/meta/src/ddl.rs @@ -30,6 +30,7 @@ use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, Procedu pub mod alter_logical_tables; pub mod alter_table; +pub mod create_database; pub mod create_logical_tables; pub mod create_table; mod create_table_template; diff --git a/src/common/meta/src/ddl/create_database.rs b/src/common/meta/src/ddl/create_database.rs new file mode 100644 index 000000000000..513a16ea76bf --- /dev/null +++ b/src/common/meta/src/ddl/create_database.rs @@ -0,0 +1,152 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use async_trait::async_trait; +use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu}; +use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status}; +use serde::{Deserialize, Serialize}; +use snafu::{ensure, ResultExt}; +use strum::AsRefStr; + +use crate::ddl::utils::handle_retry_error; +use crate::ddl::DdlContext; +use crate::error::{self, Result}; +use crate::key::schema_name::{SchemaNameKey, SchemaNameValue}; +use crate::lock_key::{CatalogLock, SchemaLock}; + +pub struct CreateDatabaseProcedure { + pub context: DdlContext, + pub data: CreateDatabaseData, +} + +impl CreateDatabaseProcedure { + pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateDatabase"; + + pub fn new( + catalog: String, + schema: String, + create_if_not_exists: bool, + options: Option>, + context: DdlContext, + ) -> Self { + Self { + context, + data: CreateDatabaseData { + state: CreateDatabaseState::Prepare, + catalog, + schema, + create_if_not_exists, + options, + }, + } + } + + pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult { + let data = serde_json::from_str(json).context(FromJsonSnafu)?; + + Ok(Self { context, data }) + } + + pub async fn on_prepare(&mut self) -> Result { + let exists = self + .context + .table_metadata_manager + .schema_manager() + .exists(SchemaNameKey::new(&self.data.catalog, &self.data.schema)) + .await?; + + if exists && self.data.create_if_not_exists { + return Ok(Status::done()); + } + + ensure!( + !exists, + error::SchemaAlreadyExistsSnafu { + catalog: &self.data.catalog, + schema: &self.data.schema, + } + ); + + self.data.state = CreateDatabaseState::CreateMetadata; + Ok(Status::executing(true)) + } + + pub async fn on_create_metadata(&mut self) -> Result { + let value: Option = self + .data + .options + .as_ref() + .map(|hash_map_ref| hash_map_ref.try_into()) + .transpose()?; + + self.context + .table_metadata_manager + .schema_manager() + .create( + SchemaNameKey::new(&self.data.catalog, &self.data.schema), + value, + self.data.create_if_not_exists, + ) + .await?; + + Ok(Status::done()) + } +} + +#[async_trait] +impl Procedure for CreateDatabaseProcedure { + fn type_name(&self) -> &str { + Self::TYPE_NAME + } + + async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { + let state = &self.data.state; + + match state { + CreateDatabaseState::Prepare => self.on_prepare().await, + CreateDatabaseState::CreateMetadata => self.on_create_metadata().await, + } + .map_err(handle_retry_error) + } + + fn dump(&self) -> ProcedureResult { + serde_json::to_string(&self.data).context(ToJsonSnafu) + } + + fn lock_key(&self) -> LockKey { + let lock_key = vec![ + CatalogLock::Read(&self.data.catalog).into(), + SchemaLock::write(&self.data.catalog, &self.data.schema).into(), + ]; + + LockKey::new(lock_key) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr)] +pub enum CreateDatabaseState { + Prepare, + CreateMetadata, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct CreateDatabaseData { + pub state: CreateDatabaseState, + pub catalog: String, + pub schema: String, + pub create_if_not_exists: bool, + pub options: Option>, +} diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 72beec194f09..82411ad79310 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -26,6 +26,7 @@ use crate::cache_invalidator::CacheInvalidatorRef; use crate::datanode_manager::DatanodeManagerRef; use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure; use crate::ddl::alter_table::AlterTableProcedure; +use crate::ddl::create_database::CreateDatabaseProcedure; use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure; use crate::ddl::create_table::CreateTableProcedure; use crate::ddl::drop_database::DropDatabaseProcedure; @@ -45,12 +46,12 @@ use crate::key::table_route::TableRouteValue; use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; use crate::region_keeper::MemoryRegionKeeperRef; use crate::rpc::ddl::DdlTask::{ - AlterLogicalTables, AlterTable, CreateLogicalTables, CreateTable, DropDatabase, + AlterLogicalTables, AlterTable, CreateDatabase, CreateLogicalTables, CreateTable, DropDatabase, DropLogicalTables, DropTable, TruncateTable, }; use crate::rpc::ddl::{ - AlterTableTask, CreateTableTask, DropDatabaseTask, DropTableTask, SubmitDdlTaskRequest, - SubmitDdlTaskResponse, TruncateTableTask, + AlterTableTask, CreateDatabaseTask, CreateTableTask, DropDatabaseTask, DropTableTask, + SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask, }; use crate::rpc::procedure; use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse}; @@ -170,6 +171,15 @@ impl DdlManager { }) }, ), + ( + CreateDatabaseProcedure::TYPE_NAME, + &|context: DdlContext| -> BoxedProcedureLoader { + Box::new(move |json: &str| { + let context = context.clone(); + CreateDatabaseProcedure::from_json(json, context).map(|p| Box::new(p) as _) + }) + }, + ), ( DropDatabaseProcedure::TYPE_NAME, &|context: DdlContext| -> BoxedProcedureLoader { @@ -293,6 +303,26 @@ impl DdlManager { self.submit_procedure(procedure_with_id).await } + #[tracing::instrument(skip_all)] + /// Submits and executes a create database task. + pub async fn submit_create_database( + &self, + _cluster_id: ClusterId, + CreateDatabaseTask { + catalog, + schema, + create_if_not_exists, + options, + }: CreateDatabaseTask, + ) -> Result<(ProcedureId, Option)> { + let context = self.create_context(); + let procedure = + CreateDatabaseProcedure::new(catalog, schema, create_if_not_exists, options, context); + let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); + + self.submit_procedure(procedure_with_id).await + } + #[tracing::instrument(skip_all)] /// Submits and executes a drop table task. pub async fn submit_drop_database( @@ -557,6 +587,27 @@ async fn handle_create_logical_table_tasks( }) } +async fn handle_create_database_task( + ddl_manager: &DdlManager, + cluster_id: ClusterId, + create_database_task: CreateDatabaseTask, +) -> Result { + let (id, _) = ddl_manager + .submit_create_database(cluster_id, create_database_task.clone()) + .await?; + + let procedure_id = id.to_string(); + info!( + "Database {}.{} is created via procedure_id {id:?}", + create_database_task.catalog, create_database_task.schema + ); + + Ok(SubmitDdlTaskResponse { + key: procedure_id.into(), + ..Default::default() + }) +} + async fn handle_drop_database_task( ddl_manager: &DdlManager, cluster_id: ClusterId, @@ -651,6 +702,9 @@ impl ProcedureExecutor for DdlManager { handle_alter_logical_table_tasks(self, cluster_id, alter_table_tasks).await } DropLogicalTables(_) => todo!(), + CreateDatabase(create_database_task) => { + handle_create_database_task(self, cluster_id, create_database_task).await + } DropDatabase(drop_database_task) => { handle_drop_database_task(self, cluster_id, drop_database_task).await } diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index bba6ee63d36b..9256dd7909a5 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -12,18 +12,22 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::result; use api::v1::meta::ddl_task_request::Task; use api::v1::meta::{ AlterTableTask as PbAlterTableTask, AlterTableTasks as PbAlterTableTasks, - CreateTableTask as PbCreateTableTask, CreateTableTasks as PbCreateTableTasks, - DdlTaskRequest as PbDdlTaskRequest, DdlTaskResponse as PbDdlTaskResponse, - DropDatabaseTask as PbDropDatabaseTask, DropTableTask as PbDropTableTask, - DropTableTasks as PbDropTableTasks, Partition, ProcedureId, + CreateDatabaseTask as PbCreateDatabaseTask, CreateTableTask as PbCreateTableTask, + CreateTableTasks as PbCreateTableTasks, DdlTaskRequest as PbDdlTaskRequest, + DdlTaskResponse as PbDdlTaskResponse, DropDatabaseTask as PbDropDatabaseTask, + DropTableTask as PbDropTableTask, DropTableTasks as PbDropTableTasks, Partition, ProcedureId, TruncateTableTask as PbTruncateTableTask, }; -use api::v1::{AlterExpr, CreateTableExpr, DropDatabaseExpr, DropTableExpr, TruncateTableExpr}; +use api::v1::{ + AlterExpr, CreateDatabaseExpr, CreateTableExpr, DropDatabaseExpr, DropTableExpr, + TruncateTableExpr, +}; use base64::engine::general_purpose; use base64::Engine as _; use prost::Message; @@ -44,6 +48,7 @@ pub enum DdlTask { CreateLogicalTables(Vec), DropLogicalTables(Vec), AlterLogicalTables(Vec), + CreateDatabase(CreateDatabaseTask), DropDatabase(DropDatabaseTask), } @@ -90,6 +95,20 @@ impl DdlTask { }) } + pub fn new_create_database( + catalog: String, + schema: String, + create_if_not_exists: bool, + options: Option>, + ) -> Self { + DdlTask::CreateDatabase(CreateDatabaseTask { + catalog, + schema, + create_if_not_exists, + options, + }) + } + pub fn new_drop_database(catalog: String, schema: String, drop_if_exists: bool) -> Self { DdlTask::DropDatabase(DropDatabaseTask { catalog, @@ -156,6 +175,9 @@ impl TryFrom for DdlTask { Ok(DdlTask::AlterLogicalTables(tasks)) } + Task::CreateDatabaseTask(create_database) => { + Ok(DdlTask::CreateDatabase(create_database.try_into()?)) + } Task::DropDatabaseTask(drop_database) => { Ok(DdlTask::DropDatabase(drop_database.try_into()?)) } @@ -201,6 +223,7 @@ impl TryFrom for PbDdlTaskRequest { Task::AlterTableTasks(PbAlterTableTasks { tasks }) } + DdlTask::CreateDatabase(task) => Task::CreateDatabaseTask(task.try_into()?), DdlTask::DropDatabase(task) => Task::DropDatabaseTask(task.try_into()?), }; @@ -588,6 +611,58 @@ impl TryFrom for PbTruncateTableTask { } } +#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] +pub struct CreateDatabaseTask { + pub catalog: String, + pub schema: String, + pub create_if_not_exists: bool, + pub options: Option>, +} + +impl TryFrom for CreateDatabaseTask { + type Error = error::Error; + + fn try_from(pb: PbCreateDatabaseTask) -> Result { + let CreateDatabaseExpr { + catalog_name, + database_name, + create_if_not_exists, + options, + } = pb.create_database.context(error::InvalidProtoMsgSnafu { + err_msg: "expected create database", + })?; + + Ok(CreateDatabaseTask { + catalog: catalog_name, + schema: database_name, + create_if_not_exists, + options: Some(options), + }) + } +} + +impl TryFrom for PbCreateDatabaseTask { + type Error = error::Error; + + fn try_from( + CreateDatabaseTask { + catalog, + schema, + create_if_not_exists, + options, + }: CreateDatabaseTask, + ) -> Result { + Ok(PbCreateDatabaseTask { + create_database: Some(CreateDatabaseExpr { + catalog_name: catalog, + database_name: schema, + create_if_not_exists, + options: options.unwrap_or_default(), + }), + }) + } +} + #[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] pub struct DropDatabaseTask { pub catalog: String, diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index 5db1d3b4012c..2ab87fa4513e 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -691,30 +691,41 @@ impl StatementExecutor { } ); - // TODO(weny): considers executing it in the procedures. - let schema_key = SchemaNameKey::new(catalog, database); - let exists = self - .table_metadata_manager - .schema_manager() - .exists(schema_key) + if !self + .catalog_manager + .schema_exists(catalog, database) .await - .context(TableMetadataManagerSnafu)?; + .context(CatalogSnafu)? + { + self.create_database_procedure( + catalog.to_string(), + database.to_string(), + create_if_not_exists, + ) + .await?; - if exists { - return if create_if_not_exists { - Ok(Output::new_with_affected_rows(1)) - } else { - error::SchemaExistsSnafu { name: database }.fail() - }; + Ok(Output::new_with_affected_rows(1)) + } else if create_if_not_exists { + Ok(Output::new_with_affected_rows(1)) + } else { + error::SchemaExistsSnafu { name: database }.fail() } + } - self.table_metadata_manager - .schema_manager() - .create(schema_key, None, false) - .await - .context(TableMetadataManagerSnafu)?; + async fn create_database_procedure( + &self, + catalog: String, + database: String, + create_if_not_exists: bool, + ) -> Result { + let request = SubmitDdlTaskRequest { + task: DdlTask::new_create_database(catalog, database, create_if_not_exists, None), + }; - Ok(Output::new_with_affected_rows(1)) + self.procedure_executor + .submit_ddl_task(&ExecutorContext::default(), request) + .await + .context(error::ExecuteDdlSnafu) } } diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs index 2d2a6294a9df..1aee8c9d37c8 100644 --- a/tests-integration/src/grpc.rs +++ b/tests-integration/src/grpc.rs @@ -73,6 +73,7 @@ mod test { async fn test_handle_ddl_request(instance: &Instance) { let request = Request::Ddl(DdlRequest { expr: Some(DdlExpr::CreateDatabase(CreateDatabaseExpr { + catalog_name: "greptime".to_string(), database_name: "database_created_through_grpc".to_string(), create_if_not_exists: true, options: Default::default(),