diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs
index 1e4d95349019..cd6bb67e0911 100644
--- a/src/common/meta/src/ddl_manager.rs
+++ b/src/common/meta/src/ddl_manager.rs
@@ -45,11 +45,12 @@ use crate::key::table_name::TableNameKey;
 use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
 use crate::rpc::ddl::DdlTask::{
     AlterLogicalTables, AlterTable, CreateDatabase, CreateFlow, CreateLogicalTables, CreateTable,
-    DropDatabase, DropLogicalTables, DropTable, TruncateTable,
+    DropDatabase, DropFlow, DropLogicalTables, DropTable, TruncateTable,
 };
 use crate::rpc::ddl::{
     AlterTableTask, CreateDatabaseTask, CreateFlowTask, CreateTableTask, DropDatabaseTask,
-    DropTableTask, QueryContext, SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask,
+    DropFlowTask, DropTableTask, QueryContext, SubmitDdlTaskRequest, SubmitDdlTaskResponse,
+    TruncateTableTask,
 };
 use crate::rpc::procedure;
 use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
@@ -347,6 +348,20 @@ impl DdlManager {
         self.submit_procedure(procedure_with_id).await
     }
 
+    #[tracing::instrument(skip_all)]
+    /// Submits and executes a drop flow task.
+    pub async fn submit_drop_flow_task(
+        &self,
+        cluster_id: ClusterId,
+        drop_flow: DropFlowTask,
+    ) -> Result<(ProcedureId, Option<Output>)> {
+        let context = self.create_context();
+        let procedure = DropFlowProcedure::new(cluster_id, drop_flow, context);
+        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
+
+        self.submit_procedure(procedure_with_id).await
+    }
+
     #[tracing::instrument(skip_all)]
     /// Submits and executes a truncate table task.
     pub async fn submit_truncate_table_task(
@@ -607,6 +622,27 @@ async fn handle_drop_database_task(
     })
 }
 
+async fn handle_drop_flow_task(
+    ddl_manager: &DdlManager,
+    cluster_id: ClusterId,
+    drop_flow_task: DropFlowTask,
+) -> Result<SubmitDdlTaskResponse> {
+    let (id, _) = ddl_manager
+        .submit_drop_flow_task(cluster_id, drop_flow_task.clone())
+        .await?;
+
+    let procedure_id = id.to_string();
+    info!(
+        "Flow {}.{}({}) is dropped via procedure_id {id:?}",
+        drop_flow_task.catalog_name, drop_flow_task.flow_name, drop_flow_task.flow_id,
+    );
+
+    Ok(SubmitDdlTaskResponse {
+        key: procedure_id.into(),
+        ..Default::default()
+    })
+}
+
 async fn handle_create_flow_task(
     ddl_manager: &DdlManager,
     cluster_id: ClusterId,
@@ -725,6 +761,9 @@ impl ProcedureExecutor for DdlManager {
                     )
                     .await
                 }
+                DropFlow(drop_flow_task) => {
+                    handle_drop_flow_task(self, cluster_id, drop_flow_task).await
+                }
             }
         }
         .trace(span)
diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs
index b009a257e6aa..8a6160b15cbc 100644
--- a/src/common/meta/src/rpc/ddl.rs
+++ b/src/common/meta/src/rpc/ddl.rs
@@ -54,6 +54,7 @@ pub enum DdlTask {
     CreateDatabase(CreateDatabaseTask),
     DropDatabase(DropDatabaseTask),
     CreateFlow(CreateFlowTask),
+    DropFlow(DropFlowTask),
 }
 
 impl DdlTask {
@@ -61,6 +62,10 @@
         DdlTask::CreateFlow(expr)
     }
 
+    pub fn new_drop_flow(expr: DropFlowTask) -> Self {
+        DdlTask::DropFlow(expr)
+    }
+
     pub fn new_create_table(
         expr: CreateTableExpr,
         partitions: Vec<Partition>,
@@ -237,6 +242,7 @@ impl TryFrom<DdlTask> for PbDdlTaskRequest {
             DdlTask::CreateDatabase(task) => Task::CreateDatabaseTask(task.try_into()?),
             DdlTask::DropDatabase(task) => Task::DropDatabaseTask(task.try_into()?),
             DdlTask::CreateFlow(task) => Task::CreateFlowTask(task.into()),
+            DdlTask::DropFlow(task) => Task::DropFlowTask(task.into()),
         };
 
         Ok(Self {
@@ -819,7 +825,7 @@ impl From<CreateFlowTask> for PbCreateFlowTask {
 }
 
 /// Drop flow
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct DropFlowTask {
     pub catalog_name: String,
     pub flow_name: String,
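
For orientation, the sketch below shows how a meta-side caller might drive the two pieces added above: wrapping a DropFlowTask via DdlTask::new_drop_flow and submitting it through DdlManager::submit_drop_flow_task. This is a minimal sketch rather than code from the change itself: the import paths are inferred from the file locations in this patch, and the locations of the ClusterId and error::Result aliases are assumptions.

// Minimal sketch; import paths inferred from the file layout of this patch.
use common_meta::ddl_manager::DdlManager;
use common_meta::error::Result; // assumed Result alias used by DdlManager
use common_meta::rpc::ddl::{DdlTask, DropFlowTask};
use common_meta::ClusterId; // assumed location of the ClusterId alias

/// Submits a drop-flow task and returns the procedure id as a string,
/// mirroring what `handle_drop_flow_task` surfaces as the response key.
async fn submit_drop_flow_sketch(
    ddl_manager: &DdlManager,
    cluster_id: ClusterId,
    task: DropFlowTask,
) -> Result<String> {
    // An RPC caller would carry the task inside the DdlTask enum; shown here
    // only to exercise the new constructor (DropFlowTask now derives Clone).
    let _ddl_task = DdlTask::new_drop_flow(task.clone());

    // Direct submission through the new DdlManager entry point.
    let (procedure_id, _output) = ddl_manager
        .submit_drop_flow_task(cluster_id, task)
        .await?;
    Ok(procedure_id.to_string())
}
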
diff --git a/src/operator/src/error.rs b/src/operator/src/error.rs
index f15b5fbcae2f..344a6c9ec4ff 100644
--- a/src/operator/src/error.rs
+++ b/src/operator/src/error.rs
@@ -129,6 +129,9 @@ pub enum Error {
     #[snafu(display("Table not found: {}", table_name))]
     TableNotFound { table_name: String },
 
+    #[snafu(display("Flow not found: {}", flow_name))]
+    FlowNotFound { flow_name: String },
+
     #[snafu(display("Failed to join task"))]
     JoinTask {
         #[snafu(source)]
@@ -620,6 +623,7 @@ impl ErrorExt for Error {
             Error::EncodeJson { .. } => StatusCode::Unexpected,
 
             Error::TableNotFound { .. } => StatusCode::TableNotFound,
+            Error::FlowNotFound { .. } => StatusCode::FlowNotFound,
 
             Error::JoinTask { .. } => StatusCode::Internal,
diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs
index 4dadffd1240d..a803f4d080d4 100644
--- a/src/operator/src/statement.rs
+++ b/src/operator/src/statement.rs
@@ -28,6 +28,7 @@ use catalog::CatalogManagerRef;
 use common_error::ext::BoxedError;
 use common_meta::cache_invalidator::CacheInvalidatorRef;
 use common_meta::ddl::ProcedureExecutorRef;
+use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef};
 use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
 use common_meta::kv_backend::KvBackendRef;
 use common_meta::table_name::TableName;
@@ -65,6 +66,7 @@ pub struct StatementExecutor {
     query_engine: QueryEngineRef,
     procedure_executor: ProcedureExecutorRef,
     table_metadata_manager: TableMetadataManagerRef,
+    flow_metadata_manager: FlowMetadataManagerRef,
     partition_manager: PartitionRuleManagerRef,
     cache_invalidator: CacheInvalidatorRef,
     inserter: InserterRef,
@@ -84,6 +86,7 @@ impl StatementExecutor {
             query_engine,
             procedure_executor,
             table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
+            flow_metadata_manager: Arc::new(FlowMetadataManager::new(kv_backend.clone())),
             partition_manager: Arc::new(PartitionRuleManager::new(kv_backend)),
             cache_invalidator,
             inserter,
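
The FlowMetadataManager wired in above is what lets the statement layer resolve a flow name to its id before any DDL is issued. Below is a hedged sketch of that lookup on its own, using only the calls that appear in the drop_flow method added in the next file; the u32 width of the flow id and the Result alias are assumptions.

// Illustrative helper; mirrors the lookup performed by `drop_flow` below.
use common_meta::error::Result; // assumed Result alias of the metadata manager
use common_meta::key::flow::FlowMetadataManagerRef;

/// Resolves a flow name to its numeric id (width assumed to be u32),
/// returning `None` when no flow with that name is registered.
async fn resolve_flow_id(
    flow_metadata_manager: &FlowMetadataManagerRef,
    catalog_name: &str,
    flow_name: &str,
) -> Result<Option<u32>> {
    let flow = flow_metadata_manager
        .flow_name_manager()
        .get(catalog_name, flow_name)
        .await?;
    // `flow_id()` is the same accessor used by `drop_flow`.
    Ok(flow.map(|value| value.flow_id()))
}
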
diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs
index 6ca5bb54b74d..5299b9b5239f 100644
--- a/src/operator/src/statement/ddl.rs
+++ b/src/operator/src/statement/ddl.rs
@@ -20,14 +20,16 @@ use api::v1::{column_def, AlterExpr, CreateTableExpr};
 use catalog::CatalogManagerRef;
 use chrono::Utc;
 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
-use common_catalog::format_full_table_name;
+use common_catalog::{format_full_flow_name, format_full_table_name};
 use common_error::ext::BoxedError;
 use common_meta::cache_invalidator::Context;
 use common_meta::ddl::ExecutorContext;
 use common_meta::instruction::CacheIdent;
 use common_meta::key::schema_name::{SchemaNameKey, SchemaNameValue};
 use common_meta::key::NAME_PATTERN;
-use common_meta::rpc::ddl::{CreateFlowTask, DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
+use common_meta::rpc::ddl::{
+    CreateFlowTask, DdlTask, DropFlowTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse,
+};
 use common_meta::rpc::router::{Partition, Partition as MetaPartition};
 use common_meta::table_name::TableName;
 use common_query::Output;
@@ -43,7 +45,7 @@ use query::sql::create_table_stmt;
 use regex::Regex;
 use session::context::QueryContextRef;
 use session::table_name::table_idents_to_full_name;
-use snafu::{ensure, IntoError, OptionExt, ResultExt};
+use snafu::{ensure, OptionExt, ResultExt};
 use sql::statements::alter::AlterTable;
 use sql::statements::create::{
     CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, Partitions,
@@ -60,7 +62,7 @@ use super::StatementExecutor;
 use crate::error::{
     self, AlterExprToRequestSnafu, CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu,
     CreateLogicalTablesSnafu, CreateTableInfoSnafu, DdlWithMultiCatalogsSnafu,
-    DdlWithMultiSchemasSnafu, DeserializePartitionSnafu, EmptyDdlExprSnafu,
+    DdlWithMultiSchemasSnafu, DeserializePartitionSnafu, EmptyDdlExprSnafu, FlowNotFoundSnafu,
     InvalidPartitionColumnsSnafu, InvalidPartitionRuleSnafu, InvalidTableNameSnafu,
     ParseSqlValueSnafu, Result, SchemaNotFoundSnafu, TableAlreadyExistsSnafu,
     TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu,
@@ -359,6 +361,57 @@ impl StatementExecutor {
         .context(error::ExecuteDdlSnafu)
     }
 
+    #[tracing::instrument(skip_all)]
+    pub async fn drop_flow(
+        &self,
+        catalog_name: String,
+        flow_name: String,
+        drop_if_exists: bool,
+        query_context: QueryContextRef,
+    ) -> Result<Output> {
+        if let Some(flow) = self
+            .flow_metadata_manager
+            .flow_name_manager()
+            .get(&catalog_name, &flow_name)
+            .await
+            .context(error::TableMetadataManagerSnafu)?
+        {
+            let flow_id = flow.flow_id();
+            let task = DropFlowTask {
+                catalog_name,
+                flow_name,
+                flow_id,
+                drop_if_exists,
+            };
+            self.drop_flow_procedure(task, query_context).await?;
+
+            Ok(Output::new_with_affected_rows(0))
+        } else if drop_if_exists {
+            Ok(Output::new_with_affected_rows(0))
+        } else {
+            FlowNotFoundSnafu {
+                flow_name: format_full_flow_name(&catalog_name, &flow_name),
+            }
+            .fail()
+        }
+    }
+
+    async fn drop_flow_procedure(
+        &self,
+        expr: DropFlowTask,
+        query_context: QueryContextRef,
+    ) -> Result<SubmitDdlTaskResponse> {
+        let request = SubmitDdlTaskRequest {
+            query_context,
+            task: DdlTask::new_drop_flow(expr),
+        };
+
+        self.procedure_executor
+            .submit_ddl_task(&ExecutorContext::default(), request)
+            .await
+            .context(error::ExecuteDdlSnafu)
+    }
+
     #[tracing::instrument(skip_all)]
     pub async fn alter_logical_tables(
         &self,
@@ -417,10 +470,10 @@ impl StatementExecutor {
             // DROP TABLE IF EXISTS meets table not found - ignored
             Ok(Output::new_with_affected_rows(0))
         } else {
-            Err(TableNotFoundSnafu {
+            TableNotFoundSnafu {
                 table_name: table_name.to_string(),
             }
-            .into_error(snafu::NoneError))
+            .fail()
         }
     }
 
@@ -446,10 +499,10 @@ impl StatementExecutor {
             // DROP TABLE IF EXISTS meets table not found - ignored
             Ok(Output::new_with_affected_rows(0))
         } else {
-            Err(SchemaNotFoundSnafu {
+            SchemaNotFoundSnafu {
                 schema_info: schema,
             }
-            .into_error(snafu::NoneError))
+            .fail()
         }
     }
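
End to end, the new statement-layer entry point can be exercised roughly as follows. This is a sketch rather than code from the patch: the operator crate paths, the QueryContext::arc() constructor, and the Result alias are assumptions; only the drop_flow signature comes from the change above.

// Hypothetical caller; only `drop_flow`'s signature comes from this patch.
use operator::error::Result; // assumed Result alias of the operator crate
use operator::statement::StatementExecutor; // assumed public module path
use session::context::QueryContext;

/// Drops a flow by name, treating a missing flow as a no-op.
async fn drop_flow_if_present(
    executor: &StatementExecutor,
    catalog: &str,
    flow: &str,
) -> Result<()> {
    // `QueryContext::arc()` is assumed to build a default QueryContextRef.
    let ctx = QueryContext::arc();
    // With drop_if_exists = true, a missing flow yields affected_rows(0);
    // with false, the same call fails with Error::FlowNotFound, which the
    // error.rs hunk above maps to StatusCode::FlowNotFound.
    let _output = executor
        .drop_flow(catalog.to_string(), flow.to_string(), true, ctx)
        .await?;
    Ok(())
}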