From 1c1bada29003fbea339b14a1ec06bd68e5c4ee2e Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 25 Apr 2024 17:08:55 +0800 Subject: [PATCH 01/13] feat: impl parser for CREATE TASK Signed-off-by: Ruihang Xia --- src/sql/src/parsers/create_parser.rs | 194 ++++++++++++++++++++++++++- src/sql/src/statements/create.rs | 42 +++++- src/sql/src/statements/statement.rs | 4 + 3 files changed, 235 insertions(+), 5 deletions(-) diff --git a/src/sql/src/parsers/create_parser.rs b/src/sql/src/parsers/create_parser.rs index 08764aaa6e42..fb5c049d3636 100644 --- a/src/sql/src/parsers/create_parser.rs +++ b/src/sql/src/parsers/create_parser.rs @@ -32,7 +32,8 @@ use crate::error::{ }; use crate::parser::ParserContext; use crate::statements::create::{ - CreateDatabase, CreateExternalTable, CreateTable, CreateTableLike, Partitions, TIME_INDEX, + CreateDatabase, CreateExternalTable, CreateFlowTask, CreateTable, CreateTableLike, Partitions, + TIME_INDEX, }; use crate::statements::get_data_type_by_alias_name; use crate::statements::statement::Statement; @@ -40,6 +41,10 @@ use crate::util::parse_option_string; pub const ENGINE: &str = "ENGINE"; pub const MAXVALUE: &str = "MAXVALUE"; +pub const TASK: &str = "TASK"; +pub const SINK: &str = "SINK"; +pub const EXPIRE: &str = "EXPIRE"; +pub const WHEN: &str = "WHEN"; /// Parses create [table] statement impl<'a> ParserContext<'a> { @@ -52,6 +57,34 @@ impl<'a> ParserContext<'a> { Keyword::EXTERNAL => self.parse_create_external_table(), + Keyword::OR => { + let _ = self.parser.next_token(); + self.parser + .expect_keyword(Keyword::REPLACE) + .context(SyntaxSnafu)?; + match self.parser.next_token().token { + Token::Word(w) => match w.keyword { + Keyword::NoKeyword => { + let uppercase = w.value.to_uppercase(); + match uppercase.as_str() { + TASK => self.parse_create_flow_task(true), + _ => self.unsupported(w.to_string()), + } + } + _ => self.unsupported(w.to_string()), + }, + _ => self.unsupported(w.to_string()), + } + } + + Keyword::NoKeyword => { + let uppercase = w.value.to_uppercase(); + match uppercase.as_str() { + TASK => self.parse_create_flow_task(false), + _ => self.unsupported(w.to_string()), + } + } + _ => self.unsupported(w.to_string()), }, unexpected => self.unsupported(unexpected.to_string()), @@ -177,8 +210,70 @@ impl<'a> ParserContext<'a> { Ok(Statement::CreateTable(create_table)) } - /// "PARTITION BY ..." syntax: - // TODO(ruihang): docs + /// "CREATE TASK" clause + fn parse_create_flow_task(&mut self, or_replace: bool) -> Result { + // consume `TASK` + // let _ = self.parser.next_token(); + + let if_not_exists = + self.parser + .parse_keywords(&[Keyword::IF, Keyword::NOT, Keyword::EXISTS]); + + let task_name = self.intern_parse_table_name()?; + + self.parser + .expect_token(&Token::make_keyword(SINK)) + .context(SyntaxSnafu)?; + self.parser + .expect_keyword(Keyword::TO) + .context(SyntaxSnafu)?; + + let output_table_name = self.intern_parse_table_name()?; + + let expire_when = if self + .parser + .consume_tokens(&[Token::make_keyword(EXPIRE), Token::make_keyword(WHEN)]) + { + Some(self.parser.parse_expr().context(error::SyntaxSnafu)?) + } else { + None + }; + + let comment = if self.parser.parse_keyword(Keyword::COMMENT) { + match self.parser.next_token() { + TokenWithLocation { + token: Token::SingleQuotedString(value, ..), + .. + } => Some(value), + unexpected => { + return self + .parser + .expected("string", unexpected) + .context(SyntaxSnafu) + } + } + } else { + None + }; + + self.parser + .expect_keyword(Keyword::AS) + .context(SyntaxSnafu)?; + + let query = Box::new(self.parser.parse_query().context(error::SyntaxSnafu)?); + + Ok(Statement::CreateFlowTask(CreateFlowTask { + task_name, + output_table_name, + or_replace, + if_not_exists, + expire_when, + comment, + query, + })) + } + + /// "PARTITION BY ..." clause fn parse_partitions(&mut self) -> Result> { if !self.parser.parse_keyword(Keyword::PARTITION) { return Ok(None); @@ -729,7 +824,7 @@ mod tests { use common_catalog::consts::FILE_ENGINE; use common_error::ext::ErrorExt; use sqlparser::ast::ColumnOption::NotNull; - use sqlparser::ast::{BinaryOperator, ObjectName, Value}; + use sqlparser::ast::{BinaryOperator, Expr, Function, Interval, ObjectName, Value}; use super::*; use crate::dialect::GreptimeDbDialect; @@ -937,6 +1032,97 @@ mod tests { ); } + #[test] + fn test_parse_create_flow_task() { + let sql = r" +CREATE OR REPLACE TASK IF NOT EXISTS task_1 +SINK TO schema_1.table_1 +EXPIRE WHEN timestamp < now() - INTERVAL '5m' +COMMENT 'test comment' +AS +SELECT max(c1), min(c2) FROM schema_2.table_2;"; + let stmts = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap(); + assert_eq!(1, stmts.len()); + let create_task = match &stmts[0] { + Statement::CreateFlowTask(c) => c, + _ => unreachable!(), + }; + + let expected = CreateFlowTask { + task_name: ObjectName(vec![Ident { + value: "task_1".to_string(), + quote_style: None, + }]), + output_table_name: ObjectName(vec![ + Ident { + value: "schema_1".to_string(), + quote_style: None, + }, + Ident { + value: "table_1".to_string(), + quote_style: None, + }, + ]), + or_replace: true, + if_not_exists: true, + expire_when: Some(Expr::BinaryOp { + left: Box::new(Expr::Identifier(Ident { + value: "timestamp".to_string(), + quote_style: None, + })), + op: BinaryOperator::Lt, + right: Box::new(Expr::BinaryOp { + left: Box::new(Expr::Function(Function { + name: ObjectName(vec![Ident { + value: "now".to_string(), + quote_style: None, + }]), + args: vec![], + filter: None, + null_treatment: None, + over: None, + distinct: false, + special: false, + order_by: vec![], + })), + op: BinaryOperator::Minus, + right: Box::new(Expr::Interval(Interval { + value: Box::new(Expr::Value(Value::SingleQuotedString("5m".to_string()))), + leading_field: None, + leading_precision: None, + last_field: None, + fractional_seconds_precision: None, + })), + }), + }), + comment: Some("test comment".to_string()), + // ignore query parse result + query: create_task.query.clone(), + }; + assert_eq!(create_task, &expected); + + // create task without `OR REPLACE`, `IF NOT EXISTS`, `EXPIRE WHEN` and `COMMENT` + let sql = r" +CREATE TASK task_1 +SINK TO schema_1.table_1 +AS +SELECT max(c1), min(c2) FROM schema_2.table_2;"; + let stmts = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap(); + assert_eq!(1, stmts.len()); + let create_task = match &stmts[0] { + Statement::CreateFlowTask(c) => c, + _ => unreachable!(), + }; + assert!(!create_task.or_replace); + assert!(!create_task.if_not_exists); + assert!(create_task.expire_when.is_none()); + assert!(create_task.comment.is_none()); + } + #[test] fn test_validate_create() { let sql = r" diff --git a/src/sql/src/statements/create.rs b/src/sql/src/statements/create.rs index eb992b48ef45..ca255d0b0839 100644 --- a/src/sql/src/statements/create.rs +++ b/src/sql/src/statements/create.rs @@ -16,7 +16,7 @@ use std::fmt::{Display, Formatter}; use common_catalog::consts::FILE_ENGINE; use itertools::Itertools; -use sqlparser::ast::Expr; +use sqlparser::ast::{Expr, Query}; use sqlparser_derive::{Visit, VisitMut}; use crate::ast::{ColumnDef, Ident, ObjectName, SqlOption, TableConstraint, Value as SqlValue}; @@ -242,6 +242,46 @@ impl Display for CreateTableLike { } } +#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut)] +pub struct CreateFlowTask { + /// Task name + pub task_name: ObjectName, + /// Output (sink) table name + pub output_table_name: ObjectName, + /// Whether to replace existing task + pub or_replace: bool, + /// Create if not exist + pub if_not_exists: bool, + /// `EXPIRE_WHEN` + pub expire_when: Option, + /// Comment string + pub comment: Option, + /// SQL statement + pub query: Box, +} + +impl Display for CreateFlowTask { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "CREATE ")?; + if self.or_replace { + write!(f, "OR REPLACE ")?; + } + write!(f, "TASK ")?; + if self.if_not_exists { + write!(f, "IF NOT EXISTS ")?; + } + write!(f, "{} ", &self.task_name)?; + write!(f, "OUTPUT AS {} ", &self.output_table_name)?; + if let Some(expire_when) = &self.expire_when { + write!(f, "EXPIRE WHEN {} ", expire_when)?; + } + if let Some(comment) = &self.comment { + write!(f, "COMMENT '{}' ", comment)?; + } + write!(f, "AS {}", &self.query) + } +} + #[cfg(test)] mod tests { use std::assert_matches::assert_matches; diff --git a/src/sql/src/statements/statement.rs b/src/sql/src/statements/statement.rs index 70566893d65a..047b2d6f5b8a 100644 --- a/src/sql/src/statements/statement.rs +++ b/src/sql/src/statements/statement.rs @@ -18,6 +18,7 @@ use datafusion_sql::parser::Statement as DfStatement; use sqlparser::ast::Statement as SpStatement; use sqlparser_derive::{Visit, VisitMut}; +use super::create::CreateFlowTask; use super::drop::DropDatabase; use super::show::ShowVariables; use crate::error::{ConvertToDfStatementSnafu, Error}; @@ -54,6 +55,8 @@ pub enum Statement { CreateExternalTable(CreateExternalTable), // CREATE TABLE ... LIKE CreateTableLike(CreateTableLike), + // CREATE TASK + CreateFlowTask(CreateFlowTask), // DROP TABLE DropTable(DropTable), // DROP DATABASE @@ -100,6 +103,7 @@ impl Display for Statement { Statement::CreateTable(s) => s.fmt(f), Statement::CreateExternalTable(s) => s.fmt(f), Statement::CreateTableLike(s) => s.fmt(f), + Statement::CreateFlowTask(s) => s.fmt(f), Statement::DropTable(s) => s.fmt(f), Statement::DropDatabase(s) => s.fmt(f), Statement::CreateDatabase(s) => s.fmt(f), From 99860b398ac5bb00b09461c5e92fa659cbfece71 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 26 Apr 2024 14:55:05 +0800 Subject: [PATCH 02/13] finish parser Signed-off-by: Ruihang Xia --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/api/src/helper.rs | 2 ++ src/operator/src/statement.rs | 4 ++++ src/sql/src/parsers/create_parser.rs | 3 --- 5 files changed, 8 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4671cbf69881..52c14bfdc7ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3866,7 +3866,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=73ac0207ab71dfea48f30259ffdb611501b5ecb8#73ac0207ab71dfea48f30259ffdb611501b5ecb8" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=d18b1c8b8a23f7bd4f108e2c2f580c561e6346d0#d18b1c8b8a23f7bd4f108e2c2f580c561e6346d0" dependencies = [ "prost 0.12.4", "serde", diff --git a/Cargo.toml b/Cargo.toml index a74099fa4cfc..eb0f8d61028a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -115,7 +115,7 @@ etcd-client = { git = "https://github.com/MichaelScofield/etcd-client.git", rev fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "73ac0207ab71dfea48f30259ffdb611501b5ecb8" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "d18b1c8b8a23f7bd4f108e2c2f580c561e6346d0" } humantime = "2.1" humantime-serde = "1.1" itertools = "0.10" diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index c71bb0795cb3..ec43253f39d8 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -518,6 +518,8 @@ fn ddl_request_type(request: &DdlRequest) -> &'static str { Some(Expr::Alter(_)) => "ddl.alter", Some(Expr::DropTable(_)) => "ddl.drop_table", Some(Expr::TruncateTable(_)) => "ddl.truncate_table", + Some(Expr::CreateFlowTask(_)) => "ddl.create_flow_task", + Some(Expr::DropFlowTask(_)) => "ddl.drop_flow_task", None => "ddl.empty", } } diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs index 2ecb35e91319..784219575a95 100644 --- a/src/operator/src/statement.rs +++ b/src/operator/src/statement.rs @@ -164,6 +164,10 @@ impl StatementExecutor { let _ = self.create_external_table(stmt, query_ctx).await?; Ok(Output::new_with_affected_rows(0)) } + Statement::CreateFlowTask(stmt) => { + let _ = self.create_flow_task(stmt, query_ctx).await?; + Ok(Output::new_with_affected_rows(0)) + } Statement::Alter(alter_table) => self.alter_table(alter_table, query_ctx).await, Statement::DropTable(stmt) => { let (catalog, schema, table) = diff --git a/src/sql/src/parsers/create_parser.rs b/src/sql/src/parsers/create_parser.rs index fb5c049d3636..f7043c0ed5a5 100644 --- a/src/sql/src/parsers/create_parser.rs +++ b/src/sql/src/parsers/create_parser.rs @@ -212,9 +212,6 @@ impl<'a> ParserContext<'a> { /// "CREATE TASK" clause fn parse_create_flow_task(&mut self, or_replace: bool) -> Result { - // consume `TASK` - // let _ = self.parser.next_token(); - let if_not_exists = self.parser .parse_keywords(&[Keyword::IF, Keyword::NOT, Keyword::EXISTS]); From 495261f97f2f9c882ac1e6d6dfa9ec7c9c4cd403 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 26 Apr 2024 14:56:56 +0800 Subject: [PATCH 03/13] wip expr Signed-off-by: Ruihang Xia --- src/operator/src/expr_factory.rs | 22 ++++++++++++++++++++-- src/operator/src/statement/ddl.rs | 15 ++++++++++++++- 2 files changed, 34 insertions(+), 3 deletions(-) diff --git a/src/operator/src/expr_factory.rs b/src/operator/src/expr_factory.rs index cc950f6ba7ec..83bc9cd13a2e 100644 --- a/src/operator/src/expr_factory.rs +++ b/src/operator/src/expr_factory.rs @@ -18,7 +18,7 @@ use api::helper::ColumnDataTypeWrapper; use api::v1::alter_expr::Kind; use api::v1::{ AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDataTypeExtension, - CreateTableExpr, DropColumn, DropColumns, RenameTable, SemanticType, + CreateFlowTaskExpr, CreateTableExpr, DropColumn, DropColumns, RenameTable, SemanticType, }; use common_error::ext::BoxedError; use common_grpc_expr::util::ColumnExpr; @@ -34,7 +34,7 @@ use session::table_name::table_idents_to_full_name; use snafu::{ensure, OptionExt, ResultExt}; use sql::ast::{ColumnDef, ColumnOption, TableConstraint}; use sql::statements::alter::{AlterTable, AlterTableOperation}; -use sql::statements::create::{CreateExternalTable, CreateTable, TIME_INDEX}; +use sql::statements::create::{CreateExternalTable, CreateFlowTask, CreateTable, TIME_INDEX}; use sql::statements::{column_def_to_schema, sql_column_def_to_grpc_column_def}; use sql::util::to_lowercase_options_map; use table::requests::{TableOptions, FILE_TABLE_META_KEY}; @@ -489,6 +489,24 @@ pub(crate) fn to_alter_expr( }) } +fn to_create_flow_task_expr( + create_task: CreateFlowTask, + query_ctx: QueryContextRef, +) -> Result { + Ok(CreateFlowTaskExpr { + catalog_name: query_ctx.current_catalog().to_string(), + task_name: create_task.task_name, + source_table_names: todo!(), + sink_table_name: todo!(), + create_if_not_exists: create_task.if_not_exists, + or_replace: create_task.or_replace, + expire_when: create_task.expire_when.to_string(), + comment: create_task.comment, + sql: create_task.query.to_string(), + task_options: HashMap::new(), + }) +} + #[cfg(test)] mod tests { use datatypes::value::Value; diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index 2ab87fa4513e..e2c60499d372 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -45,7 +45,9 @@ use session::context::QueryContextRef; use session::table_name::table_idents_to_full_name; use snafu::{ensure, IntoError, OptionExt, ResultExt}; use sql::statements::alter::AlterTable; -use sql::statements::create::{CreateExternalTable, CreateTable, CreateTableLike, Partitions}; +use sql::statements::create::{ + CreateExternalTable, CreateFlowTask, CreateTable, CreateTableLike, Partitions, +}; use sql::statements::sql_value_to_value; use sqlparser::ast::{Expr, Ident, Value as ParserValue}; use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME}; @@ -320,6 +322,17 @@ impl StatementExecutor { .collect()) } + #[tracing::instrument(skip_all)] + pub async fn create_flow_task( + &self, + stmt: CreateFlowTask, + query_ctx: QueryContextRef, + ) -> Result<()> { + // TODO: do some verification + + todo!() + } + #[tracing::instrument(skip_all)] pub async fn alter_logical_tables(&self, alter_table_exprs: Vec) -> Result { let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer(); From 1a00964758527c4c62909687b8a7b3b65a21b0c8 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 26 Apr 2024 16:25:17 +0800 Subject: [PATCH 04/13] finish expr Signed-off-by: Ruihang Xia --- src/common/meta/Cargo.toml | 1 + src/frontend/src/instance.rs | 4 ++ src/operator/src/error.rs | 11 ++++- src/operator/src/expr_factory.rs | 69 +++++++++++++++++++++++++++---- src/operator/src/statement.rs | 2 +- src/operator/src/statement/ddl.rs | 6 ++- src/sql/src/util.rs | 45 +++++++++++++++++++- 7 files changed, 125 insertions(+), 13 deletions(-) diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index 9132f6de559b..40df9d6180c0 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -28,6 +28,7 @@ common-recordbatch.workspace = true common-telemetry.workspace = true common-time.workspace = true common-wal.workspace = true +datafusion-common.workspace = true datatypes.workspace = true derive_builder.workspace = true etcd-client.workspace = true diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 8e45d37af02a..1470f53e95d9 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -494,6 +494,10 @@ pub fn check_permission( Statement::CreateExternalTable(stmt) => { validate_param(&stmt.name, query_ctx)?; } + Statement::CreateFlowTask(stmt) => { + // TODO: should also validate source table name here? + validate_param(&stmt.output_table_name, query_ctx)?; + } Statement::Alter(stmt) => { validate_param(stmt.table_name(), query_ctx)?; } diff --git a/src/operator/src/error.rs b/src/operator/src/error.rs index dd8d64f10ffa..f15b5fbcae2f 100644 --- a/src/operator/src/error.rs +++ b/src/operator/src/error.rs @@ -82,6 +82,14 @@ pub enum Error { source: sql::error::Error, }, + #[snafu(display("Failed to convert identifier: {}", ident))] + ConvertIdentifier { + ident: String, + location: Location, + #[snafu(source)] + error: datafusion::error::DataFusionError, + }, + #[snafu(display("Failed to convert value to sql value: {}", value))] ConvertSqlValue { value: Value, @@ -568,7 +576,8 @@ impl ErrorExt for Error { | Error::InferFileTableSchema { .. } | Error::SchemaIncompatible { .. } | Error::UnsupportedRegionRequest { .. } - | Error::InvalidTableName { .. } => StatusCode::InvalidArguments, + | Error::InvalidTableName { .. } + | Error::ConvertIdentifier { .. } => StatusCode::InvalidArguments, Error::TableAlreadyExists { .. } => StatusCode::TableAlreadyExists, diff --git a/src/operator/src/expr_factory.rs b/src/operator/src/expr_factory.rs index 659cb6475a99..7246de8351dc 100644 --- a/src/operator/src/expr_factory.rs +++ b/src/operator/src/expr_factory.rs @@ -19,10 +19,12 @@ use api::v1::alter_expr::Kind; use api::v1::{ AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDataTypeExtension, CreateFlowTaskExpr, CreateTableExpr, DropColumn, DropColumns, RenameTable, SemanticType, + TableName, }; use common_error::ext::BoxedError; use common_grpc_expr::util::ColumnExpr; use common_time::Timezone; +use datafusion::sql::planner::object_name_to_table_reference; use datatypes::schema::{ColumnSchema, COMMENT_KEY}; use file_engine::FileOptions; use query::sql::{ @@ -36,14 +38,15 @@ use sql::ast::{ColumnDef, ColumnOption, TableConstraint}; use sql::statements::alter::{AlterTable, AlterTableOperation}; use sql::statements::create::{CreateExternalTable, CreateFlowTask, CreateTable, TIME_INDEX}; use sql::statements::{column_def_to_schema, sql_column_def_to_grpc_column_def}; +use sql::util::extract_tables_from_query; use table::requests::{TableOptions, FILE_TABLE_META_KEY}; use table::table_reference::TableReference; use crate::error::{ BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu, ConvertColumnDefaultConstraintSnafu, - EncodeJsonSnafu, ExternalSnafu, IllegalPrimaryKeysDefSnafu, InferFileTableSchemaSnafu, - InvalidSqlSnafu, NotSupportedSnafu, ParseSqlSnafu, PrepareFileTableSnafu, Result, - SchemaIncompatibleSnafu, UnrecognizedTableOptionSnafu, + ConvertIdentifierSnafu, EncodeJsonSnafu, ExternalSnafu, IllegalPrimaryKeysDefSnafu, + InferFileTableSchemaSnafu, InvalidSqlSnafu, NotSupportedSnafu, ParseSqlSnafu, + PrepareFileTableSnafu, Result, SchemaIncompatibleSnafu, UnrecognizedTableOptionSnafu, }; #[derive(Debug, Copy, Clone)] @@ -487,19 +490,67 @@ pub(crate) fn to_alter_expr( }) } -fn to_create_flow_task_expr( +pub fn to_create_flow_task_expr( create_task: CreateFlowTask, query_ctx: QueryContextRef, ) -> Result { + // retrieve sink table name + let sink_table_ref = + object_name_to_table_reference(create_task.output_table_name.clone().into(), true) + .with_context(|_| ConvertIdentifierSnafu { + ident: create_task.output_table_name.to_string(), + })?; + let catalog = sink_table_ref + .catalog() + .unwrap_or(query_ctx.current_catalog()) + .to_string(); + let schema = sink_table_ref + .schema() + .unwrap_or(query_ctx.current_schema()) + .to_string(); + let sink_table_name = TableName { + catalog_name: catalog, + schema_name: schema, + table_name: sink_table_ref.table().to_string(), + }; + + let source_table_names = extract_tables_from_query(&create_task.query) + .map(|name| { + let reference = object_name_to_table_reference(name.clone().into(), true) + .with_context(|_| ConvertIdentifierSnafu { + ident: name.to_string(), + })?; + let catalog = reference + .catalog() + .unwrap_or(query_ctx.current_catalog()) + .to_string(); + let schema = reference + .schema() + .unwrap_or(query_ctx.current_schema()) + .to_string(); + let table_name = TableName { + catalog_name: catalog, + schema_name: schema, + table_name: reference.table().to_string(), + }; + Ok(table_name) + }) + .collect::>>()?; + Ok(CreateFlowTaskExpr { catalog_name: query_ctx.current_catalog().to_string(), - task_name: create_task.task_name, - source_table_names: todo!(), - sink_table_name: todo!(), + task_name: create_task.task_name.to_string(), + source_table_names, + sink_table_name: Some(sink_table_name), create_if_not_exists: create_task.if_not_exists, or_replace: create_task.or_replace, - expire_when: create_task.expire_when.to_string(), - comment: create_task.comment, + // TODO: change this field to optional in proto + expire_when: create_task + .expire_when + .map(|e| e.to_string()) + .unwrap_or_default(), + // TODO: change this field to optional in proto + comment: create_task.comment.unwrap_or_default(), sql: create_task.query.to_string(), task_options: HashMap::new(), }) diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs index 784219575a95..bbcf5e18274f 100644 --- a/src/operator/src/statement.rs +++ b/src/operator/src/statement.rs @@ -165,7 +165,7 @@ impl StatementExecutor { Ok(Output::new_with_affected_rows(0)) } Statement::CreateFlowTask(stmt) => { - let _ = self.create_flow_task(stmt, query_ctx).await?; + self.create_flow_task(stmt, query_ctx).await?; Ok(Output::new_with_affected_rows(0)) } Statement::Alter(alter_table) => self.alter_table(alter_table, query_ctx).await, diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index e2c60499d372..4f8f04789ce5 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -330,7 +330,11 @@ impl StatementExecutor { ) -> Result<()> { // TODO: do some verification - todo!() + let _expr = expr_factory::to_create_flow_task_expr(stmt, query_ctx)?; + + // TODO: invoke procedure + + Ok(()) } #[tracing::instrument(skip_all)] diff --git a/src/sql/src/util.rs b/src/sql/src/util.rs index f2d8a9aaab83..b81b5c7337a7 100644 --- a/src/sql/src/util.rs +++ b/src/sql/src/util.rs @@ -12,9 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; use std::fmt::{Display, Formatter}; -use sqlparser::ast::{Expr, ObjectName, SqlOption, Value}; +use sqlparser::ast::{Expr, ObjectName, Query, SetExpr, SqlOption, TableFactor, Value}; use crate::error::{InvalidTableOptionValueSnafu, Result}; @@ -50,3 +51,45 @@ pub fn parse_option_string(option: SqlOption) -> Result<(String, String)> { let k = key.value.to_lowercase(); Ok((k, v)) } + +/// Walk through a [Query] and extract all the tables referenced in it. +pub fn extract_tables_from_query(query: &Query) -> impl Iterator { + let mut names = HashSet::new(); + + extract_tables_from_set_expr(&query.body, &mut names); + + names.into_iter() +} + +/// Helper function for [extract_tables_from_query]. +/// +/// Handle [SetExpr]. +fn extract_tables_from_set_expr(set_expr: &SetExpr, names: &mut HashSet) { + match set_expr { + SetExpr::Select(select) => { + for from in &select.from { + table_factor_to_object_name(&from.relation, names); + for join in &from.joins { + table_factor_to_object_name(&join.relation, names); + } + } + } + SetExpr::Query(query) => { + extract_tables_from_set_expr(&query.body, names); + } + SetExpr::SetOperation { left, right, .. } => { + extract_tables_from_set_expr(left, names); + extract_tables_from_set_expr(right, names); + } + SetExpr::Values(_) | SetExpr::Insert(_) | SetExpr::Update(_) | SetExpr::Table(_) => {} + }; +} + +/// Helper function for [extract_tables_from_query]. +/// +/// Handle [TableFactor]. +fn table_factor_to_object_name(table_factor: &TableFactor, names: &mut HashSet) { + if let TableFactor::Table { name, .. } = table_factor { + names.insert(name.to_owned()); + } +} From e3da5a305da08118bb7a2082769db3419dea4bd2 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 26 Apr 2024 16:30:17 +0800 Subject: [PATCH 05/13] rename output to sink Signed-off-by: Ruihang Xia --- src/frontend/src/instance.rs | 2 +- src/operator/src/expr_factory.rs | 4 ++-- src/sql/src/parsers/create_parser.rs | 4 ++-- src/sql/src/statements/create.rs | 4 ++-- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 1470f53e95d9..23a48bd7f768 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -496,7 +496,7 @@ pub fn check_permission( } Statement::CreateFlowTask(stmt) => { // TODO: should also validate source table name here? - validate_param(&stmt.output_table_name, query_ctx)?; + validate_param(&stmt.sink_table_name, query_ctx)?; } Statement::Alter(stmt) => { validate_param(stmt.table_name(), query_ctx)?; diff --git a/src/operator/src/expr_factory.rs b/src/operator/src/expr_factory.rs index 7246de8351dc..cecc300ee596 100644 --- a/src/operator/src/expr_factory.rs +++ b/src/operator/src/expr_factory.rs @@ -496,9 +496,9 @@ pub fn to_create_flow_task_expr( ) -> Result { // retrieve sink table name let sink_table_ref = - object_name_to_table_reference(create_task.output_table_name.clone().into(), true) + object_name_to_table_reference(create_task.sink_table_name.clone().into(), true) .with_context(|_| ConvertIdentifierSnafu { - ident: create_task.output_table_name.to_string(), + ident: create_task.sink_table_name.to_string(), })?; let catalog = sink_table_ref .catalog() diff --git a/src/sql/src/parsers/create_parser.rs b/src/sql/src/parsers/create_parser.rs index 47814a63e4a3..179e1be5340d 100644 --- a/src/sql/src/parsers/create_parser.rs +++ b/src/sql/src/parsers/create_parser.rs @@ -228,7 +228,7 @@ impl<'a> ParserContext<'a> { Ok(Statement::CreateFlowTask(CreateFlowTask { task_name, - output_table_name, + sink_table_name: output_table_name, or_replace, if_not_exists, expire_when, @@ -1038,7 +1038,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;"; value: "task_1".to_string(), quote_style: None, }]), - output_table_name: ObjectName(vec![ + sink_table_name: ObjectName(vec![ Ident { value: "schema_1".to_string(), quote_style: None, diff --git a/src/sql/src/statements/create.rs b/src/sql/src/statements/create.rs index 1bf9b062cb8d..d111a6e7851e 100644 --- a/src/sql/src/statements/create.rs +++ b/src/sql/src/statements/create.rs @@ -242,7 +242,7 @@ pub struct CreateFlowTask { /// Task name pub task_name: ObjectName, /// Output (sink) table name - pub output_table_name: ObjectName, + pub sink_table_name: ObjectName, /// Whether to replace existing task pub or_replace: bool, /// Create if not exist @@ -266,7 +266,7 @@ impl Display for CreateFlowTask { write!(f, "IF NOT EXISTS ")?; } write!(f, "{} ", &self.task_name)?; - write!(f, "OUTPUT AS {} ", &self.output_table_name)?; + write!(f, "OUTPUT AS {} ", &self.sink_table_name)?; if let Some(expire_when) = &self.expire_when { write!(f, "EXPIRE WHEN {} ", expire_when)?; } From 56858cc9ca261777b138ab9431e553696b4b194b Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 26 Apr 2024 17:00:10 +0800 Subject: [PATCH 06/13] fix parser Signed-off-by: Ruihang Xia --- src/sql/src/parsers/create_parser.rs | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/src/sql/src/parsers/create_parser.rs b/src/sql/src/parsers/create_parser.rs index 179e1be5340d..3f840fb4b58f 100644 --- a/src/sql/src/parsers/create_parser.rs +++ b/src/sql/src/parsers/create_parser.rs @@ -41,7 +41,7 @@ use crate::util::parse_option_string; pub const ENGINE: &str = "ENGINE"; pub const MAXVALUE: &str = "MAXVALUE"; -pub const TASK: &str = "TASK"; +pub const FLOW: &str = "FLOW"; pub const SINK: &str = "SINK"; pub const EXPIRE: &str = "EXPIRE"; pub const WHEN: &str = "WHEN"; @@ -67,7 +67,7 @@ impl<'a> ParserContext<'a> { Keyword::NoKeyword => { let uppercase = w.value.to_uppercase(); match uppercase.as_str() { - TASK => self.parse_create_flow_task(true), + FLOW => self.parse_create_flow_task(true), _ => self.unsupported(w.to_string()), } } @@ -78,9 +78,10 @@ impl<'a> ParserContext<'a> { } Keyword::NoKeyword => { + let _ = self.parser.next_token(); let uppercase = w.value.to_uppercase(); match uppercase.as_str() { - TASK => self.parse_create_flow_task(false), + FLOW => self.parse_create_flow_task(false), _ => self.unsupported(w.to_string()), } } @@ -177,7 +178,7 @@ impl<'a> ParserContext<'a> { Ok(Statement::CreateTable(create_table)) } - /// "CREATE TASK" clause + /// "CREATE FLOW TASK" clause fn parse_create_flow_task(&mut self, or_replace: bool) -> Result { let if_not_exists = self.parser @@ -185,6 +186,8 @@ impl<'a> ParserContext<'a> { let task_name = self.intern_parse_table_name()?; + println!("{:?}", task_name); + self.parser .expect_token(&Token::make_keyword(SINK)) .context(SyntaxSnafu)?; @@ -1018,7 +1021,7 @@ mod tests { #[test] fn test_parse_create_flow_task() { let sql = r" -CREATE OR REPLACE TASK IF NOT EXISTS task_1 +CREATE OR REPLACE FLOW IF NOT EXISTS task_1 SINK TO schema_1.table_1 EXPIRE WHEN timestamp < now() - INTERVAL '5m' COMMENT 'test comment' @@ -1086,9 +1089,9 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;"; }; assert_eq!(create_task, &expected); - // create task without `OR REPLACE`, `IF NOT EXISTS`, `EXPIRE WHEN` and `COMMENT` + // create flow without `OR REPLACE`, `IF NOT EXISTS`, `EXPIRE WHEN` and `COMMENT` let sql = r" -CREATE TASK task_1 +CREATE FLOW task_2 SINK TO schema_1.table_1 AS SELECT max(c1), min(c2) FROM schema_2.table_2;"; From db3c275197cfa77d7caeeb59688b65662eb3d24f Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 26 Apr 2024 17:34:23 +0800 Subject: [PATCH 07/13] remove debug code Signed-off-by: Ruihang Xia --- src/sql/src/parsers/create_parser.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/sql/src/parsers/create_parser.rs b/src/sql/src/parsers/create_parser.rs index 3f840fb4b58f..8b1327fe6985 100644 --- a/src/sql/src/parsers/create_parser.rs +++ b/src/sql/src/parsers/create_parser.rs @@ -186,8 +186,6 @@ impl<'a> ParserContext<'a> { let task_name = self.intern_parse_table_name()?; - println!("{:?}", task_name); - self.parser .expect_token(&Token::make_keyword(SINK)) .context(SyntaxSnafu)?; From 2a1d318baec6fabb1beaf4283970c1346ac0ea54 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 26 Apr 2024 19:16:38 +0800 Subject: [PATCH 08/13] upload lock file Signed-off-by: Ruihang Xia --- Cargo.lock | 1 + 1 file changed, 1 insertion(+) diff --git a/Cargo.lock b/Cargo.lock index 6a209e795c8b..730c072fd8b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1900,6 +1900,7 @@ dependencies = [ "common-telemetry", "common-time", "common-wal", + "datafusion-common", "datatypes", "derive_builder 0.12.0", "etcd-client", From 0da533201693e09b9e11c0c5c164d63d4ea0dacd Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Sun, 28 Apr 2024 18:39:41 +0800 Subject: [PATCH 09/13] rename symbol Signed-off-by: Ruihang Xia --- src/frontend/src/instance.rs | 2 +- src/operator/src/expr_factory.rs | 4 ++-- src/operator/src/statement.rs | 2 +- src/operator/src/statement/ddl.rs | 4 ++-- src/sql/src/parsers/create_parser.rs | 10 +++++----- src/sql/src/statements/create.rs | 4 ++-- src/sql/src/statements/statement.rs | 6 +++--- 7 files changed, 16 insertions(+), 16 deletions(-) diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 23a48bd7f768..9909a8e0023a 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -494,7 +494,7 @@ pub fn check_permission( Statement::CreateExternalTable(stmt) => { validate_param(&stmt.name, query_ctx)?; } - Statement::CreateFlowTask(stmt) => { + Statement::CreateFlow(stmt) => { // TODO: should also validate source table name here? validate_param(&stmt.sink_table_name, query_ctx)?; } diff --git a/src/operator/src/expr_factory.rs b/src/operator/src/expr_factory.rs index cecc300ee596..2fc41a2d8ff8 100644 --- a/src/operator/src/expr_factory.rs +++ b/src/operator/src/expr_factory.rs @@ -36,7 +36,7 @@ use session::table_name::table_idents_to_full_name; use snafu::{ensure, OptionExt, ResultExt}; use sql::ast::{ColumnDef, ColumnOption, TableConstraint}; use sql::statements::alter::{AlterTable, AlterTableOperation}; -use sql::statements::create::{CreateExternalTable, CreateFlowTask, CreateTable, TIME_INDEX}; +use sql::statements::create::{CreateExternalTable, CreateFlow, CreateTable, TIME_INDEX}; use sql::statements::{column_def_to_schema, sql_column_def_to_grpc_column_def}; use sql::util::extract_tables_from_query; use table::requests::{TableOptions, FILE_TABLE_META_KEY}; @@ -491,7 +491,7 @@ pub(crate) fn to_alter_expr( } pub fn to_create_flow_task_expr( - create_task: CreateFlowTask, + create_task: CreateFlow, query_ctx: QueryContextRef, ) -> Result { // retrieve sink table name diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs index bbcf5e18274f..543d529c7a2c 100644 --- a/src/operator/src/statement.rs +++ b/src/operator/src/statement.rs @@ -164,7 +164,7 @@ impl StatementExecutor { let _ = self.create_external_table(stmt, query_ctx).await?; Ok(Output::new_with_affected_rows(0)) } - Statement::CreateFlowTask(stmt) => { + Statement::CreateFlow(stmt) => { self.create_flow_task(stmt, query_ctx).await?; Ok(Output::new_with_affected_rows(0)) } diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index 4f8f04789ce5..83235e48d433 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -46,7 +46,7 @@ use session::table_name::table_idents_to_full_name; use snafu::{ensure, IntoError, OptionExt, ResultExt}; use sql::statements::alter::AlterTable; use sql::statements::create::{ - CreateExternalTable, CreateFlowTask, CreateTable, CreateTableLike, Partitions, + CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, Partitions, }; use sql::statements::sql_value_to_value; use sqlparser::ast::{Expr, Ident, Value as ParserValue}; @@ -325,7 +325,7 @@ impl StatementExecutor { #[tracing::instrument(skip_all)] pub async fn create_flow_task( &self, - stmt: CreateFlowTask, + stmt: CreateFlow, query_ctx: QueryContextRef, ) -> Result<()> { // TODO: do some verification diff --git a/src/sql/src/parsers/create_parser.rs b/src/sql/src/parsers/create_parser.rs index 8b1327fe6985..1c44d4a13dc3 100644 --- a/src/sql/src/parsers/create_parser.rs +++ b/src/sql/src/parsers/create_parser.rs @@ -32,7 +32,7 @@ use crate::error::{ }; use crate::parser::ParserContext; use crate::statements::create::{ - CreateDatabase, CreateExternalTable, CreateFlowTask, CreateTable, CreateTableLike, Partitions, + CreateDatabase, CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, Partitions, TIME_INDEX, }; use crate::statements::statement::Statement; @@ -227,7 +227,7 @@ impl<'a> ParserContext<'a> { let query = Box::new(self.parser.parse_query().context(error::SyntaxSnafu)?); - Ok(Statement::CreateFlowTask(CreateFlowTask { + Ok(Statement::CreateFlow(CreateFlow { task_name, sink_table_name: output_table_name, or_replace, @@ -1030,11 +1030,11 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;"; .unwrap(); assert_eq!(1, stmts.len()); let create_task = match &stmts[0] { - Statement::CreateFlowTask(c) => c, + Statement::CreateFlow(c) => c, _ => unreachable!(), }; - let expected = CreateFlowTask { + let expected = CreateFlow { task_name: ObjectName(vec![Ident { value: "task_1".to_string(), quote_style: None, @@ -1098,7 +1098,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;"; .unwrap(); assert_eq!(1, stmts.len()); let create_task = match &stmts[0] { - Statement::CreateFlowTask(c) => c, + Statement::CreateFlow(c) => c, _ => unreachable!(), }; assert!(!create_task.or_replace); diff --git a/src/sql/src/statements/create.rs b/src/sql/src/statements/create.rs index d111a6e7851e..1186e419b21f 100644 --- a/src/sql/src/statements/create.rs +++ b/src/sql/src/statements/create.rs @@ -238,7 +238,7 @@ impl Display for CreateTableLike { } #[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut)] -pub struct CreateFlowTask { +pub struct CreateFlow { /// Task name pub task_name: ObjectName, /// Output (sink) table name @@ -255,7 +255,7 @@ pub struct CreateFlowTask { pub query: Box, } -impl Display for CreateFlowTask { +impl Display for CreateFlow { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "CREATE ")?; if self.or_replace { diff --git a/src/sql/src/statements/statement.rs b/src/sql/src/statements/statement.rs index 047b2d6f5b8a..55624c180491 100644 --- a/src/sql/src/statements/statement.rs +++ b/src/sql/src/statements/statement.rs @@ -18,7 +18,7 @@ use datafusion_sql::parser::Statement as DfStatement; use sqlparser::ast::Statement as SpStatement; use sqlparser_derive::{Visit, VisitMut}; -use super::create::CreateFlowTask; +use super::create::CreateFlow; use super::drop::DropDatabase; use super::show::ShowVariables; use crate::error::{ConvertToDfStatementSnafu, Error}; @@ -56,7 +56,7 @@ pub enum Statement { // CREATE TABLE ... LIKE CreateTableLike(CreateTableLike), // CREATE TASK - CreateFlowTask(CreateFlowTask), + CreateFlow(CreateFlow), // DROP TABLE DropTable(DropTable), // DROP DATABASE @@ -103,7 +103,7 @@ impl Display for Statement { Statement::CreateTable(s) => s.fmt(f), Statement::CreateExternalTable(s) => s.fmt(f), Statement::CreateTableLike(s) => s.fmt(f), - Statement::CreateFlowTask(s) => s.fmt(f), + Statement::CreateFlow(s) => s.fmt(f), Statement::DropTable(s) => s.fmt(f), Statement::DropDatabase(s) => s.fmt(f), Statement::CreateDatabase(s) => s.fmt(f), From 3157bd655cc87bcfa33e24050404abf2a18daf4b Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Sun, 28 Apr 2024 20:05:17 +0800 Subject: [PATCH 10/13] Apply suggestions from code review Co-authored-by: Jeremyhi --- src/operator/src/expr_factory.rs | 4 ++-- src/operator/src/statement/ddl.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/operator/src/expr_factory.rs b/src/operator/src/expr_factory.rs index 2fc41a2d8ff8..554e716f05e0 100644 --- a/src/operator/src/expr_factory.rs +++ b/src/operator/src/expr_factory.rs @@ -544,12 +544,12 @@ pub fn to_create_flow_task_expr( sink_table_name: Some(sink_table_name), create_if_not_exists: create_task.if_not_exists, or_replace: create_task.or_replace, - // TODO: change this field to optional in proto + // TODO(ruihang): change this field to optional in proto expire_when: create_task .expire_when .map(|e| e.to_string()) .unwrap_or_default(), - // TODO: change this field to optional in proto + // TODO(ruihang): change this field to optional in proto comment: create_task.comment.unwrap_or_default(), sql: create_task.query.to_string(), task_options: HashMap::new(), diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index 83235e48d433..687a290a6dba 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -328,7 +328,7 @@ impl StatementExecutor { stmt: CreateFlow, query_ctx: QueryContextRef, ) -> Result<()> { - // TODO: do some verification + // TODO(ruihang): do some verification let _expr = expr_factory::to_create_flow_task_expr(stmt, query_ctx)?; From d601753eb74f209c61f6e3732f7562cae3e7e552 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Sun, 28 Apr 2024 20:07:02 +0800 Subject: [PATCH 11/13] remove other task word Signed-off-by: Ruihang Xia --- src/api/src/helper.rs | 2 +- src/operator/src/expr_factory.rs | 20 ++++++++++---------- src/operator/src/statement.rs | 2 +- src/operator/src/statement/ddl.rs | 6 +----- src/sql/src/parsers/create_parser.rs | 10 +++++----- 5 files changed, 18 insertions(+), 22 deletions(-) diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index ec43253f39d8..3f74d2ccc77c 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -518,7 +518,7 @@ fn ddl_request_type(request: &DdlRequest) -> &'static str { Some(Expr::Alter(_)) => "ddl.alter", Some(Expr::DropTable(_)) => "ddl.drop_table", Some(Expr::TruncateTable(_)) => "ddl.truncate_table", - Some(Expr::CreateFlowTask(_)) => "ddl.create_flow_task", + Some(Expr::CreateFlowTask(_)) => "ddl.create_flow", Some(Expr::DropFlowTask(_)) => "ddl.drop_flow_task", None => "ddl.empty", } diff --git a/src/operator/src/expr_factory.rs b/src/operator/src/expr_factory.rs index 554e716f05e0..57295accff36 100644 --- a/src/operator/src/expr_factory.rs +++ b/src/operator/src/expr_factory.rs @@ -491,14 +491,14 @@ pub(crate) fn to_alter_expr( } pub fn to_create_flow_task_expr( - create_task: CreateFlow, + create_flow: CreateFlow, query_ctx: QueryContextRef, ) -> Result { // retrieve sink table name let sink_table_ref = - object_name_to_table_reference(create_task.sink_table_name.clone().into(), true) + object_name_to_table_reference(create_flow.sink_table_name.clone().into(), true) .with_context(|_| ConvertIdentifierSnafu { - ident: create_task.sink_table_name.to_string(), + ident: create_flow.sink_table_name.to_string(), })?; let catalog = sink_table_ref .catalog() @@ -514,7 +514,7 @@ pub fn to_create_flow_task_expr( table_name: sink_table_ref.table().to_string(), }; - let source_table_names = extract_tables_from_query(&create_task.query) + let source_table_names = extract_tables_from_query(&create_flow.query) .map(|name| { let reference = object_name_to_table_reference(name.clone().into(), true) .with_context(|_| ConvertIdentifierSnafu { @@ -539,19 +539,19 @@ pub fn to_create_flow_task_expr( Ok(CreateFlowTaskExpr { catalog_name: query_ctx.current_catalog().to_string(), - task_name: create_task.task_name.to_string(), + task_name: create_flow.task_name.to_string(), source_table_names, sink_table_name: Some(sink_table_name), - create_if_not_exists: create_task.if_not_exists, - or_replace: create_task.or_replace, + create_if_not_exists: create_flow.if_not_exists, + or_replace: create_flow.or_replace, // TODO(ruihang): change this field to optional in proto - expire_when: create_task + expire_when: create_flow .expire_when .map(|e| e.to_string()) .unwrap_or_default(), // TODO(ruihang): change this field to optional in proto - comment: create_task.comment.unwrap_or_default(), - sql: create_task.query.to_string(), + comment: create_flow.comment.unwrap_or_default(), + sql: create_flow.query.to_string(), task_options: HashMap::new(), }) } diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs index 543d529c7a2c..031b1ab28e7d 100644 --- a/src/operator/src/statement.rs +++ b/src/operator/src/statement.rs @@ -165,7 +165,7 @@ impl StatementExecutor { Ok(Output::new_with_affected_rows(0)) } Statement::CreateFlow(stmt) => { - self.create_flow_task(stmt, query_ctx).await?; + self.create_flow(stmt, query_ctx).await?; Ok(Output::new_with_affected_rows(0)) } Statement::Alter(alter_table) => self.alter_table(alter_table, query_ctx).await, diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index 687a290a6dba..a6b13a91722f 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -323,11 +323,7 @@ impl StatementExecutor { } #[tracing::instrument(skip_all)] - pub async fn create_flow_task( - &self, - stmt: CreateFlow, - query_ctx: QueryContextRef, - ) -> Result<()> { + pub async fn create_flow(&self, stmt: CreateFlow, query_ctx: QueryContextRef) -> Result<()> { // TODO(ruihang): do some verification let _expr = expr_factory::to_create_flow_task_expr(stmt, query_ctx)?; diff --git a/src/sql/src/parsers/create_parser.rs b/src/sql/src/parsers/create_parser.rs index c3c11c79ddda..098bdb9ce194 100644 --- a/src/sql/src/parsers/create_parser.rs +++ b/src/sql/src/parsers/create_parser.rs @@ -67,7 +67,7 @@ impl<'a> ParserContext<'a> { Keyword::NoKeyword => { let uppercase = w.value.to_uppercase(); match uppercase.as_str() { - FLOW => self.parse_create_flow_task(true), + FLOW => self.parse_create_flow(true), _ => self.unsupported(w.to_string()), } } @@ -81,7 +81,7 @@ impl<'a> ParserContext<'a> { let _ = self.parser.next_token(); let uppercase = w.value.to_uppercase(); match uppercase.as_str() { - FLOW => self.parse_create_flow_task(false), + FLOW => self.parse_create_flow(false), _ => self.unsupported(w.to_string()), } } @@ -171,8 +171,8 @@ impl<'a> ParserContext<'a> { Ok(Statement::CreateTable(create_table)) } - /// "CREATE FLOW TASK" clause - fn parse_create_flow_task(&mut self, or_replace: bool) -> Result { + /// "CREATE FLOW" clause + fn parse_create_flow(&mut self, or_replace: bool) -> Result { let if_not_exists = self.parse_if_not_exist()?; let task_name = self.intern_parse_table_name()?; @@ -1037,7 +1037,7 @@ mod tests { } #[test] - fn test_parse_create_flow_task() { + fn test_parse_create_flow() { let sql = r" CREATE OR REPLACE FLOW IF NOT EXISTS task_1 SINK TO schema_1.table_1 From 90cbcdc22ac3f9143963d40f9eec731a9d91df32 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Sun, 28 Apr 2024 20:10:22 +0800 Subject: [PATCH 12/13] task name to flow name Signed-off-by: Ruihang Xia --- src/common/meta/src/rpc/ddl.rs | 6 +++--- src/operator/src/expr_factory.rs | 2 +- src/sql/src/parsers/create_parser.rs | 4 ++-- src/sql/src/statements/create.rs | 6 +++--- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index a7e14161ecc6..3a88ea11bf2e 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -726,7 +726,7 @@ impl TryFrom for PbDropDatabaseTask { /// Create flow task pub struct CreateFlowTask { pub catalog_name: String, - pub task_name: String, + pub flow_name: String, pub source_table_names: Vec, pub sink_table_name: TableName, pub or_replace: bool, @@ -758,7 +758,7 @@ impl TryFrom for CreateFlowTask { Ok(CreateFlowTask { catalog_name, - task_name, + flow_name: task_name, source_table_names: source_table_names.into_iter().map(Into::into).collect(), sink_table_name: sink_table_name .context(error::InvalidProtoMsgSnafu { @@ -779,7 +779,7 @@ impl From for PbCreateFlowTask { fn from( CreateFlowTask { catalog_name, - task_name, + flow_name: task_name, source_table_names, sink_table_name, or_replace, diff --git a/src/operator/src/expr_factory.rs b/src/operator/src/expr_factory.rs index 57295accff36..855a6a7ffbc4 100644 --- a/src/operator/src/expr_factory.rs +++ b/src/operator/src/expr_factory.rs @@ -539,7 +539,7 @@ pub fn to_create_flow_task_expr( Ok(CreateFlowTaskExpr { catalog_name: query_ctx.current_catalog().to_string(), - task_name: create_flow.task_name.to_string(), + task_name: create_flow.flow_name.to_string(), source_table_names, sink_table_name: Some(sink_table_name), create_if_not_exists: create_flow.if_not_exists, diff --git a/src/sql/src/parsers/create_parser.rs b/src/sql/src/parsers/create_parser.rs index 098bdb9ce194..7430b5679fcb 100644 --- a/src/sql/src/parsers/create_parser.rs +++ b/src/sql/src/parsers/create_parser.rs @@ -219,7 +219,7 @@ impl<'a> ParserContext<'a> { let query = Box::new(self.parser.parse_query().context(error::SyntaxSnafu)?); Ok(Statement::CreateFlow(CreateFlow { - task_name, + flow_name: task_name, sink_table_name: output_table_name, or_replace, if_not_exists, @@ -1055,7 +1055,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;"; }; let expected = CreateFlow { - task_name: ObjectName(vec![Ident { + flow_name: ObjectName(vec![Ident { value: "task_1".to_string(), quote_style: None, }]), diff --git a/src/sql/src/statements/create.rs b/src/sql/src/statements/create.rs index 1186e419b21f..505824b5e42c 100644 --- a/src/sql/src/statements/create.rs +++ b/src/sql/src/statements/create.rs @@ -239,8 +239,8 @@ impl Display for CreateTableLike { #[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut)] pub struct CreateFlow { - /// Task name - pub task_name: ObjectName, + /// Flow name + pub flow_name: ObjectName, /// Output (sink) table name pub sink_table_name: ObjectName, /// Whether to replace existing task @@ -265,7 +265,7 @@ impl Display for CreateFlow { if self.if_not_exists { write!(f, "IF NOT EXISTS ")?; } - write!(f, "{} ", &self.task_name)?; + write!(f, "{} ", &self.flow_name)?; write!(f, "OUTPUT AS {} ", &self.sink_table_name)?; if let Some(expire_when) = &self.expire_when { write!(f, "EXPIRE WHEN {} ", expire_when)?; From ab704cef7d818cc5506c580070ffd7190c178280 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Sun, 28 Apr 2024 20:12:07 +0800 Subject: [PATCH 13/13] one more comment Signed-off-by: Ruihang Xia --- src/sql/src/statements/statement.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sql/src/statements/statement.rs b/src/sql/src/statements/statement.rs index 55624c180491..a3dcab916705 100644 --- a/src/sql/src/statements/statement.rs +++ b/src/sql/src/statements/statement.rs @@ -55,7 +55,7 @@ pub enum Statement { CreateExternalTable(CreateExternalTable), // CREATE TABLE ... LIKE CreateTableLike(CreateTableLike), - // CREATE TASK + // CREATE FLOW CreateFlow(CreateFlow), // DROP TABLE DropTable(DropTable),