Skip to content

Commit

Permalink
feat: register procedure
Browse files Browse the repository at this point in the history
  • Loading branch information
QuenKar committed Dec 5, 2023
1 parent 6d4cce9 commit 18b6dbc
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 8 deletions.
12 changes: 8 additions & 4 deletions src/common/meta/src/ddl/create_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use common_catalog::build_db_string;
use common_procedure::error::ToJsonSnafu;
use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
use common_procedure::{
Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
};
Expand All @@ -37,7 +37,7 @@ pub struct CreateDatabaseProcedure {
impl CreateDatabaseProcedure {
pub(crate) const TYPE_NAME: &'static str = "metasrv-procedure::CreateDatabase";

pub fn new(cluster_id: u64, task: CreateDatabaseTask, context: DdlContext) -> Self {
pub(crate) fn new(cluster_id: u64, task: CreateDatabaseTask, context: DdlContext) -> Self {
Self {
context,
data: CreateDatabaseData {
Expand All @@ -48,14 +48,18 @@ impl CreateDatabaseProcedure {
}
}

pub(crate) fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data = serde_json::from_str(json).context(FromJsonSnafu)?;
Ok(Self { context, data })
}

/// Check if the database already exists
async fn on_prepare(&mut self) -> Result<Status> {
let catalog = &self.data.task.catalog;
let database_name = &self.data.task.database_name;
let schema_key = SchemaNameKey::new(catalog, database_name);

let schema_manger = self.context.table_metadata_manager.schema_manager();
let exists = schema_manger.exists(schema_key).await?;
let exists = self.schema_manager().exists(schema_key).await?;

if exists {
ensure!(
Expand Down
18 changes: 14 additions & 4 deletions src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,20 @@ impl DdlManager {
)
.context(RegisterProcedureLoaderSnafu {
type_name: TruncateTableProcedure::TYPE_NAME,
})?;

let context = self.create_context();

self.procedure_manager
.register_loader(
CreateDatabaseProcedure::TYPE_NAME,
Box::new(move |json| {
let context = context.clone();
CreateDatabaseProcedure::from_json(json, context).map(|p| Box::new(p) as _)
}),
)
.context(RegisterProcedureLoaderSnafu {
type_name: CreateDatabaseProcedure::TYPE_NAME,
})
}

Expand Down Expand Up @@ -190,10 +204,6 @@ impl DdlManager {
cluster_id: u64,
create_database_task: CreateDatabaseTask,
) -> Result<ProcedureId> {
info!(
"{cluster_id}: submit create database task: {:?}",
create_database_task
);
let context = self.create_context();

let procedure = CreateDatabaseProcedure::new(cluster_id, create_database_task, context);
Expand Down

0 comments on commit 18b6dbc

Please sign in to comment.