From bf14d339628a7bf43cf5bd8808f30fedf592f864 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Mon, 25 Mar 2024 15:12:47 +0900 Subject: [PATCH] feat: implement the drop database procedure (#3541) * refactor: remove Sync trait of Procedure * refactor: remove unnecessary async * feat: implement the drop database procedure * refactor: refactor DdlManager register_loaders * feat: register the DropDatabaseProcedureLoader * chore: fmt toml * feat: support to submit DropDatabaseTask * feat: support drop database stmt * fix: empty the tables stream * fix: ensure the factory always exists * test: update sqlness results * chore: correct comments * test: update sqlness results * test: update sqlness results * chore: apply suggestions from CR * chore: apply suggestions from CR --- Cargo.lock | 3 +- Cargo.toml | 2 +- src/catalog/src/kvbackend/manager.rs | 10 +- src/common/meta/Cargo.toml | 1 + src/common/meta/src/ddl.rs | 1 + src/common/meta/src/ddl/drop_database.rs | 171 ++++++++++++++++ .../meta/src/ddl/drop_database/cursor.rs | 141 +++++++++++++ src/common/meta/src/ddl/drop_database/end.rs | 35 ++++ .../meta/src/ddl/drop_database/executor.rs | 109 +++++++++++ .../meta/src/ddl/drop_database/metadata.rs | 43 ++++ .../meta/src/ddl/drop_database/start.rs | 65 ++++++ src/common/meta/src/ddl_manager.rs | 185 +++++++++++------- src/common/meta/src/error.rs | 13 +- src/common/meta/src/key.rs | 4 + src/common/meta/src/key/catalog_name.rs | 2 +- src/common/meta/src/key/schema_name.rs | 10 +- src/common/meta/src/key/table_name.rs | 2 +- src/common/meta/src/range_stream.rs | 2 + src/common/meta/src/rpc/ddl.rs | 69 ++++++- src/common/procedure/src/lib.rs | 6 +- src/common/procedure/src/local.rs | 8 +- src/common/procedure/src/local/runner.rs | 10 +- src/common/procedure/src/procedure.rs | 2 +- src/common/procedure/src/store.rs | 68 ++++--- src/meta-srv/src/service/admin/meta.rs | 9 +- src/operator/src/statement.rs | 13 +- src/operator/src/statement/ddl.rs | 44 +++++ .../standalone/common/catalog/schema.result | 2 +- .../common/create/create_database.result | 1 - .../common/show/show_databases_tables.result | 1 - .../common/system/information_schema.result | 21 +- 31 files changed, 903 insertions(+), 150 deletions(-) create mode 100644 src/common/meta/src/ddl/drop_database.rs create mode 100644 src/common/meta/src/ddl/drop_database/cursor.rs create mode 100644 src/common/meta/src/ddl/drop_database/end.rs create mode 100644 src/common/meta/src/ddl/drop_database/executor.rs create mode 100644 src/common/meta/src/ddl/drop_database/metadata.rs create mode 100644 src/common/meta/src/ddl/drop_database/start.rs diff --git a/Cargo.lock b/Cargo.lock index 9a1223486247..5d491ee3d11a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1993,6 +1993,7 @@ dependencies = [ "table", "tokio", "tonic 0.10.2", + "typetag", "uuid", ] @@ -3870,7 +3871,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=349cb385583697f41010dabeb3c106d58f9599b4#349cb385583697f41010dabeb3c106d58f9599b4" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=06f6297ff3cab578a1589741b504342fbad70453#06f6297ff3cab578a1589741b504342fbad70453" dependencies = [ "prost 0.12.3", "serde", diff --git a/Cargo.toml b/Cargo.toml index 509d7f84c8a2..350880e1bcc8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -103,7 +103,7 @@ etcd-client = "0.12" fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = 
{ git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "349cb385583697f41010dabeb3c106d58f9599b4" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "06f6297ff3cab578a1589741b504342fbad70453" } humantime-serde = "1.1" itertools = "0.10" lazy_static = "1.4" diff --git a/src/catalog/src/kvbackend/manager.rs b/src/catalog/src/kvbackend/manager.rs index 6a6038f1daea..327a08180975 100644 --- a/src/catalog/src/kvbackend/manager.rs +++ b/src/catalog/src/kvbackend/manager.rs @@ -138,8 +138,7 @@ impl CatalogManager for KvBackendCatalogManager { let stream = self .table_metadata_manager .catalog_manager() - .catalog_names() - .await; + .catalog_names(); let keys = stream .try_collect::>() @@ -154,8 +153,7 @@ impl CatalogManager for KvBackendCatalogManager { let stream = self .table_metadata_manager .schema_manager() - .schema_names(catalog) - .await; + .schema_names(catalog); let mut keys = stream .try_collect::>() .await @@ -171,8 +169,7 @@ impl CatalogManager for KvBackendCatalogManager { let stream = self .table_metadata_manager .table_name_manager() - .tables(catalog, schema) - .await; + .tables(catalog, schema); let mut tables = stream .try_collect::>() .await @@ -297,7 +294,6 @@ impl CatalogManager for KvBackendCatalogManager { .table_metadata_manager .table_name_manager() .tables(catalog, schema) - .await .map_ok(|(_, v)| v.table_id()); const BATCH_SIZE: usize = 128; let user_tables = try_stream!({ diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index fdab5eae0371..8892d602eada 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -53,6 +53,7 @@ strum.workspace = true table.workspace = true tokio.workspace = true tonic.workspace = true +typetag = "0.2" [dev-dependencies] chrono.workspace = true diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs index d5d790f95838..958d2fe1877f 100644 --- a/src/common/meta/src/ddl.rs +++ b/src/common/meta/src/ddl.rs @@ -32,6 +32,7 @@ pub mod alter_table; pub mod create_logical_tables; pub mod create_table; mod create_table_template; +pub mod drop_database; pub mod drop_table; pub mod table_meta; #[cfg(any(test, feature = "testing"))] diff --git a/src/common/meta/src/ddl/drop_database.rs b/src/common/meta/src/ddl/drop_database.rs new file mode 100644 index 000000000000..6454a403e327 --- /dev/null +++ b/src/common/meta/src/ddl/drop_database.rs @@ -0,0 +1,171 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
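Note: the new drop_database module below is organized as a persistent state machine. Each state implements a `State` trait whose `next()` yields the following boxed state plus a `Status`, and the procedure framework re-enters the machine until `Status::done()`. What follows is a minimal, self-contained sketch of that pattern, with simplified stand-in types (`Status`, `Context`, and the states here are not the GreptimeDB definitions), assuming the async-trait and tokio crates:

use async_trait::async_trait;

/// Simplified stand-in for `common_procedure::Status`.
enum Status {
    Executing { persist: bool },
    Done,
}

/// Simplified stand-in for `DropDatabaseContext`.
#[derive(Default)]
struct Context {
    dropped_tables: usize,
}

/// Each step yields the next boxed state plus a status, mirroring
/// `State::next` in drop_database.rs.
#[async_trait]
trait State: Send + std::fmt::Debug {
    async fn next(&mut self, ctx: &mut Context) -> (Box<dyn State>, Status);
}

#[derive(Debug)]
struct Start;

#[derive(Debug)]
struct End;

#[async_trait]
impl State for Start {
    async fn next(&mut self, ctx: &mut Context) -> (Box<dyn State>, Status) {
        ctx.dropped_tables += 1; // pretend one table was dropped
        (Box::new(End), Status::Executing { persist: true })
    }
}

#[async_trait]
impl State for End {
    async fn next(&mut self, _ctx: &mut Context) -> (Box<dyn State>, Status) {
        (Box::new(End), Status::Done)
    }
}

#[tokio::main]
async fn main() {
    let mut ctx = Context::default();
    let mut state: Box<dyn State> = Box::new(Start);
    loop {
        let (next, status) = state.next(&mut ctx).await;
        state = next;
        match status {
            Status::Done => break,
            // The real framework persists the procedure when asked to.
            Status::Executing { persist: _ } => {}
        }
    }
    assert_eq!(ctx.dropped_tables, 1);
}

Persisting the boxed state between steps is what the `#[typetag::serde]` annotations in this file enable.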
+ +pub mod cursor; +pub mod end; +pub mod executor; +pub mod metadata; +pub mod start; +use std::fmt::Debug; + +use common_procedure::error::{Error as ProcedureError, FromJsonSnafu, ToJsonSnafu}; +use common_procedure::{ + Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status, +}; +use futures::stream::BoxStream; +use serde::{Deserialize, Serialize}; +use snafu::ResultExt; +use tonic::async_trait; + +use self::start::DropDatabaseStart; +use crate::ddl::DdlContext; +use crate::error::Result; +use crate::key::table_name::TableNameValue; +use crate::lock_key::{CatalogLock, SchemaLock}; + +pub struct DropDatabaseProcedure { + /// The context of procedure runtime. + runtime_context: DdlContext, + context: DropDatabaseContext, + + state: Box, +} + +/// Target of dropping tables. +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub enum DropTableTarget { + Logical, + Physical, +} + +/// Context of [DropDatabaseProcedure] execution. +pub struct DropDatabaseContext { + catalog: String, + schema: String, + drop_if_exists: bool, + tables: Option>>, +} + +#[async_trait::async_trait] +#[typetag::serde(tag = "drop_database_state")] +pub(crate) trait State: Send + Debug { + /// Yields the next [State] and [Status]. + async fn next( + &mut self, + ddl_ctx: &DdlContext, + ctx: &mut DropDatabaseContext, + ) -> Result<(Box, Status)>; +} + +impl DropDatabaseProcedure { + pub const TYPE_NAME: &'static str = "metasrv-procedure::DropDatabase"; + + pub fn new(catalog: String, schema: String, drop_if_exists: bool, context: DdlContext) -> Self { + Self { + runtime_context: context, + context: DropDatabaseContext { + catalog, + schema, + drop_if_exists, + tables: None, + }, + state: Box::new(DropDatabaseStart), + } + } + + pub fn from_json(json: &str, runtime_context: DdlContext) -> ProcedureResult { + let DropDatabaseOwnedData { + catalog, + schema, + drop_if_exists, + state, + } = serde_json::from_str(json).context(FromJsonSnafu)?; + + Ok(Self { + runtime_context, + context: DropDatabaseContext { + catalog, + schema, + drop_if_exists, + tables: None, + }, + state, + }) + } +} + +#[async_trait] +impl Procedure for DropDatabaseProcedure { + fn type_name(&self) -> &str { + Self::TYPE_NAME + } + + async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { + let state = &mut self.state; + + let (next, status) = state + .next(&self.runtime_context, &mut self.context) + .await + .map_err(|e| { + if e.is_retry_later() { + ProcedureError::retry_later(e) + } else { + ProcedureError::external(e) + } + })?; + + *state = next; + Ok(status) + } + + fn dump(&self) -> ProcedureResult { + let data = DropDatabaseData { + catalog: &self.context.catalog, + schema: &self.context.schema, + drop_if_exists: self.context.drop_if_exists, + state: self.state.as_ref(), + }; + + serde_json::to_string(&data).context(ToJsonSnafu) + } + + fn lock_key(&self) -> LockKey { + let lock_key = vec![ + CatalogLock::Read(&self.context.catalog).into(), + SchemaLock::write(&self.context.catalog, &self.context.schema).into(), + ]; + + LockKey::new(lock_key) + } +} + +#[derive(Debug, Serialize)] +struct DropDatabaseData<'a> { + // The catalog name + catalog: &'a str, + // The schema name + schema: &'a str, + drop_if_exists: bool, + state: &'a dyn State, +} + +#[derive(Debug, Deserialize)] +struct DropDatabaseOwnedData { + // The catalog name + catalog: String, + // The schema name + schema: String, + drop_if_exists: bool, + state: Box, +} diff --git a/src/common/meta/src/ddl/drop_database/cursor.rs 
b/src/common/meta/src/ddl/drop_database/cursor.rs new file mode 100644 index 000000000000..afc5b152afc0 --- /dev/null +++ b/src/common/meta/src/ddl/drop_database/cursor.rs @@ -0,0 +1,141 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_procedure::Status; +use futures::TryStreamExt; +use serde::{Deserialize, Serialize}; +use snafu::OptionExt; +use table::metadata::TableId; + +use super::executor::DropDatabaseExecutor; +use super::metadata::DropDatabaseRemoveMetadata; +use super::DropTableTarget; +use crate::ddl::drop_database::{DropDatabaseContext, State}; +use crate::ddl::DdlContext; +use crate::error::{self, Result}; +use crate::key::table_route::TableRouteValue; +use crate::key::DeserializedValueWithBytes; +use crate::table_name::TableName; + +#[derive(Debug, Serialize, Deserialize)] +pub struct DropDatabaseCursor { + target: DropTableTarget, +} + +impl DropDatabaseCursor { + /// Returns a new [DropDatabaseCursor]. + pub fn new(target: DropTableTarget) -> Self { + Self { target } + } + + fn handle_reach_end( + &mut self, + ctx: &mut DropDatabaseContext, + ) -> Result<(Box, Status)> { + match self.target { + DropTableTarget::Logical => { + // Consumes the tables stream. + ctx.tables.take(); + + Ok(( + Box::new(DropDatabaseCursor::new(DropTableTarget::Physical)), + Status::executing(true), + )) + } + DropTableTarget::Physical => Ok(( + Box::new(DropDatabaseRemoveMetadata), + Status::executing(true), + )), + } + } + + async fn handle_table( + &mut self, + ddl_ctx: &DdlContext, + ctx: &mut DropDatabaseContext, + table_name: String, + table_id: TableId, + table_route_value: DeserializedValueWithBytes, + ) -> Result<(Box, Status)> { + match (self.target, table_route_value.get_inner_ref()) { + (DropTableTarget::Logical, TableRouteValue::Logical(_)) + | (DropTableTarget::Physical, TableRouteValue::Physical(_)) => { + // TODO(weny): Maybe we can drop the table without fetching the `TableInfoValue` + let table_info_value = ddl_ctx + .table_metadata_manager + .table_info_manager() + .get(table_id) + .await? + .context(error::TableNotFoundSnafu { + table_name: &table_name, + })?; + Ok(( + Box::new(DropDatabaseExecutor::new( + TableName::new(&ctx.catalog, &ctx.schema, &table_name), + table_id, + table_info_value, + table_route_value, + self.target, + )), + Status::executing(true), + )) + } + _ => Ok(( + Box::new(DropDatabaseCursor::new(self.target)), + Status::executing(false), + )), + } + } +} + +#[async_trait::async_trait] +#[typetag::serde] +impl State for DropDatabaseCursor { + async fn next( + &mut self, + ddl_ctx: &DdlContext, + ctx: &mut DropDatabaseContext, + ) -> Result<(Box, Status)> { + if ctx.tables.as_deref().is_none() { + let tables = ddl_ctx + .table_metadata_manager + .table_name_manager() + .tables(&ctx.catalog, &ctx.schema); + ctx.tables = Some(tables); + } + // Safety: must exist + match ctx.tables.as_mut().unwrap().try_next().await? 
{ + Some((table_name, table_name_value)) => { + let table_id = table_name_value.table_id(); + match ddl_ctx + .table_metadata_manager + .table_route_manager() + .table_route_storage() + .get_raw(table_id) + .await? + { + Some(table_route_value) => { + self.handle_table(ddl_ctx, ctx, table_name, table_id, table_route_value) + .await + } + None => Ok(( + Box::new(DropDatabaseCursor::new(self.target)), + Status::executing(false), + )), + } + } + None => self.handle_reach_end(ctx), + } + } +} diff --git a/src/common/meta/src/ddl/drop_database/end.rs b/src/common/meta/src/ddl/drop_database/end.rs new file mode 100644 index 000000000000..39e5c1a1add2 --- /dev/null +++ b/src/common/meta/src/ddl/drop_database/end.rs @@ -0,0 +1,35 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_procedure::Status; +use serde::{Deserialize, Serialize}; + +use crate::ddl::drop_database::{DropDatabaseContext, State}; +use crate::ddl::DdlContext; +use crate::error::Result; + +#[derive(Debug, Serialize, Deserialize)] +pub struct DropDatabaseEnd; + +#[async_trait::async_trait] +#[typetag::serde] +impl State for DropDatabaseEnd { + async fn next( + &mut self, + _: &DdlContext, + _: &mut DropDatabaseContext, + ) -> Result<(Box, Status)> { + Ok((Box::new(DropDatabaseEnd), Status::done())) + } +} diff --git a/src/common/meta/src/ddl/drop_database/executor.rs b/src/common/meta/src/ddl/drop_database/executor.rs new file mode 100644 index 000000000000..096493b9ce43 --- /dev/null +++ b/src/common/meta/src/ddl/drop_database/executor.rs @@ -0,0 +1,109 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
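Note: each state above (`DropDatabaseCursor`, `DropDatabaseEnd`, and the rest) is annotated with `#[typetag::serde]`, which is what lets `DropDatabaseProcedure::dump` serialize a `Box<dyn State>` and `from_json` recover the concrete state on recovery. A minimal round-trip sketch of that mechanism, using a stand-in trait and assuming only the serde (with derive), serde_json, and typetag crates:

use serde::{Deserialize, Serialize};

#[typetag::serde(tag = "drop_database_state")]
trait State: std::fmt::Debug {}

#[derive(Debug, Serialize, Deserialize)]
struct DropDatabaseStart;

#[typetag::serde]
impl State for DropDatabaseStart {}

fn main() {
    let state: Box<dyn State> = Box::new(DropDatabaseStart);
    // The tag records the concrete type alongside its fields,
    // e.g. {"drop_database_state":"DropDatabaseStart"}.
    let json = serde_json::to_string(&state).unwrap();
    println!("{json}");
    // Deserializing the tagged JSON restores the matching concrete state.
    let restored: Box<dyn State> = serde_json::from_str(&json).unwrap();
    println!("{restored:?}");
}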
+ +use common_procedure::Status; +use common_telemetry::info; +use serde::{Deserialize, Serialize}; +use snafu::OptionExt; +use table::metadata::TableId; + +use super::cursor::DropDatabaseCursor; +use super::{DropDatabaseContext, DropTableTarget}; +use crate::ddl::drop_database::State; +use crate::ddl::drop_table::executor::DropTableExecutor; +use crate::ddl::DdlContext; +use crate::error::{self, Result}; +use crate::key::table_info::TableInfoValue; +use crate::key::table_route::TableRouteValue; +use crate::key::DeserializedValueWithBytes; +use crate::region_keeper::OperatingRegionGuard; +use crate::rpc::router::operating_leader_regions; +use crate::table_name::TableName; + +#[derive(Debug, Serialize, Deserialize)] +pub struct DropDatabaseExecutor { + table_name: TableName, + table_id: TableId, + table_info_value: DeserializedValueWithBytes, + table_route_value: DeserializedValueWithBytes, + target: DropTableTarget, + #[serde(skip)] + dropping_regions: Vec, +} + +impl DropDatabaseExecutor { + /// Returns a new [DropDatabaseExecutor]. + pub fn new( + table_name: TableName, + table_id: TableId, + table_info_value: DeserializedValueWithBytes, + table_route_value: DeserializedValueWithBytes, + target: DropTableTarget, + ) -> Self { + Self { + table_name, + table_id, + table_info_value, + table_route_value, + target, + dropping_regions: vec![], + } + } +} + +impl DropDatabaseExecutor { + fn register_dropping_regions(&mut self, ddl_ctx: &DdlContext) -> Result<()> { + let region_routes = self.table_route_value.region_routes()?; + let dropping_regions = operating_leader_regions(region_routes); + let mut dropping_region_guards = Vec::with_capacity(dropping_regions.len()); + for (region_id, datanode_id) in dropping_regions { + let guard = ddl_ctx + .memory_region_keeper + .register(datanode_id, region_id) + .context(error::RegionOperatingRaceSnafu { + region_id, + peer_id: datanode_id, + })?; + dropping_region_guards.push(guard); + } + self.dropping_regions = dropping_region_guards; + Ok(()) + } +} + +#[async_trait::async_trait] +#[typetag::serde] +impl State for DropDatabaseExecutor { + async fn next( + &mut self, + ddl_ctx: &DdlContext, + _ctx: &mut DropDatabaseContext, + ) -> Result<(Box, Status)> { + self.register_dropping_regions(ddl_ctx)?; + let executor = DropTableExecutor::new(self.table_name.clone(), self.table_id, true); + executor + .on_remove_metadata(ddl_ctx, &self.table_info_value, &self.table_route_value) + .await?; + executor.invalidate_table_cache(ddl_ctx).await?; + executor + .on_drop_regions(ddl_ctx, &self.table_route_value) + .await?; + info!("Table: {}({}) is dropped", self.table_name, self.table_id); + + Ok(( + Box::new(DropDatabaseCursor::new(self.target)), + Status::executing(false), + )) + } +} diff --git a/src/common/meta/src/ddl/drop_database/metadata.rs b/src/common/meta/src/ddl/drop_database/metadata.rs new file mode 100644 index 000000000000..37129f16d566 --- /dev/null +++ b/src/common/meta/src/ddl/drop_database/metadata.rs @@ -0,0 +1,43 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use common_procedure::Status;
+use serde::{Deserialize, Serialize};
+
+use super::end::DropDatabaseEnd;
+use crate::ddl::drop_database::{DropDatabaseContext, State};
+use crate::ddl::DdlContext;
+use crate::error::Result;
+use crate::key::schema_name::SchemaNameKey;
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct DropDatabaseRemoveMetadata;
+
+#[async_trait::async_trait]
+#[typetag::serde]
+impl State for DropDatabaseRemoveMetadata {
+    async fn next(
+        &mut self,
+        ddl_ctx: &DdlContext,
+        ctx: &mut DropDatabaseContext,
+    ) -> Result<(Box<dyn State>, Status)> {
+        ddl_ctx
+            .table_metadata_manager
+            .schema_manager()
+            .delete(SchemaNameKey::new(&ctx.catalog, &ctx.schema))
+            .await?;
+
+        Ok((Box::new(DropDatabaseEnd), Status::done()))
+    }
+}
diff --git a/src/common/meta/src/ddl/drop_database/start.rs b/src/common/meta/src/ddl/drop_database/start.rs
new file mode 100644
index 000000000000..2c20517fb5f1
--- /dev/null
+++ b/src/common/meta/src/ddl/drop_database/start.rs
@@ -0,0 +1,65 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use common_procedure::Status;
+use serde::{Deserialize, Serialize};
+use snafu::ensure;
+
+use crate::ddl::drop_database::cursor::DropDatabaseCursor;
+use crate::ddl::drop_database::end::DropDatabaseEnd;
+use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State};
+use crate::ddl::DdlContext;
+use crate::error::{self, Result};
+use crate::key::schema_name::SchemaNameKey;
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct DropDatabaseStart;
+
+#[async_trait::async_trait]
+#[typetag::serde]
+impl State for DropDatabaseStart {
+    /// Checks whether the schema exists.
+    /// - Returns early if the schema does not exist and `drop_if_exists` is `true`.
+    /// - Returns an error if the schema does not exist and `drop_if_exists` is `false`.
+ async fn next( + &mut self, + ddl_ctx: &DdlContext, + ctx: &mut DropDatabaseContext, + ) -> Result<(Box, Status)> { + let exists = ddl_ctx + .table_metadata_manager + .schema_manager() + .exists(SchemaNameKey { + catalog: &ctx.catalog, + schema: &ctx.schema, + }) + .await?; + + if !exists && ctx.drop_if_exists { + return Ok((Box::new(DropDatabaseEnd), Status::done())); + } + + ensure!( + exists, + error::SchemaNotFoundSnafu { + table_schema: &ctx.schema, + } + ); + + Ok(( + Box::new(DropDatabaseCursor::new(DropTableTarget::Logical)), + Status::executing(true), + )) + } +} diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 5f712fbdd8a6..c7ae7a23b21f 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -14,7 +14,9 @@ use std::sync::Arc; -use common_procedure::{watcher, Output, ProcedureId, ProcedureManagerRef, ProcedureWithId}; +use common_procedure::{ + watcher, BoxedProcedureLoader, Output, ProcedureId, ProcedureManagerRef, ProcedureWithId, +}; use common_telemetry::tracing_context::{FutureExt, TracingContext}; use common_telemetry::{debug, info, tracing}; use snafu::{ensure, OptionExt, ResultExt}; @@ -25,6 +27,7 @@ use crate::datanode_manager::DatanodeManagerRef; use crate::ddl::alter_table::AlterTableProcedure; use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure; use crate::ddl::create_table::CreateTableProcedure; +use crate::ddl::drop_database::DropDatabaseProcedure; use crate::ddl::drop_table::DropTableProcedure; use crate::ddl::table_meta::TableMetadataAllocatorRef; use crate::ddl::truncate_table::TruncateTableProcedure; @@ -39,12 +42,12 @@ use crate::key::table_route::TableRouteValue; use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; use crate::region_keeper::MemoryRegionKeeperRef; use crate::rpc::ddl::DdlTask::{ - AlterLogicalTables, AlterTable, CreateLogicalTables, CreateTable, DropLogicalTables, DropTable, - TruncateTable, + AlterLogicalTables, AlterTable, CreateLogicalTables, CreateTable, DropDatabase, + DropLogicalTables, DropTable, TruncateTable, }; use crate::rpc::ddl::{ - AlterTableTask, CreateTableTask, DropTableTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse, - TruncateTableTask, + AlterTableTask, CreateTableTask, DropDatabaseTask, DropTableTask, SubmitDdlTaskRequest, + SubmitDdlTaskResponse, TruncateTableTask, }; use crate::rpc::procedure; use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse}; @@ -54,6 +57,8 @@ use crate::ClusterId; pub type DdlManagerRef = Arc; +pub type BoxedProcedureLoaderFactory = dyn Fn(DdlContext) -> BoxedProcedureLoader; + /// The [DdlManager] provides the ability to execute Ddl. pub struct DdlManager { procedure_manager: ProcedureManagerRef, @@ -64,8 +69,8 @@ pub struct DdlManager { memory_region_keeper: MemoryRegionKeeperRef, } +/// Returns a new [DdlManager] with all Ddl [BoxedProcedureLoader](common_procedure::procedure::BoxedProcedureLoader)s registered. impl DdlManager { - /// Returns a new [DdlManager] with all Ddl [BoxedProcedureLoader](common_procedure::procedure::BoxedProcedureLoader)s registered. 
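Note: the `register_loaders` rework that follows replaces five copy-pasted `register_loader` calls with a table of `(TYPE_NAME, factory)` pairs registered in a single loop, each factory capturing a fresh clone of the `DdlContext`. A standalone sketch of the same table-driven pattern, with stand-in types (the `String`-returning loader below substitutes for `BoxedProcedureLoader`):

use std::collections::HashMap;

#[derive(Clone)]
struct DdlContext;

// Stand-in for `BoxedProcedureLoader`: a loader turns persisted JSON back
// into a procedure; here it just returns a description string.
type BoxedLoader = Box<dyn Fn(&str) -> String>;

fn main() {
    // Stand-in for `BoxedProcedureLoaderFactory`: each entry pairs a type
    // name with a factory that captures its own clone of the context.
    let loaders: Vec<(&str, &dyn Fn(DdlContext) -> BoxedLoader)> = vec![
        ("metasrv-procedure::DropDatabase", &|_ctx: DdlContext| -> BoxedLoader {
            Box::new(move |json: &str| format!("DropDatabase from {json}"))
        }),
        ("metasrv-procedure::DropTable", &|_ctx: DdlContext| -> BoxedLoader {
            Box::new(move |json: &str| format!("DropTable from {json}"))
        }),
    ];

    let mut registry: HashMap<&str, BoxedLoader> = HashMap::new();
    for (type_name, factory) in loaders {
        // One registration loop replaces the per-procedure boilerplate.
        registry.insert(type_name, factory(DdlContext));
    }
    let loaded = registry["metasrv-procedure::DropDatabase"]("{}");
    assert_eq!(loaded, "DropDatabase from {}");
}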
pub fn try_new( procedure_manager: ProcedureManagerRef, datanode_clients: DatanodeManagerRef, @@ -103,75 +108,72 @@ impl DdlManager { } fn register_loaders(&self) -> Result<()> { - let context = self.create_context(); - - self.procedure_manager - .register_loader( + let loaders: Vec<(&str, &BoxedProcedureLoaderFactory)> = vec![ + ( CreateTableProcedure::TYPE_NAME, - Box::new(move |json| { - let context = context.clone(); - CreateTableProcedure::from_json(json, context).map(|p| Box::new(p) as _) - }), - ) - .context(RegisterProcedureLoaderSnafu { - type_name: CreateTableProcedure::TYPE_NAME, - })?; - - let context = self.create_context(); - - self.procedure_manager - .register_loader( + &|context: DdlContext| -> BoxedProcedureLoader { + Box::new(move |json: &str| { + let context = context.clone(); + CreateTableProcedure::from_json(json, context).map(|p| Box::new(p) as _) + }) + }, + ), + ( CreateLogicalTablesProcedure::TYPE_NAME, - Box::new(move |json| { - let context = context.clone(); - CreateLogicalTablesProcedure::from_json(json, context).map(|p| Box::new(p) as _) - }), - ) - .context(RegisterProcedureLoaderSnafu { - type_name: CreateLogicalTablesProcedure::TYPE_NAME, - })?; - - let context = self.create_context(); - - self.procedure_manager - .register_loader( - DropTableProcedure::TYPE_NAME, - Box::new(move |json| { - let context = context.clone(); - DropTableProcedure::from_json(json, context).map(|p| Box::new(p) as _) - }), - ) - .context(RegisterProcedureLoaderSnafu { - type_name: DropTableProcedure::TYPE_NAME, - })?; - - let context = self.create_context(); - - self.procedure_manager - .register_loader( + &|context: DdlContext| -> BoxedProcedureLoader { + Box::new(move |json: &str| { + let context = context.clone(); + CreateLogicalTablesProcedure::from_json(json, context) + .map(|p| Box::new(p) as _) + }) + }, + ), + ( AlterTableProcedure::TYPE_NAME, - Box::new(move |json| { - let context = context.clone(); - AlterTableProcedure::from_json(json, context).map(|p| Box::new(p) as _) - }), - ) - .context(RegisterProcedureLoaderSnafu { - type_name: AlterTableProcedure::TYPE_NAME, - })?; + &|context: DdlContext| -> BoxedProcedureLoader { + Box::new(move |json: &str| { + let context = context.clone(); + AlterTableProcedure::from_json(json, context).map(|p| Box::new(p) as _) + }) + }, + ), + ( + DropTableProcedure::TYPE_NAME, + &|context: DdlContext| -> BoxedProcedureLoader { + Box::new(move |json: &str| { + let context = context.clone(); + DropTableProcedure::from_json(json, context).map(|p| Box::new(p) as _) + }) + }, + ), + ( + TruncateTableProcedure::TYPE_NAME, + &|context: DdlContext| -> BoxedProcedureLoader { + Box::new(move |json: &str| { + let context = context.clone(); + TruncateTableProcedure::from_json(json, context).map(|p| Box::new(p) as _) + }) + }, + ), + ( + DropDatabaseProcedure::TYPE_NAME, + &|context: DdlContext| -> BoxedProcedureLoader { + Box::new(move |json: &str| { + let context = context.clone(); + DropDatabaseProcedure::from_json(json, context).map(|p| Box::new(p) as _) + }) + }, + ), + ]; - let context = self.create_context(); + for (type_name, loader_factory) in loaders { + let context = self.create_context(); + self.procedure_manager + .register_loader(type_name, loader_factory(context)) + .context(RegisterProcedureLoaderSnafu { type_name })?; + } - self.procedure_manager - .register_loader( - TruncateTableProcedure::TYPE_NAME, - Box::new(move |json| { - let context = context.clone(); - TruncateTableProcedure::from_json(json, context).map(|p| Box::new(p) as _) 
-                }),
-            )
-            .context(RegisterProcedureLoaderSnafu {
-                type_name: TruncateTableProcedure::TYPE_NAME,
-            })
+        Ok(())
     }
 
     #[tracing::instrument(skip_all)]
@@ -260,6 +262,24 @@ impl DdlManager {
         self.submit_procedure(procedure_with_id).await
     }
 
+    #[tracing::instrument(skip_all)]
+    /// Submits and executes a drop database task.
+    pub async fn submit_drop_database(
+        &self,
+        _cluster_id: ClusterId,
+        DropDatabaseTask {
+            catalog,
+            schema,
+            drop_if_exists,
+        }: DropDatabaseTask,
+    ) -> Result<(ProcedureId, Option<Output>)> {
+        let context = self.create_context();
+        let procedure = DropDatabaseProcedure::new(catalog, schema, drop_if_exists, context);
+        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
+
+        self.submit_procedure(procedure_with_id).await
+    }
+
     #[tracing::instrument(skip_all)]
     /// Submits and executes a truncate table task.
     pub async fn submit_truncate_table_task(
@@ -529,6 +549,28 @@ async fn handle_create_logical_table_tasks(
     })
 }
 
+async fn handle_drop_database_task(
+    ddl_manager: &DdlManager,
+    cluster_id: ClusterId,
+    drop_database_task: DropDatabaseTask,
+) -> Result<SubmitDdlTaskResponse> {
+    let (id, _) = ddl_manager
+        .submit_drop_database(cluster_id, drop_database_task.clone())
+        .await?;
+
+    let procedure_id = id.to_string();
+    info!(
+        "Database {}.{} is dropped via procedure_id {id:?}",
+        drop_database_task.catalog, drop_database_task.schema
+    );
+
+    Ok(SubmitDdlTaskResponse {
+        key: procedure_id.into(),
+        table_id: None,
+        ..Default::default()
+    })
+}
+
 /// TODO(dennis): let [`DdlManager`] implement [`ProcedureExecutor`] looks weird, find some way to refactor it.
 #[async_trait::async_trait]
 impl ProcedureExecutor for DdlManager {
@@ -564,6 +606,9 @@ impl ProcedureExecutor for DdlManager {
             }
             DropLogicalTables(_) => todo!(),
             AlterLogicalTables(_) => todo!(),
+            DropDatabase(drop_database_task) => {
+                handle_drop_database_task(self, cluster_id, drop_database_task).await
+            }
         }
     }
     .trace(span)
diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs
index 56823fd2e9ab..1e92c30fe9f6 100644
--- a/src/common/meta/src/error.rs
+++ b/src/common/meta/src/error.rs
@@ -267,6 +267,12 @@ pub enum Error {
         location: Location,
     },
 
+    #[snafu(display("Schema not found, schema: {}", table_schema))]
+    SchemaNotFound {
+        table_schema: String,
+        location: Location,
+    },
+
     #[snafu(display("Failed to rename table, reason: {}", reason))]
     RenameTable { reason: String, location: Location },
 
@@ -472,9 +478,10 @@ impl ErrorExt for Error {
             InvalidCatalogValue { source, .. } => source.status_code(),
             ConvertAlterTableRequest { source, .. } => source.status_code(),
 
-            ParseProcedureId { .. } | InvalidNumTopics { .. } | EmptyCreateTableTasks { .. } => {
-                StatusCode::InvalidArguments
-            }
+            ParseProcedureId { .. }
+            | InvalidNumTopics { .. }
+            | EmptyCreateTableTasks { .. }
+            | SchemaNotFound { ..
} => StatusCode::InvalidArguments, } } diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index acc5e38c1a9e..225d552a0fd2 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -273,6 +273,10 @@ impl DeserializedValueWithByte self.inner } + pub fn get_inner_ref(&self) -> &T { + &self.inner + } + /// Returns original `bytes` pub fn get_raw_bytes(&self) -> Vec { self.bytes.to_vec() diff --git a/src/common/meta/src/key/catalog_name.rs b/src/common/meta/src/key/catalog_name.rs index fc393712eda2..4bbfb367b9f4 100644 --- a/src/common/meta/src/key/catalog_name.rs +++ b/src/common/meta/src/key/catalog_name.rs @@ -123,7 +123,7 @@ impl CatalogManager { self.kv_backend.exists(&raw_key).await } - pub async fn catalog_names(&self) -> BoxStream<'static, Result> { + pub fn catalog_names(&self) -> BoxStream<'static, Result> { let start_key = CatalogNameKey::range_start_key(); let req = RangeRequest::new().with_prefix(start_key.as_bytes()); diff --git a/src/common/meta/src/key/schema_name.rs b/src/common/meta/src/key/schema_name.rs index 3364e1ade322..8b391ca9b3d7 100644 --- a/src/common/meta/src/key/schema_name.rs +++ b/src/common/meta/src/key/schema_name.rs @@ -173,8 +173,16 @@ impl SchemaManager { .transpose() } + /// Deletes a [SchemaNameKey]. + pub async fn delete(&self, schema: SchemaNameKey<'_>) -> Result<()> { + let raw_key = schema.as_raw_key(); + self.kv_backend.delete(&raw_key, false).await?; + + Ok(()) + } + /// Returns a schema stream, it lists all schemas belong to the target `catalog`. - pub async fn schema_names(&self, catalog: &str) -> BoxStream<'static, Result> { + pub fn schema_names(&self, catalog: &str) -> BoxStream<'static, Result> { let start_key = SchemaNameKey::range_start_key(catalog); let req = RangeRequest::new().with_prefix(start_key.as_bytes()); diff --git a/src/common/meta/src/key/table_name.rs b/src/common/meta/src/key/table_name.rs index 971856f2b6c2..75b5b86cffd1 100644 --- a/src/common/meta/src/key/table_name.rs +++ b/src/common/meta/src/key/table_name.rs @@ -241,7 +241,7 @@ impl TableNameManager { self.kv_backend.exists(&raw_key).await } - pub async fn tables( + pub fn tables( &self, catalog: &str, schema: &str, diff --git a/src/common/meta/src/range_stream.rs b/src/common/meta/src/range_stream.rs index ce15c5b7ecad..878d5c0c9f24 100644 --- a/src/common/meta/src/range_stream.rs +++ b/src/common/meta/src/range_stream.rs @@ -236,6 +236,8 @@ impl Stream for PaginationStream { PaginationStreamState::Init => { let factory = self.factory.take().expect("lost factory"); if !factory.more { + // Ensures the factory always exists. 
+ self.factory = Some(factory); return Poll::Ready(None); } let fut = factory.read_next().boxed(); diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index b97924aa9f25..f211d0b3d47f 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -19,10 +19,13 @@ use api::v1::meta::{ AlterTableTask as PbAlterTableTask, AlterTableTasks as PbAlterTableTasks, CreateTableTask as PbCreateTableTask, CreateTableTasks as PbCreateTableTasks, DdlTaskRequest as PbDdlTaskRequest, DdlTaskResponse as PbDdlTaskResponse, - DropTableTask as PbDropTableTask, DropTableTasks as PbDropTableTasks, Partition, ProcedureId, + DropDatabaseTask as PbDropDatabaseTask, DropTableTask as PbDropTableTask, + DropTableTasks as PbDropTableTasks, Partition, ProcedureId, TruncateTableTask as PbTruncateTableTask, }; -use api::v1::{AlterExpr, CreateTableExpr, DropTableExpr, SemanticType, TruncateTableExpr}; +use api::v1::{ + AlterExpr, CreateTableExpr, DropDatabaseExpr, DropTableExpr, SemanticType, TruncateTableExpr, +}; use base64::engine::general_purpose; use base64::Engine as _; use prost::Message; @@ -43,6 +46,7 @@ pub enum DdlTask { CreateLogicalTables(Vec), DropLogicalTables(Vec), AlterLogicalTables(Vec), + DropDatabase(DropDatabaseTask), } impl DdlTask { @@ -79,6 +83,14 @@ impl DdlTask { }) } + pub fn new_drop_database(catalog: String, schema: String, drop_if_exists: bool) -> Self { + DdlTask::DropDatabase(DropDatabaseTask { + catalog, + schema, + drop_if_exists, + }) + } + pub fn new_alter_table(alter_table: AlterExpr) -> Self { DdlTask::AlterTable(AlterTableTask { alter_table }) } @@ -137,6 +149,9 @@ impl TryFrom for DdlTask { Ok(DdlTask::AlterLogicalTables(tasks)) } + Task::DropDatabaseTask(drop_database) => { + Ok(DdlTask::DropDatabase(drop_database.try_into()?)) + } } } } @@ -179,6 +194,7 @@ impl TryFrom for PbDdlTaskRequest { Task::AlterTableTasks(PbAlterTableTasks { tasks }) } + DdlTask::DropDatabase(task) => Task::DropDatabaseTask(task.try_into()?), }; Ok(Self { @@ -557,7 +573,7 @@ impl TryFrom for TruncateTableTask { fn try_from(pb: PbTruncateTableTask) -> Result { let truncate_table = pb.truncate_table.context(error::InvalidProtoMsgSnafu { - err_msg: "expected drop table", + err_msg: "expected truncate table", })?; Ok(Self { @@ -589,6 +605,53 @@ impl TryFrom for PbTruncateTableTask { } } +#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] +pub struct DropDatabaseTask { + pub catalog: String, + pub schema: String, + pub drop_if_exists: bool, +} + +impl TryFrom for DropDatabaseTask { + type Error = error::Error; + + fn try_from(pb: PbDropDatabaseTask) -> Result { + let DropDatabaseExpr { + catalog_name, + schema_name, + drop_if_exists, + } = pb.drop_database.context(error::InvalidProtoMsgSnafu { + err_msg: "expected drop database", + })?; + + Ok(DropDatabaseTask { + catalog: catalog_name, + schema: schema_name, + drop_if_exists, + }) + } +} + +impl TryFrom for PbDropDatabaseTask { + type Error = error::Error; + + fn try_from( + DropDatabaseTask { + catalog, + schema, + drop_if_exists, + }: DropDatabaseTask, + ) -> Result { + Ok(PbDropDatabaseTask { + drop_database: Some(DropDatabaseExpr { + catalog_name: catalog, + schema_name: schema, + drop_if_exists, + }), + }) + } +} + #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/src/common/procedure/src/lib.rs b/src/common/procedure/src/lib.rs index cef90d8dfe09..ece2ce4189a0 100644 --- a/src/common/procedure/src/lib.rs +++ b/src/common/procedure/src/lib.rs @@ -25,8 +25,8 @@ pub mod watcher; pub use 
crate::error::{Error, Result}; pub use crate::procedure::{ - BoxedProcedure, Context, ContextProvider, LockKey, Output, ParseIdError, Procedure, - ProcedureId, ProcedureManager, ProcedureManagerRef, ProcedureState, ProcedureWithId, Status, - StringKey, + BoxedProcedure, BoxedProcedureLoader, Context, ContextProvider, LockKey, Output, ParseIdError, + Procedure, ProcedureId, ProcedureManager, ProcedureManagerRef, ProcedureState, ProcedureWithId, + Status, StringKey, }; pub use crate::watcher::Watcher; diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs index c68005db590d..9602d0a87bae 100644 --- a/src/common/procedure/src/local.rs +++ b/src/common/procedure/src/local.rs @@ -799,8 +799,10 @@ mod tests { let root_id = ProcedureId::random(); // Prepare data for the root procedure. for step in 0..3 { + let type_name = root.type_name().to_string(); + let data = root.dump().unwrap(); procedure_store - .store_procedure(root_id, step, &root, None) + .store_procedure(root_id, step, type_name, data, None) .await .unwrap(); } @@ -809,8 +811,10 @@ mod tests { let child_id = ProcedureId::random(); // Prepare data for the child procedure for step in 0..2 { + let type_name = child.type_name().to_string(); + let data = child.dump().unwrap(); procedure_store - .store_procedure(child_id, step, &child, Some(root_id)) + .store_procedure(child_id, step, type_name, data, Some(root_id)) .await .unwrap(); } diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs index b787fbf77e4b..bd866ea1d4e2 100644 --- a/src/common/procedure/src/local/runner.rs +++ b/src/common/procedure/src/local/runner.rs @@ -385,7 +385,7 @@ impl Runner { } /// Extend the retry time to wait for the next retry. - async fn wait_on_err(&self, d: Duration, i: u64) { + async fn wait_on_err(&mut self, d: Duration, i: u64) { logging::info!( "Procedure {}-{} retry for the {} times after {} millis", self.procedure.type_name(), @@ -396,7 +396,7 @@ impl Runner { time::sleep(d).await; } - async fn on_suspended(&self, subprocedures: Vec) { + async fn on_suspended(&mut self, subprocedures: Vec) { let has_child = !subprocedures.is_empty(); for subprocedure in subprocedures { logging::info!( @@ -429,11 +429,15 @@ impl Runner { } async fn persist_procedure(&mut self) -> Result<()> { + let type_name = self.procedure.type_name().to_string(); + let data = self.procedure.dump()?; + self.store .store_procedure( self.meta.id, self.step, - &self.procedure, + type_name, + data, self.meta.parent_id, ) .await diff --git a/src/common/procedure/src/procedure.rs b/src/common/procedure/src/procedure.rs index b46428eb6860..434d54950e67 100644 --- a/src/common/procedure/src/procedure.rs +++ b/src/common/procedure/src/procedure.rs @@ -116,7 +116,7 @@ pub struct Context { /// A `Procedure` represents an operation or a set of operations to be performed step-by-step. #[async_trait] -pub trait Procedure: Send + Sync { +pub trait Procedure: Send { /// Type name of the procedure. 
fn type_name(&self) -> &str; diff --git a/src/common/procedure/src/store.rs b/src/common/procedure/src/store.rs index 11aeec6551af..84dd39520d9a 100644 --- a/src/common/procedure/src/store.rs +++ b/src/common/procedure/src/store.rs @@ -22,7 +22,7 @@ use snafu::ResultExt; use crate::error::{Result, ToJsonSnafu}; pub(crate) use crate::store::state_store::StateStoreRef; -use crate::{BoxedProcedure, ProcedureId}; +use crate::ProcedureId; pub mod state_store; @@ -75,14 +75,12 @@ impl ProcedureStore { &self, procedure_id: ProcedureId, step: u32, - procedure: &BoxedProcedure, + type_name: String, + data: String, parent_id: Option, ) -> Result<()> { - let type_name = procedure.type_name(); - let data = procedure.dump()?; - let message = ProcedureMessage { - type_name: type_name.to_string(), + type_name, data, parent_id, step, @@ -312,6 +310,7 @@ mod tests { use object_store::ObjectStore; use crate::store::state_store::ObjectStateStore; + use crate::BoxedProcedure; impl ProcedureStore { pub(crate) fn from_object_store(store: ObjectStore) -> ProcedureStore { @@ -481,9 +480,10 @@ mod tests { let procedure_id = ProcedureId::random(); let procedure: BoxedProcedure = Box::new(MockProcedure::new("test store procedure")); - + let type_name = procedure.type_name().to_string(); + let data = procedure.dump().unwrap(); store - .store_procedure(procedure_id, 0, &procedure, None) + .store_procedure(procedure_id, 0, type_name, data, None) .await .unwrap(); @@ -507,9 +507,10 @@ mod tests { let procedure_id = ProcedureId::random(); let procedure: BoxedProcedure = Box::new(MockProcedure::new("test store procedure")); - + let type_name = procedure.type_name().to_string(); + let data = procedure.dump().unwrap(); store - .store_procedure(procedure_id, 0, &procedure, None) + .store_procedure(procedure_id, 0, type_name, data, None) .await .unwrap(); store.commit_procedure(procedure_id, 1).await.unwrap(); @@ -526,9 +527,10 @@ mod tests { let procedure_id = ProcedureId::random(); let procedure: BoxedProcedure = Box::new(MockProcedure::new("test store procedure")); - + let type_name = procedure.type_name().to_string(); + let data = procedure.dump().unwrap(); store - .store_procedure(procedure_id, 0, &procedure, None) + .store_procedure(procedure_id, 0, type_name, data, None) .await .unwrap(); store.rollback_procedure(procedure_id, 1).await.unwrap(); @@ -545,13 +547,16 @@ mod tests { let procedure_id = ProcedureId::random(); let procedure: BoxedProcedure = Box::new(MockProcedure::new("test store procedure")); - + let type_name = procedure.type_name().to_string(); + let data = procedure.dump().unwrap(); store - .store_procedure(procedure_id, 0, &procedure, None) + .store_procedure(procedure_id, 0, type_name, data, None) .await .unwrap(); + let type_name = procedure.type_name().to_string(); + let data = procedure.dump().unwrap(); store - .store_procedure(procedure_id, 1, &procedure, None) + .store_procedure(procedure_id, 1, type_name, data, None) .await .unwrap(); @@ -570,12 +575,17 @@ mod tests { let procedure_id = ProcedureId::random(); let procedure: BoxedProcedure = Box::new(MockProcedure::new("test store procedure")); + let type_name = procedure.type_name().to_string(); + let data = procedure.dump().unwrap(); store - .store_procedure(procedure_id, 0, &procedure, None) + .store_procedure(procedure_id, 0, type_name, data, None) .await .unwrap(); + + let type_name = procedure.type_name().to_string(); + let data = procedure.dump().unwrap(); store - .store_procedure(procedure_id, 1, &procedure, None) + 
.store_procedure(procedure_id, 1, type_name, data, None) .await .unwrap(); store.commit_procedure(procedure_id, 2).await.unwrap(); @@ -595,31 +605,41 @@ mod tests { // store 3 steps let id0 = ProcedureId::random(); let procedure: BoxedProcedure = Box::new(MockProcedure::new("id0-0")); + let type_name = procedure.type_name().to_string(); + let data = procedure.dump().unwrap(); store - .store_procedure(id0, 0, &procedure, None) + .store_procedure(id0, 0, type_name, data, None) .await .unwrap(); let procedure: BoxedProcedure = Box::new(MockProcedure::new("id0-1")); + let type_name = procedure.type_name().to_string(); + let data = procedure.dump().unwrap(); store - .store_procedure(id0, 1, &procedure, None) + .store_procedure(id0, 1, type_name, data, None) .await .unwrap(); let procedure: BoxedProcedure = Box::new(MockProcedure::new("id0-2")); + let type_name = procedure.type_name().to_string(); + let data = procedure.dump().unwrap(); store - .store_procedure(id0, 2, &procedure, None) + .store_procedure(id0, 2, type_name, data, None) .await .unwrap(); // store 2 steps and then commit let id1 = ProcedureId::random(); let procedure: BoxedProcedure = Box::new(MockProcedure::new("id1-0")); + let type_name = procedure.type_name().to_string(); + let data = procedure.dump().unwrap(); store - .store_procedure(id1, 0, &procedure, None) + .store_procedure(id1, 0, type_name, data, None) .await .unwrap(); let procedure: BoxedProcedure = Box::new(MockProcedure::new("id1-1")); + let type_name = procedure.type_name().to_string(); + let data = procedure.dump().unwrap(); store - .store_procedure(id1, 1, &procedure, None) + .store_procedure(id1, 1, type_name, data, None) .await .unwrap(); store.commit_procedure(id1, 2).await.unwrap(); @@ -627,8 +647,10 @@ mod tests { // store 1 step let id2 = ProcedureId::random(); let procedure: BoxedProcedure = Box::new(MockProcedure::new("id2-0")); + let type_name = procedure.type_name().to_string(); + let data = procedure.dump().unwrap(); store - .store_procedure(id2, 0, &procedure, None) + .store_procedure(id2, 0, type_name, data, None) .await .unwrap(); diff --git a/src/meta-srv/src/service/admin/meta.rs b/src/meta-srv/src/service/admin/meta.rs index 77eb38e7dd5b..d13ca93b0eec 100644 --- a/src/meta-srv/src/service/admin/meta.rs +++ b/src/meta-srv/src/service/admin/meta.rs @@ -52,8 +52,7 @@ impl HttpHandler for CatalogsHandler { let stream = self .table_metadata_manager .catalog_manager() - .catalog_names() - .await; + .catalog_names(); let keys = stream .try_collect::>() @@ -84,8 +83,7 @@ impl HttpHandler for SchemasHandler { let stream = self .table_metadata_manager .schema_manager() - .schema_names(catalog) - .await; + .schema_names(catalog); let keys = stream .try_collect::>() @@ -118,8 +116,7 @@ impl HttpHandler for TablesHandler { let stream = self .table_metadata_manager .table_name_manager() - .tables(catalog, schema) - .await; + .tables(catalog, schema); let tables = stream .try_collect::>() .await diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs index 974f02c52acf..f7b7471ec118 100644 --- a/src/operator/src/statement.rs +++ b/src/operator/src/statement.rs @@ -168,12 +168,13 @@ impl StatementExecutor { let table_name = TableName::new(catalog, schema, table); self.drop_table(table_name, stmt.drop_if_exists()).await } - Statement::DropDatabase(_stmt) => { - // TODO(weny): implement the drop database procedure - error::NotSupportedSnafu { - feat: "Drop Database", - } - .fail() + Statement::DropDatabase(stmt) => { + self.drop_database( + 
+                    query_ctx.current_catalog().to_string(),
+                    format_raw_object_name(stmt.name()),
+                    stmt.drop_if_exists(),
+                )
+                .await
             }
             Statement::TruncateTable(stmt) => {
                 let (catalog, schema, table) =
diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs
index ea66296462ce..455885bbda89 100644
--- a/src/operator/src/statement/ddl.rs
+++ b/src/operator/src/statement/ddl.rs
@@ -369,6 +369,34 @@ impl StatementExecutor {
         }
     }
 
+    #[tracing::instrument(skip_all)]
+    pub async fn drop_database(
+        &self,
+        catalog: String,
+        schema: String,
+        drop_if_exists: bool,
+    ) -> Result<Output> {
+        if self
+            .catalog_manager
+            .schema_exists(&catalog, &schema)
+            .await
+            .context(CatalogSnafu)?
+        {
+            self.drop_database_procedure(catalog, schema, drop_if_exists)
+                .await?;
+
+            Ok(Output::new_with_affected_rows(0))
+        } else if drop_if_exists {
+            // DROP DATABASE IF EXISTS on a nonexistent schema - ignored
+            Ok(Output::new_with_affected_rows(0))
+        } else {
+            Err(SchemaNotFoundSnafu {
+                schema_info: schema,
+            }
+            .into_error(snafu::NoneError))
+        }
+    }
+
     #[tracing::instrument(skip_all)]
     pub async fn truncate_table(&self, table_name: TableName) -> Result<Output> {
         let table = self
@@ -545,6 +573,22 @@ impl StatementExecutor {
             .context(error::ExecuteDdlSnafu)
     }
 
+    async fn drop_database_procedure(
+        &self,
+        catalog: String,
+        schema: String,
+        drop_if_exists: bool,
+    ) -> Result<SubmitDdlTaskResponse> {
+        let request = SubmitDdlTaskRequest {
+            task: DdlTask::new_drop_database(catalog, schema, drop_if_exists),
+        };
+
+        self.procedure_executor
+            .submit_ddl_task(&ExecutorContext::default(), request)
+            .await
+            .context(error::ExecuteDdlSnafu)
+    }
+
     async fn truncate_table_procedure(
         &self,
         table_name: &TableName,
diff --git a/tests/cases/standalone/common/catalog/schema.result b/tests/cases/standalone/common/catalog/schema.result
index 4c0a29be1f7f..600415d3122c 100644
--- a/tests/cases/standalone/common/catalog/schema.result
+++ b/tests/cases/standalone/common/catalog/schema.result
@@ -120,7 +120,7 @@ SHOW TABLES FROM public WHERE Tables = 'numbers';
 
 DROP SCHEMA test_public_schema;
 
-Error: 1001(Unsupported), Not supported: Drop Database
+Affected Rows: 0
 
 SELECT * FROM test_public_schema.hello;
 
diff --git a/tests/cases/standalone/common/create/create_database.result b/tests/cases/standalone/common/create/create_database.result
index 780aca34c32a..da409471c2bb 100644
--- a/tests/cases/standalone/common/create/create_database.result
+++ b/tests/cases/standalone/common/create/create_database.result
@@ -19,6 +19,5 @@ show databases;
 | illegal-database   |
 | information_schema |
 | public             |
-| test_public_schema |
 +--------------------+
 
diff --git a/tests/cases/standalone/common/show/show_databases_tables.result b/tests/cases/standalone/common/show/show_databases_tables.result
index 0417ffb85d26..ca5fb1df9d4d 100644
--- a/tests/cases/standalone/common/show/show_databases_tables.result
+++ b/tests/cases/standalone/common/show/show_databases_tables.result
@@ -7,7 +7,6 @@ show databases;
 | illegal-database      |
 | information_schema    |
 | public                |
-| test_public_schema    |
 | upper_case_table_name |
 +-----------------------+
 
diff --git a/tests/cases/standalone/common/system/information_schema.result b/tests/cases/standalone/common/system/information_schema.result
index 23764d8c2bd9..a6dec1d3bafc 100644
--- a/tests/cases/standalone/common/system/information_schema.result
+++ b/tests/cases/standalone/common/system/information_schema.result
@@ -447,7 +447,7 @@ Affected Rows: 0
 
 drop schema my_db;
 
-Error: 1001(Unsupported), Not supported: Drop Database
+Affected Rows: 0
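Note: the operator-side drop_database above mirrors the check performed by the DropDatabaseStart state: verify schema existence first, treat DROP DATABASE IF EXISTS on a missing schema as a successful no-op (hence the sqlness results flipping from Unsupported errors to "Affected Rows: 0"), and error otherwise. A condensed model of that dispatch, with a hypothetical Outcome enum standing in for the real Output and error returns:

#[derive(Debug, PartialEq)]
enum Outcome {
    Submitted, // drop database procedure submitted to the meta server
    NoOp,      // IF EXISTS on a missing schema: affected rows 0
    NotFound,  // plain DROP DATABASE on a missing schema: an error
}

fn drop_database(schema_exists: bool, drop_if_exists: bool) -> Outcome {
    match (schema_exists, drop_if_exists) {
        (true, _) => Outcome::Submitted,
        (false, true) => Outcome::NoOp,
        (false, false) => Outcome::NotFound,
    }
}

fn main() {
    assert_eq!(drop_database(true, false), Outcome::Submitted);
    assert_eq!(drop_database(false, true), Outcome::NoOp);
    assert_eq!(drop_database(false, false), Outcome::NotFound);
}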
use information_schema; @@ -456,11 +456,8 @@ Affected Rows: 0 -- test query filter for key_column_usage -- select * from KEY_COLUMN_USAGE where CONSTRAINT_NAME = 'TIME INDEX'; -+--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+ -| constraint_catalog | constraint_schema | constraint_name | table_catalog | table_schema | table_name | column_name | ordinal_position | position_in_unique_constraint | referenced_table_schema | referenced_table_name | referenced_column_name | -+--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+ -| def | my_db | TIME INDEX | def | my_db | foo | ts | 1 | | | | | -+--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+ +++ +++ select * from KEY_COLUMN_USAGE where CONSTRAINT_NAME != 'TIME INDEX'; @@ -472,11 +469,8 @@ select * from KEY_COLUMN_USAGE where CONSTRAINT_NAME != 'TIME INDEX'; select * from KEY_COLUMN_USAGE where CONSTRAINT_NAME LIKE '%INDEX'; -+--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+ -| constraint_catalog | constraint_schema | constraint_name | table_catalog | table_schema | table_name | column_name | ordinal_position | position_in_unique_constraint | referenced_table_schema | referenced_table_name | referenced_column_name | -+--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+ -| def | my_db | TIME INDEX | def | my_db | foo | ts | 1 | | | | | -+--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+ +++ +++ select * from KEY_COLUMN_USAGE where CONSTRAINT_NAME NOT LIKE '%INDEX'; @@ -512,8 +506,6 @@ select * from schemata where catalog_name = 'greptime' and schema_name != 'publi | greptime | greptime_private | utf8 | utf8_bin | | | greptime | illegal-database | utf8 | utf8_bin | | | greptime | information_schema | utf8 | utf8_bin | | -| greptime | my_db | utf8 | utf8_bin | | -| greptime | test_public_schema | utf8 | utf8_bin | | | greptime | upper_case_table_name | utf8 | utf8_bin | | +--------------+-----------------------+----------------------------+------------------------+----------+ @@ -570,7 +562,6 @@ select * from key_column_usage; +--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+ | constraint_catalog | constraint_schema | constraint_name | table_catalog | table_schema | table_name | column_name | ordinal_position | 
position_in_unique_constraint | referenced_table_schema | referenced_table_name | referenced_column_name | +--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+ -| def | my_db | TIME INDEX | def | my_db | foo | ts | 1 | | | | | | def | public | PRIMARY | def | public | numbers | number | 1 | | | | | +--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+ @@ -684,7 +675,7 @@ DESC TABLE GREPTIME_REGION_PEERS; drop table my_db.foo; -Affected Rows: 0 +Error: 4001(TableNotFound), Table not found: greptime.my_db.foo use public;
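One closing note on the common_procedure changes earlier in this patch: dropping the Sync bound from Procedure is paired with ProcedureStore::store_procedure taking owned type_name and data strings dumped up front, so the async write no longer holds a shared borrow of the procedure across an await point. A minimal model of that call shape, with stand-in types and assuming the tokio crate:

trait Procedure {
    fn type_name(&self) -> &str;
    fn dump(&self) -> String;
}

struct MockProcedure;

impl Procedure for MockProcedure {
    fn type_name(&self) -> &str {
        "MockProcedure"
    }
    fn dump(&self) -> String {
        "{}".to_string()
    }
}

// An owned message can be held across awaits without borrowing the
// procedure itself; the real code persists a ProcedureMessage here.
async fn store_procedure(step: u32, type_name: String, data: String) {
    println!("step {step}: {type_name} => {data}");
}

#[tokio::main]
async fn main() {
    let procedure: Box<dyn Procedure> = Box::new(MockProcedure);
    // Dump first, then hand owned strings to the async store call.
    let type_name = procedure.type_name().to_string();
    let data = procedure.dump();
    store_procedure(0, type_name, data).await;
}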