Skip to content

Commit

Permalink
feat: support to drop flow (#3900)
Browse files Browse the repository at this point in the history
* feat: support to drop flow

* chore: apply suggestions from CR
  • Loading branch information
WenyXu authored May 10, 2024
1 parent 9d36c31 commit 06e1c43
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 11 deletions.
43 changes: 41 additions & 2 deletions src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,12 @@ use crate::key::table_name::TableNameKey;
use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use crate::rpc::ddl::DdlTask::{
AlterLogicalTables, AlterTable, CreateDatabase, CreateFlow, CreateLogicalTables, CreateTable,
DropDatabase, DropLogicalTables, DropTable, TruncateTable,
DropDatabase, DropFlow, DropLogicalTables, DropTable, TruncateTable,
};
use crate::rpc::ddl::{
AlterTableTask, CreateDatabaseTask, CreateFlowTask, CreateTableTask, DropDatabaseTask,
DropTableTask, QueryContext, SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask,
DropFlowTask, DropTableTask, QueryContext, SubmitDdlTaskRequest, SubmitDdlTaskResponse,
TruncateTableTask,
};
use crate::rpc::procedure;
use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
Expand Down Expand Up @@ -347,6 +348,20 @@ impl DdlManager {
self.submit_procedure(procedure_with_id).await
}

#[tracing::instrument(skip_all)]
/// Submits and executes a drop flow task.
pub async fn submit_drop_flow_task(
&self,
cluster_id: ClusterId,
drop_flow: DropFlowTask,
) -> Result<(ProcedureId, Option<Output>)> {
let context = self.create_context();
let procedure = DropFlowProcedure::new(cluster_id, drop_flow, context);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));

self.submit_procedure(procedure_with_id).await
}

#[tracing::instrument(skip_all)]
/// Submits and executes a truncate table task.
pub async fn submit_truncate_table_task(
Expand Down Expand Up @@ -607,6 +622,27 @@ async fn handle_drop_database_task(
})
}

async fn handle_drop_flow_task(
ddl_manager: &DdlManager,
cluster_id: ClusterId,
drop_flow_task: DropFlowTask,
) -> Result<SubmitDdlTaskResponse> {
let (id, _) = ddl_manager
.submit_drop_flow_task(cluster_id, drop_flow_task.clone())
.await?;

let procedure_id = id.to_string();
info!(
"Flow {}.{}({}) is dropped via procedure_id {id:?}",
drop_flow_task.catalog_name, drop_flow_task.flow_name, drop_flow_task.flow_id,
);

Ok(SubmitDdlTaskResponse {
key: procedure_id.into(),
..Default::default()
})
}

async fn handle_create_flow_task(
ddl_manager: &DdlManager,
cluster_id: ClusterId,
Expand Down Expand Up @@ -725,6 +761,9 @@ impl ProcedureExecutor for DdlManager {
)
.await
}
DropFlow(drop_flow_task) => {
handle_drop_flow_task(self, cluster_id, drop_flow_task).await
}
}
}
.trace(span)
Expand Down
8 changes: 7 additions & 1 deletion src/common/meta/src/rpc/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,18 @@ pub enum DdlTask {
CreateDatabase(CreateDatabaseTask),
DropDatabase(DropDatabaseTask),
CreateFlow(CreateFlowTask),
DropFlow(DropFlowTask),
}

