Skip to content

Commit

Permalink
feat: implement the drop database procedure (#3541)
Browse files Browse the repository at this point in the history
* refactor: remove Sync trait of Procedure

* refactor: remove unnecessary async

* feat: implement the drop database procedure

* refactor: refactor DdlManager register_loaders

* feat: register the DropDatabaseProcedureLoader

* chore: fmt toml

* feat: support to submit DropDatabaseTask

* feat: support drop database stmt

* fix: empty the tables stream

* fix: ensure the factory always exists

* test: update sqlness results

* chore: correct comments

* test: update sqlness results

* test: update sqlness results

* chore: apply suggestions from CR

* chore: apply suggestions from CR
  • Loading branch information
WenyXu authored Mar 25, 2024
1 parent 0f1747b commit bf14d33
Show file tree
Hide file tree
Showing 31 changed files with 903 additions and 150 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ etcd-client = "0.12"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "349cb385583697f41010dabeb3c106d58f9599b4" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "06f6297ff3cab578a1589741b504342fbad70453" }
humantime-serde = "1.1"
itertools = "0.10"
lazy_static = "1.4"
Expand Down
10 changes: 3 additions & 7 deletions src/catalog/src/kvbackend/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,7 @@ impl CatalogManager for KvBackendCatalogManager {
let stream = self
.table_metadata_manager
.catalog_manager()
.catalog_names()
.await;
.catalog_names();

let keys = stream
.try_collect::<Vec<_>>()
Expand All @@ -154,8 +153,7 @@ impl CatalogManager for KvBackendCatalogManager {
let stream = self
.table_metadata_manager
.schema_manager()
.schema_names(catalog)
.await;
.schema_names(catalog);
let mut keys = stream
.try_collect::<BTreeSet<_>>()
.await
Expand All @@ -171,8 +169,7 @@ impl CatalogManager for KvBackendCatalogManager {
let stream = self
.table_metadata_manager
.table_name_manager()
.tables(catalog, schema)
.await;
.tables(catalog, schema);
let mut tables = stream
.try_collect::<Vec<_>>()
.await
Expand Down Expand Up @@ -297,7 +294,6 @@ impl CatalogManager for KvBackendCatalogManager {
.table_metadata_manager
.table_name_manager()
.tables(catalog, schema)
.await
.map_ok(|(_, v)| v.table_id());
const BATCH_SIZE: usize = 128;
let user_tables = try_stream!({
Expand Down
1 change: 1 addition & 0 deletions src/common/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ strum.workspace = true
table.workspace = true
tokio.workspace = true
tonic.workspace = true
typetag = "0.2"

[dev-dependencies]
chrono.workspace = true
Expand Down
1 change: 1 addition & 0 deletions src/common/meta/src/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub mod alter_table;
pub mod create_logical_tables;
pub mod create_table;
mod create_table_template;
pub mod drop_database;
pub mod drop_table;
pub mod table_meta;
#[cfg(any(test, feature = "testing"))]
Expand Down
171 changes: 171 additions & 0 deletions src/common/meta/src/ddl/drop_database.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod cursor;
pub mod end;
pub mod executor;
pub mod metadata;
pub mod start;
use std::fmt::Debug;

use common_procedure::error::{Error as ProcedureError, FromJsonSnafu, ToJsonSnafu};
use common_procedure::{
Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
};
use futures::stream::BoxStream;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use tonic::async_trait;

use self::start::DropDatabaseStart;
use crate::ddl::DdlContext;
use crate::error::Result;
use crate::key::table_name::TableNameValue;
use crate::lock_key::{CatalogLock, SchemaLock};

pub struct DropDatabaseProcedure {
/// The context of procedure runtime.
runtime_context: DdlContext,
context: DropDatabaseContext,

state: Box<dyn State>,
}

/// Target of dropping tables.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum DropTableTarget {
Logical,
Physical,
}

/// Context of [DropDatabaseProcedure] execution.
pub struct DropDatabaseContext {
catalog: String,
schema: String,
drop_if_exists: bool,
tables: Option<BoxStream<'static, Result<(String, TableNameValue)>>>,
}

#[async_trait::async_trait]
#[typetag::serde(tag = "drop_database_state")]
pub(crate) trait State: Send + Debug {
/// Yields the next [State] and [Status].
async fn next(
&mut self,
ddl_ctx: &DdlContext,
ctx: &mut DropDatabaseContext,
) -> Result<(Box<dyn State>, Status)>;
}

impl DropDatabaseProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure::DropDatabase";

pub fn new(catalog: String, schema: String, drop_if_exists: bool, context: DdlContext) -> Self {
Self {
runtime_context: context,
context: DropDatabaseContext {
catalog,
schema,
drop_if_exists,
tables: None,
},
state: Box::new(DropDatabaseStart),
}
}

pub fn from_json(json: &str, runtime_context: DdlContext) -> ProcedureResult<Self> {
let DropDatabaseOwnedData {
catalog,
schema,
drop_if_exists,
state,
} = serde_json::from_str(json).context(FromJsonSnafu)?;

Ok(Self {
runtime_context,
context: DropDatabaseContext {
catalog,
schema,
drop_if_exists,
tables: None,
},
state,
})
}
}

#[async_trait]
impl Procedure for DropDatabaseProcedure {
fn type_name(&self) -> &str {
Self::TYPE_NAME
}

async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &mut self.state;

let (next, status) = state
.next(&self.runtime_context, &mut self.context)
.await
.map_err(|e| {
if e.is_retry_later() {
ProcedureError::retry_later(e)
} else {
ProcedureError::external(e)
}
})?;

*state = next;
Ok(status)
}

fn dump(&self) -> ProcedureResult<String> {
let data = DropDatabaseData {
catalog: &self.context.catalog,
schema: &self.context.schema,
drop_if_exists: self.context.drop_if_exists,
state: self.state.as_ref(),
};

serde_json::to_string(&data).context(ToJsonSnafu)
}

fn lock_key(&self) -> LockKey {
let lock_key = vec![
CatalogLock::Read(&self.context.catalog).into(),
SchemaLock::write(&self.context.catalog, &self.context.schema).into(),
];

LockKey::new(lock_key)
}
}

#[derive(Debug, Serialize)]
struct DropDatabaseData<'a> {
// The catalog name
catalog: &'a str,
// The schema name
schema: &'a str,
drop_if_exists: bool,
state: &'a dyn State,
}

#[derive(Debug, Deserialize)]
struct DropDatabaseOwnedData {
// The catalog name
catalog: String,
// The schema name
schema: String,
drop_if_exists: bool,
state: Box<dyn State>,
}
141 changes: 141 additions & 0 deletions src/common/meta/src/ddl/drop_database/cursor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_procedure::Status;
use futures::TryStreamExt;
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use table::metadata::TableId;

use super::executor::DropDatabaseExecutor;
use super::metadata::DropDatabaseRemoveMetadata;
use super::DropTableTarget;
use crate::ddl::drop_database::{DropDatabaseContext, State};
use crate::ddl::DdlContext;
use crate::error::{self, Result};
use crate::key::table_route::TableRouteValue;
use crate::key::DeserializedValueWithBytes;
use crate::table_name::TableName;

#[derive(Debug, Serialize, Deserialize)]
pub struct DropDatabaseCursor {
target: DropTableTarget,
}

impl DropDatabaseCursor {
/// Returns a new [DropDatabaseCursor].
pub fn new(target: DropTableTarget) -> Self {
Self { target }
}

fn handle_reach_end(
&mut self,
ctx: &mut DropDatabaseContext,
) -> Result<(Box<dyn State>, Status)> {
match self.target {
DropTableTarget::Logical => {
// Consumes the tables stream.
ctx.tables.take();

Ok((
Box::new(DropDatabaseCursor::new(DropTableTarget::Physical)),
Status::executing(true),
))
}
DropTableTarget::Physical => Ok((
Box::new(DropDatabaseRemoveMetadata),
Status::executing(true),
)),
}
}

async fn handle_table(
&mut self,
ddl_ctx: &DdlContext,
ctx: &mut DropDatabaseContext,
table_name: String,
table_id: TableId,
table_route_value: DeserializedValueWithBytes<TableRouteValue>,
) -> Result<(Box<dyn State>, Status)> {
match (self.target, table_route_value.get_inner_ref()) {
(DropTableTarget::Logical, TableRouteValue::Logical(_))
| (DropTableTarget::Physical, TableRouteValue::Physical(_)) => {
// TODO(weny): Maybe we can drop the table without fetching the `TableInfoValue`
let table_info_value = ddl_ctx
.table_metadata_manager
.table_info_manager()
.get(table_id)
.await?
.context(error::TableNotFoundSnafu {
table_name: &table_name,
})?;
Ok((
Box::new(DropDatabaseExecutor::new(
TableName::new(&ctx.catalog, &ctx.schema, &table_name),
table_id,
table_info_value,
table_route_value,
self.target,
)),
Status::executing(true),
))
}
_ => Ok((
Box::new(DropDatabaseCursor::new(self.target)),
Status::executing(false),
)),
}
}
}

#[async_trait::async_trait]
#[typetag::serde]
impl State for DropDatabaseCursor {
async fn next(
&mut self,
ddl_ctx: &DdlContext,
ctx: &mut DropDatabaseContext,
) -> Result<(Box<dyn State>, Status)> {
if ctx.tables.as_deref().is_none() {
let tables = ddl_ctx
.table_metadata_manager
.table_name_manager()
.tables(&ctx.catalog, &ctx.schema);
ctx.tables = Some(tables);
}
// Safety: must exist
match ctx.tables.as_mut().unwrap().try_next().await? {
Some((table_name, table_name_value)) => {
let table_id = table_name_value.table_id();
match ddl_ctx
.table_metadata_manager
.table_route_manager()
.table_route_storage()
.get_raw(table_id)
.await?
{
Some(table_route_value) => {
self.handle_table(ddl_ctx, ctx, table_name, table_id, table_route_value)
.await
}
None => Ok((
Box::new(DropDatabaseCursor::new(self.target)),
Status::executing(false),
)),
}
}
None => self.handle_reach_end(ctx),
}
}
}
Loading

0 comments on commit bf14d33

Please sign in to comment.