diff --git a/Cargo.lock b/Cargo.lock index 4671cbf69881..6a209e795c8b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3866,7 +3866,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=73ac0207ab71dfea48f30259ffdb611501b5ecb8#73ac0207ab71dfea48f30259ffdb611501b5ecb8" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=783682fabc38c57b5b9d46bdcfeebe2496e85bbb#783682fabc38c57b5b9d46bdcfeebe2496e85bbb" dependencies = [ "prost 0.12.4", "serde", diff --git a/Cargo.toml b/Cargo.toml index a74099fa4cfc..78b89a5e1707 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -115,7 +115,7 @@ etcd-client = { git = "https://github.com/MichaelScofield/etcd-client.git", rev fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "73ac0207ab71dfea48f30259ffdb611501b5ecb8" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "783682fabc38c57b5b9d46bdcfeebe2496e85bbb" } humantime = "2.1" humantime-serde = "1.1" itertools = "0.10" diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index c71bb0795cb3..ec43253f39d8 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -518,6 +518,8 @@ fn ddl_request_type(request: &DdlRequest) -> &'static str { Some(Expr::Alter(_)) => "ddl.alter", Some(Expr::DropTable(_)) => "ddl.drop_table", Some(Expr::TruncateTable(_)) => "ddl.truncate_table", + Some(Expr::CreateFlowTask(_)) => "ddl.create_flow_task", + Some(Expr::DropFlowTask(_)) => "ddl.drop_flow_task", None => "ddl.empty", } } diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index f48e2f6486da..a7e14161ecc6 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -18,15 +18,16 @@ use std::result; use api::v1::meta::ddl_task_request::Task; use api::v1::meta::{ AlterTableTask as PbAlterTableTask, AlterTableTasks as PbAlterTableTasks, - CreateDatabaseTask as PbCreateDatabaseTask, CreateTableTask as PbCreateTableTask, - CreateTableTasks as PbCreateTableTasks, DdlTaskRequest as PbDdlTaskRequest, - DdlTaskResponse as PbDdlTaskResponse, DropDatabaseTask as PbDropDatabaseTask, + CreateDatabaseTask as PbCreateDatabaseTask, CreateFlowTask as PbCreateFlowTask, + CreateTableTask as PbCreateTableTask, CreateTableTasks as PbCreateTableTasks, + DdlTaskRequest as PbDdlTaskRequest, DdlTaskResponse as PbDdlTaskResponse, + DropDatabaseTask as PbDropDatabaseTask, DropFlowTask as PbDropFlowTask, DropTableTask as PbDropTableTask, DropTableTasks as PbDropTableTasks, Partition, ProcedureId, TruncateTableTask as PbTruncateTableTask, }; use api::v1::{ - AlterExpr, CreateDatabaseExpr, CreateTableExpr, DropDatabaseExpr, DropTableExpr, - TruncateTableExpr, + AlterExpr, CreateDatabaseExpr, CreateFlowTaskExpr, CreateTableExpr, DropDatabaseExpr, + DropFlowTaskExpr, DropTableExpr, TruncateTableExpr, }; use base64::engine::general_purpose; use base64::Engine as _; @@ -181,6 +182,8 @@ impl TryFrom for DdlTask { Task::DropDatabaseTask(drop_database) => { Ok(DdlTask::DropDatabase(drop_database.try_into()?)) } + Task::CreateFlowTask(_) => unimplemented!(), + Task::DropFlowTask(_) => unimplemented!(), } } } @@ -720,6 +723,129 @@ impl TryFrom for PbDropDatabaseTask { } } +/// Create flow task +pub struct CreateFlowTask { + pub catalog_name: String, + pub task_name: String, + pub source_table_names: Vec, + pub sink_table_name: TableName, + pub or_replace: bool, + pub create_if_not_exists: bool, + pub expire_when: String, + pub comment: String, + pub sql: String, + pub options: HashMap, +} + +impl TryFrom for CreateFlowTask { + type Error = error::Error; + + fn try_from(pb: PbCreateFlowTask) -> Result { + let CreateFlowTaskExpr { + catalog_name, + task_name, + source_table_names, + sink_table_name, + or_replace, + create_if_not_exists, + expire_when, + comment, + sql, + task_options, + } = pb.create_flow_task.context(error::InvalidProtoMsgSnafu { + err_msg: "expected create_flow_task", + })?; + + Ok(CreateFlowTask { + catalog_name, + task_name, + source_table_names: source_table_names.into_iter().map(Into::into).collect(), + sink_table_name: sink_table_name + .context(error::InvalidProtoMsgSnafu { + err_msg: "expected sink_table_name", + })? + .into(), + or_replace, + create_if_not_exists, + expire_when, + comment, + sql, + options: task_options, + }) + } +} + +impl From for PbCreateFlowTask { + fn from( + CreateFlowTask { + catalog_name, + task_name, + source_table_names, + sink_table_name, + or_replace, + create_if_not_exists, + expire_when, + comment, + sql, + options, + }: CreateFlowTask, + ) -> Self { + PbCreateFlowTask { + create_flow_task: Some(CreateFlowTaskExpr { + catalog_name, + task_name, + source_table_names: source_table_names.into_iter().map(Into::into).collect(), + sink_table_name: Some(sink_table_name.into()), + or_replace, + create_if_not_exists, + expire_when, + comment, + sql, + task_options: options, + }), + } + } +} + +/// Drop flow task +pub struct DropFlowTask { + pub catalog_name: String, + pub task_name: String, +} + +impl TryFrom for DropFlowTask { + type Error = error::Error; + + fn try_from(pb: PbDropFlowTask) -> Result { + let DropFlowTaskExpr { + catalog_name, + task_name, + } = pb.drop_flow_task.context(error::InvalidProtoMsgSnafu { + err_msg: "expected sink_table_name", + })?; + Ok(DropFlowTask { + catalog_name, + task_name, + }) + } +} + +impl From for PbDropFlowTask { + fn from( + DropFlowTask { + catalog_name, + task_name, + }: DropFlowTask, + ) -> Self { + PbDropFlowTask { + drop_flow_task: Some(DropFlowTaskExpr { + catalog_name, + task_name, + }), + } + } +} + #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/src/common/meta/src/table_name.rs b/src/common/meta/src/table_name.rs index 62615e5c211b..645e6386df02 100644 --- a/src/common/meta/src/table_name.rs +++ b/src/common/meta/src/table_name.rs @@ -14,7 +14,7 @@ use std::fmt::{Display, Formatter}; -use api::v1::meta::TableName as PbTableName; +use api::v1::TableName as PbTableName; use serde::{Deserialize, Serialize}; use table::table_reference::TableReference; diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index 73ec35df5d49..551a7da85d31 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -109,7 +109,6 @@ impl GrpcQueryHandler for Instance { match expr { DdlExpr::CreateTable(mut expr) => { - // TODO(weny): supports to create multiple region table. let _ = self .statement_executor .create_table_inner(&mut expr, None, &ctx) @@ -138,6 +137,12 @@ impl GrpcQueryHandler for Instance { TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name); self.statement_executor.truncate_table(table_name).await? } + DdlExpr::CreateFlowTask(_) => { + unimplemented!() + } + DdlExpr::DropFlowTask(_) => { + unimplemented!() + } } } }; @@ -176,6 +181,16 @@ fn fill_catalog_and_schema_from_context(ddl_expr: &mut DdlExpr, ctx: &QueryConte Expr::TruncateTable(expr) => { check_and_fill!(expr); } + Expr::CreateFlowTask(expr) => { + if expr.catalog_name.is_empty() { + expr.catalog_name = catalog.to_string(); + } + } + Expr::DropFlowTask(expr) => { + if expr.catalog_name.is_empty() { + expr.catalog_name = catalog.to_string(); + } + } } }