impl DdlTask {
pub fn new_create_flow(expr: CreateFlowTask) -> Self {
DdlTask::CreateFlow(expr)
}

pub fn new_drop_flow(expr: DropFlowTask) -> Self {
DdlTask::DropFlow(expr)
}

pub fn new_create_table(
expr: CreateTableExpr,
partitions: Vec<Partition>,
Expand Down Expand Up @@ -237,6 +242,7 @@ impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {
DdlTask::CreateDatabase(task) => Task::CreateDatabaseTask(task.try_into()?),
DdlTask::DropDatabase(task) => Task::DropDatabaseTask(task.try_into()?),
DdlTask::CreateFlow(task) => Task::CreateFlowTask(task.into()),
DdlTask::DropFlow(task) => Task::DropFlowTask(task.into()),
};

Ok(Self {
Expand Down Expand Up @@ -819,7 +825,7 @@ impl From<CreateFlowTask> for PbCreateFlowTask {
}

/// Drop flow
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DropFlowTask {
pub catalog_name: String,
pub flow_name: String,
Expand Down
4 changes: 4 additions & 0 deletions src/operator/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ pub enum Error {
#[snafu(display("Table not found: {}", table_name))]
TableNotFound { table_name: String },

#[snafu(display("Flow not found: {}", flow_name))]
FlowNotFound { flow_name: String },

#[snafu(display("Failed to join task"))]
JoinTask {
#[snafu(source)]
Expand Down Expand Up @@ -620,6 +623,7 @@ impl ErrorExt for Error {
Error::EncodeJson { .. } => StatusCode::Unexpected,

Error::TableNotFound { .. } => StatusCode::TableNotFound,
Error::FlowNotFound { .. } => StatusCode::FlowNotFound,

Error::JoinTask { .. } => StatusCode::Internal,

Expand Down
3 changes: 3 additions & 0 deletions src/operator/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use catalog::CatalogManagerRef;
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl::ProcedureExecutorRef;
use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef};
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::table_name::TableName;
Expand Down Expand Up @@ -65,6 +66,7 @@ pub struct StatementExecutor {
query_engine: QueryEngineRef,
procedure_executor: ProcedureExecutorRef,
table_metadata_manager: TableMetadataManagerRef,
flow_metadata_manager: FlowMetadataManagerRef,
partition_manager: PartitionRuleManagerRef,
cache_invalidator: CacheInvalidatorRef,
inserter: InserterRef,
Expand All @@ -84,6 +86,7 @@ impl StatementExecutor {
query_engine,
procedure_executor,
table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
flow_metadata_manager: Arc::new(FlowMetadataManager::new(kv_backend.clone())),
partition_manager: Arc::new(PartitionRuleManager::new(kv_backend)),
cache_invalidator,
inserter,
Expand Down
69 changes: 61 additions & 8 deletions src/operator/src/statement/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@ use api::v1::{column_def, AlterExpr, CreateTableExpr};
use catalog::CatalogManagerRef;
use chrono::Utc;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::format_full_table_name;
use common_catalog::{format_full_flow_name, format_full_table_name};
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::Context;
use common_meta::ddl::ExecutorContext;
use common_meta::instruction::CacheIdent;
use common_meta::key::schema_name::{SchemaNameKey, SchemaNameValue};
use common_meta::key::NAME_PATTERN;
use common_meta::rpc::ddl::{CreateFlowTask, DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use common_meta::rpc::ddl::{
CreateFlowTask, DdlTask, DropFlowTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse,
};
use common_meta::rpc::router::{Partition, Partition as MetaPartition};
use common_meta::table_name::TableName;
use common_query::Output;
Expand All @@ -43,7 +45,7 @@ use query::sql::create_table_stmt;
use regex::Regex;
use session::context::QueryContextRef;
use session::table_name::table_idents_to_full_name;
use snafu::{ensure, IntoError, OptionExt, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use sql::statements::alter::AlterTable;
use sql::statements::create::{
CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, Partitions,
Expand All @@ -60,7 +62,7 @@ use super::StatementExecutor;
use crate::error::{
self, AlterExprToRequestSnafu, CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu,
CreateLogicalTablesSnafu, CreateTableInfoSnafu, DdlWithMultiCatalogsSnafu,
DdlWithMultiSchemasSnafu, DeserializePartitionSnafu, EmptyDdlExprSnafu,
DdlWithMultiSchemasSnafu, DeserializePartitionSnafu, EmptyDdlExprSnafu, FlowNotFoundSnafu,
InvalidPartitionColumnsSnafu, InvalidPartitionRuleSnafu, InvalidTableNameSnafu,
ParseSqlValueSnafu, Result, SchemaNotFoundSnafu, TableAlreadyExistsSnafu,
TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu,
Expand Down Expand Up @@ -359,6 +361,57 @@ impl StatementExecutor {
.context(error::ExecuteDdlSnafu)
}

#[tracing::instrument(skip_all)]
pub async fn drop_flow(
&self,
catalog_name: String,
flow_name: String,
drop_if_exists: bool,
query_context: QueryContextRef,
) -> Result<Output> {
if let Some(flow) = self
.flow_metadata_manager
.flow_name_manager()
.get(&catalog_name, &flow_name)
.await
.context(error::TableMetadataManagerSnafu)?
{
let flow_id = flow.flow_id();
let task = DropFlowTask {
catalog_name,
flow_name,
flow_id,
drop_if_exists,
};
self.drop_flow_procedure(task, query_context).await?;

Ok(Output::new_with_affected_rows(0))
} else if drop_if_exists {
Ok(Output::new_with_affected_rows(0))
} else {
FlowNotFoundSnafu {
flow_name: format_full_flow_name(&catalog_name, &flow_name),
}
.fail()
}
}

async fn drop_flow_procedure(
&self,
expr: DropFlowTask,
query_context: QueryContextRef,
) -> Result<SubmitDdlTaskResponse> {
let request = SubmitDdlTaskRequest {
query_context,
task: DdlTask::new_drop_flow(expr),
};

self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), request)
.await
.context(error::ExecuteDdlSnafu)
}

#[tracing::instrument(skip_all)]
pub async fn alter_logical_tables(
&self,
Expand Down Expand Up @@ -417,10 +470,10 @@ impl StatementExecutor {
// DROP TABLE IF EXISTS meets table not found - ignored
Ok(Output::new_with_affected_rows(0))
} else {
Err(TableNotFoundSnafu {
TableNotFoundSnafu {
table_name: table_name.to_string(),
}
.into_error(snafu::NoneError))
.fail()
}
}

Expand All @@ -446,10 +499,10 @@ impl StatementExecutor {
// DROP TABLE IF EXISTS meets table not found - ignored
Ok(Output::new_with_affected_rows(0))
} else {
Err(SchemaNotFoundSnafu {
SchemaNotFoundSnafu {
schema_info: schema,
}
.into_error(snafu::NoneError))
.fail()
}
}

Expand Down

0 comments on commit 06e1c43

Please sign in to comment.