diff --git a/Cargo.lock b/Cargo.lock index 38581fd57101..8c75ab94659b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1915,6 +1915,7 @@ dependencies = [ "common-telemetry", "common-time", "common-wal", + "datafusion-common", "datatypes", "derive_builder 0.12.0", "etcd-client", diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index ec43253f39d8..3f74d2ccc77c 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -518,7 +518,7 @@ fn ddl_request_type(request: &DdlRequest) -> &'static str { Some(Expr::Alter(_)) => "ddl.alter", Some(Expr::DropTable(_)) => "ddl.drop_table", Some(Expr::TruncateTable(_)) => "ddl.truncate_table", - Some(Expr::CreateFlowTask(_)) => "ddl.create_flow_task", + Some(Expr::CreateFlowTask(_)) => "ddl.create_flow", Some(Expr::DropFlowTask(_)) => "ddl.drop_flow_task", None => "ddl.empty", } diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index 9132f6de559b..40df9d6180c0 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -28,6 +28,7 @@ common-recordbatch.workspace = true common-telemetry.workspace = true common-time.workspace = true common-wal.workspace = true +datafusion-common.workspace = true datatypes.workspace = true derive_builder.workspace = true etcd-client.workspace = true diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index a7e14161ecc6..3a88ea11bf2e 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -726,7 +726,7 @@ impl TryFrom for PbDropDatabaseTask { /// Create flow task pub struct CreateFlowTask { pub catalog_name: String, - pub task_name: String, + pub flow_name: String, pub source_table_names: Vec, pub sink_table_name: TableName, pub or_replace: bool, @@ -758,7 +758,7 @@ impl TryFrom for CreateFlowTask { Ok(CreateFlowTask { catalog_name, - task_name, + flow_name: task_name, source_table_names: source_table_names.into_iter().map(Into::into).collect(), sink_table_name: sink_table_name .context(error::InvalidProtoMsgSnafu { @@ -779,7 +779,7 @@ impl From for PbCreateFlowTask { fn from( CreateFlowTask { catalog_name, - task_name, + flow_name: task_name, source_table_names, sink_table_name, or_replace, diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index a4d7bc0936bc..587e7ecc87e9 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -523,6 +523,10 @@ pub fn check_permission( Statement::CreateExternalTable(stmt) => { validate_param(&stmt.name, query_ctx)?; } + Statement::CreateFlow(stmt) => { + // TODO: should also validate source table name here? + validate_param(&stmt.sink_table_name, query_ctx)?; + } Statement::Alter(stmt) => { validate_param(stmt.table_name(), query_ctx)?; } diff --git a/src/operator/src/error.rs b/src/operator/src/error.rs index dd8d64f10ffa..f15b5fbcae2f 100644 --- a/src/operator/src/error.rs +++ b/src/operator/src/error.rs @@ -82,6 +82,14 @@ pub enum Error { source: sql::error::Error, }, + #[snafu(display("Failed to convert identifier: {}", ident))] + ConvertIdentifier { + ident: String, + location: Location, + #[snafu(source)] + error: datafusion::error::DataFusionError, + }, + #[snafu(display("Failed to convert value to sql value: {}", value))] ConvertSqlValue { value: Value, @@ -568,7 +576,8 @@ impl ErrorExt for Error { | Error::InferFileTableSchema { .. } | Error::SchemaIncompatible { .. } | Error::UnsupportedRegionRequest { .. } - | Error::InvalidTableName { .. } => StatusCode::InvalidArguments, + | Error::InvalidTableName { .. } + | Error::ConvertIdentifier { .. } => StatusCode::InvalidArguments, Error::TableAlreadyExists { .. } => StatusCode::TableAlreadyExists, diff --git a/src/operator/src/expr_factory.rs b/src/operator/src/expr_factory.rs index 0a84c4a7315f..855a6a7ffbc4 100644 --- a/src/operator/src/expr_factory.rs +++ b/src/operator/src/expr_factory.rs @@ -18,11 +18,13 @@ use api::helper::ColumnDataTypeWrapper; use api::v1::alter_expr::Kind; use api::v1::{ AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDataTypeExtension, - CreateTableExpr, DropColumn, DropColumns, RenameTable, SemanticType, + CreateFlowTaskExpr, CreateTableExpr, DropColumn, DropColumns, RenameTable, SemanticType, + TableName, }; use common_error::ext::BoxedError; use common_grpc_expr::util::ColumnExpr; use common_time::Timezone; +use datafusion::sql::planner::object_name_to_table_reference; use datatypes::schema::{ColumnSchema, COMMENT_KEY}; use file_engine::FileOptions; use query::sql::{ @@ -34,16 +36,17 @@ use session::table_name::table_idents_to_full_name; use snafu::{ensure, OptionExt, ResultExt}; use sql::ast::{ColumnDef, ColumnOption, TableConstraint}; use sql::statements::alter::{AlterTable, AlterTableOperation}; -use sql::statements::create::{CreateExternalTable, CreateTable, TIME_INDEX}; +use sql::statements::create::{CreateExternalTable, CreateFlow, CreateTable, TIME_INDEX}; use sql::statements::{column_def_to_schema, sql_column_def_to_grpc_column_def}; +use sql::util::extract_tables_from_query; use table::requests::{TableOptions, FILE_TABLE_META_KEY}; use table::table_reference::TableReference; use crate::error::{ BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu, ConvertColumnDefaultConstraintSnafu, - EncodeJsonSnafu, ExternalSnafu, IllegalPrimaryKeysDefSnafu, InferFileTableSchemaSnafu, - InvalidSqlSnafu, NotSupportedSnafu, ParseSqlSnafu, PrepareFileTableSnafu, Result, - SchemaIncompatibleSnafu, UnrecognizedTableOptionSnafu, + ConvertIdentifierSnafu, EncodeJsonSnafu, ExternalSnafu, IllegalPrimaryKeysDefSnafu, + InferFileTableSchemaSnafu, InvalidSqlSnafu, NotSupportedSnafu, ParseSqlSnafu, + PrepareFileTableSnafu, Result, SchemaIncompatibleSnafu, UnrecognizedTableOptionSnafu, }; #[derive(Debug, Copy, Clone)] @@ -487,6 +490,72 @@ pub(crate) fn to_alter_expr( }) } +pub fn to_create_flow_task_expr( + create_flow: CreateFlow, + query_ctx: QueryContextRef, +) -> Result { + // retrieve sink table name + let sink_table_ref = + object_name_to_table_reference(create_flow.sink_table_name.clone().into(), true) + .with_context(|_| ConvertIdentifierSnafu { + ident: create_flow.sink_table_name.to_string(), + })?; + let catalog = sink_table_ref + .catalog() + .unwrap_or(query_ctx.current_catalog()) + .to_string(); + let schema = sink_table_ref + .schema() + .unwrap_or(query_ctx.current_schema()) + .to_string(); + let sink_table_name = TableName { + catalog_name: catalog, + schema_name: schema, + table_name: sink_table_ref.table().to_string(), + }; + + let source_table_names = extract_tables_from_query(&create_flow.query) + .map(|name| { + let reference = object_name_to_table_reference(name.clone().into(), true) + .with_context(|_| ConvertIdentifierSnafu { + ident: name.to_string(), + })?; + let catalog = reference + .catalog() + .unwrap_or(query_ctx.current_catalog()) + .to_string(); + let schema = reference + .schema() + .unwrap_or(query_ctx.current_schema()) + .to_string(); + let table_name = TableName { + catalog_name: catalog, + schema_name: schema, + table_name: reference.table().to_string(), + }; + Ok(table_name) + }) + .collect::>>()?; + + Ok(CreateFlowTaskExpr { + catalog_name: query_ctx.current_catalog().to_string(), + task_name: create_flow.flow_name.to_string(), + source_table_names, + sink_table_name: Some(sink_table_name), + create_if_not_exists: create_flow.if_not_exists, + or_replace: create_flow.or_replace, + // TODO(ruihang): change this field to optional in proto + expire_when: create_flow + .expire_when + .map(|e| e.to_string()) + .unwrap_or_default(), + // TODO(ruihang): change this field to optional in proto + comment: create_flow.comment.unwrap_or_default(), + sql: create_flow.query.to_string(), + task_options: HashMap::new(), + }) +} + #[cfg(test)] mod tests { use datatypes::value::Value; diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs index 2ecb35e91319..031b1ab28e7d 100644 --- a/src/operator/src/statement.rs +++ b/src/operator/src/statement.rs @@ -164,6 +164,10 @@ impl StatementExecutor { let _ = self.create_external_table(stmt, query_ctx).await?; Ok(Output::new_with_affected_rows(0)) } + Statement::CreateFlow(stmt) => { + self.create_flow(stmt, query_ctx).await?; + Ok(Output::new_with_affected_rows(0)) + } Statement::Alter(alter_table) => self.alter_table(alter_table, query_ctx).await, Statement::DropTable(stmt) => { let (catalog, schema, table) = diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index 2ab87fa4513e..a6b13a91722f 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -45,7 +45,9 @@ use session::context::QueryContextRef; use session::table_name::table_idents_to_full_name; use snafu::{ensure, IntoError, OptionExt, ResultExt}; use sql::statements::alter::AlterTable; -use sql::statements::create::{CreateExternalTable, CreateTable, CreateTableLike, Partitions}; +use sql::statements::create::{ + CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, Partitions, +}; use sql::statements::sql_value_to_value; use sqlparser::ast::{Expr, Ident, Value as ParserValue}; use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME}; @@ -320,6 +322,17 @@ impl StatementExecutor { .collect()) } + #[tracing::instrument(skip_all)] + pub async fn create_flow(&self, stmt: CreateFlow, query_ctx: QueryContextRef) -> Result<()> { + // TODO(ruihang): do some verification + + let _expr = expr_factory::to_create_flow_task_expr(stmt, query_ctx)?; + + // TODO: invoke procedure + + Ok(()) + } + #[tracing::instrument(skip_all)] pub async fn alter_logical_tables(&self, alter_table_exprs: Vec) -> Result { let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer(); diff --git a/src/sql/src/parsers/create_parser.rs b/src/sql/src/parsers/create_parser.rs index f6a967581fde..7430b5679fcb 100644 --- a/src/sql/src/parsers/create_parser.rs +++ b/src/sql/src/parsers/create_parser.rs @@ -32,7 +32,8 @@ use crate::error::{ }; use crate::parser::ParserContext; use crate::statements::create::{ - CreateDatabase, CreateExternalTable, CreateTable, CreateTableLike, Partitions, TIME_INDEX, + CreateDatabase, CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, Partitions, + TIME_INDEX, }; use crate::statements::statement::Statement; use crate::statements::{get_data_type_by_alias_name, OptionMap}; @@ -40,6 +41,10 @@ use crate::util::parse_option_string; pub const ENGINE: &str = "ENGINE"; pub const MAXVALUE: &str = "MAXVALUE"; +pub const FLOW: &str = "FLOW"; +pub const SINK: &str = "SINK"; +pub const EXPIRE: &str = "EXPIRE"; +pub const WHEN: &str = "WHEN"; /// Parses create [table] statement impl<'a> ParserContext<'a> { @@ -52,6 +57,35 @@ impl<'a> ParserContext<'a> { Keyword::EXTERNAL => self.parse_create_external_table(), + Keyword::OR => { + let _ = self.parser.next_token(); + self.parser + .expect_keyword(Keyword::REPLACE) + .context(SyntaxSnafu)?; + match self.parser.next_token().token { + Token::Word(w) => match w.keyword { + Keyword::NoKeyword => { + let uppercase = w.value.to_uppercase(); + match uppercase.as_str() { + FLOW => self.parse_create_flow(true), + _ => self.unsupported(w.to_string()), + } + } + _ => self.unsupported(w.to_string()), + }, + _ => self.unsupported(w.to_string()), + } + } + + Keyword::NoKeyword => { + let _ = self.parser.next_token(); + let uppercase = w.value.to_uppercase(); + match uppercase.as_str() { + FLOW => self.parse_create_flow(false), + _ => self.unsupported(w.to_string()), + } + } + _ => self.unsupported(w.to_string()), }, unexpected => self.unsupported(unexpected.to_string()), @@ -137,6 +171,64 @@ impl<'a> ParserContext<'a> { Ok(Statement::CreateTable(create_table)) } + /// "CREATE FLOW" clause + fn parse_create_flow(&mut self, or_replace: bool) -> Result { + let if_not_exists = self.parse_if_not_exist()?; + + let task_name = self.intern_parse_table_name()?; + + self.parser + .expect_token(&Token::make_keyword(SINK)) + .context(SyntaxSnafu)?; + self.parser + .expect_keyword(Keyword::TO) + .context(SyntaxSnafu)?; + + let output_table_name = self.intern_parse_table_name()?; + + let expire_when = if self + .parser + .consume_tokens(&[Token::make_keyword(EXPIRE), Token::make_keyword(WHEN)]) + { + Some(self.parser.parse_expr().context(error::SyntaxSnafu)?) + } else { + None + }; + + let comment = if self.parser.parse_keyword(Keyword::COMMENT) { + match self.parser.next_token() { + TokenWithLocation { + token: Token::SingleQuotedString(value, ..), + .. + } => Some(value), + unexpected => { + return self + .parser + .expected("string", unexpected) + .context(SyntaxSnafu) + } + } + } else { + None + }; + + self.parser + .expect_keyword(Keyword::AS) + .context(SyntaxSnafu)?; + + let query = Box::new(self.parser.parse_query().context(error::SyntaxSnafu)?); + + Ok(Statement::CreateFlow(CreateFlow { + flow_name: task_name, + sink_table_name: output_table_name, + or_replace, + if_not_exists, + expire_when, + comment, + query, + })) + } + fn parse_if_not_exist(&mut self) -> Result { match self.parser.peek_token().token { Token::Word(w) if Keyword::IF != w.keyword => return Ok(false), @@ -185,8 +277,7 @@ impl<'a> ParserContext<'a> { Ok(options.into()) } - /// "PARTITION BY ..." syntax: - // TODO(ruihang): docs + /// "PARTITION BY ..." clause fn parse_partitions(&mut self) -> Result> { if !self.parser.parse_keyword(Keyword::PARTITION) { return Ok(None); @@ -737,7 +828,7 @@ mod tests { use common_catalog::consts::FILE_ENGINE; use common_error::ext::ErrorExt; use sqlparser::ast::ColumnOption::NotNull; - use sqlparser::ast::{BinaryOperator, ObjectName, Value}; + use sqlparser::ast::{BinaryOperator, Expr, Function, Interval, ObjectName, Value}; use super::*; use crate::dialect::GreptimeDbDialect; @@ -945,6 +1036,97 @@ mod tests { ); } + #[test] + fn test_parse_create_flow() { + let sql = r" +CREATE OR REPLACE FLOW IF NOT EXISTS task_1 +SINK TO schema_1.table_1 +EXPIRE WHEN timestamp < now() - INTERVAL '5m' +COMMENT 'test comment' +AS +SELECT max(c1), min(c2) FROM schema_2.table_2;"; + let stmts = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap(); + assert_eq!(1, stmts.len()); + let create_task = match &stmts[0] { + Statement::CreateFlow(c) => c, + _ => unreachable!(), + }; + + let expected = CreateFlow { + flow_name: ObjectName(vec![Ident { + value: "task_1".to_string(), + quote_style: None, + }]), + sink_table_name: ObjectName(vec![ + Ident { + value: "schema_1".to_string(), + quote_style: None, + }, + Ident { + value: "table_1".to_string(), + quote_style: None, + }, + ]), + or_replace: true, + if_not_exists: true, + expire_when: Some(Expr::BinaryOp { + left: Box::new(Expr::Identifier(Ident { + value: "timestamp".to_string(), + quote_style: None, + })), + op: BinaryOperator::Lt, + right: Box::new(Expr::BinaryOp { + left: Box::new(Expr::Function(Function { + name: ObjectName(vec![Ident { + value: "now".to_string(), + quote_style: None, + }]), + args: vec![], + filter: None, + null_treatment: None, + over: None, + distinct: false, + special: false, + order_by: vec![], + })), + op: BinaryOperator::Minus, + right: Box::new(Expr::Interval(Interval { + value: Box::new(Expr::Value(Value::SingleQuotedString("5m".to_string()))), + leading_field: None, + leading_precision: None, + last_field: None, + fractional_seconds_precision: None, + })), + }), + }), + comment: Some("test comment".to_string()), + // ignore query parse result + query: create_task.query.clone(), + }; + assert_eq!(create_task, &expected); + + // create flow without `OR REPLACE`, `IF NOT EXISTS`, `EXPIRE WHEN` and `COMMENT` + let sql = r" +CREATE FLOW task_2 +SINK TO schema_1.table_1 +AS +SELECT max(c1), min(c2) FROM schema_2.table_2;"; + let stmts = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap(); + assert_eq!(1, stmts.len()); + let create_task = match &stmts[0] { + Statement::CreateFlow(c) => c, + _ => unreachable!(), + }; + assert!(!create_task.or_replace); + assert!(!create_task.if_not_exists); + assert!(create_task.expire_when.is_none()); + assert!(create_task.comment.is_none()); + } + #[test] fn test_validate_create() { let sql = r" diff --git a/src/sql/src/statements/create.rs b/src/sql/src/statements/create.rs index 92f62b09c072..505824b5e42c 100644 --- a/src/sql/src/statements/create.rs +++ b/src/sql/src/statements/create.rs @@ -16,7 +16,7 @@ use std::fmt::{Display, Formatter}; use common_catalog::consts::FILE_ENGINE; use itertools::Itertools; -use sqlparser::ast::Expr; +use sqlparser::ast::{Expr, Query}; use sqlparser_derive::{Visit, VisitMut}; use crate::ast::{ColumnDef, Ident, ObjectName, TableConstraint, Value as SqlValue}; @@ -237,6 +237,46 @@ impl Display for CreateTableLike { } } +#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut)] +pub struct CreateFlow { + /// Flow name + pub flow_name: ObjectName, + /// Output (sink) table name + pub sink_table_name: ObjectName, + /// Whether to replace existing task + pub or_replace: bool, + /// Create if not exist + pub if_not_exists: bool, + /// `EXPIRE_WHEN` + pub expire_when: Option, + /// Comment string + pub comment: Option, + /// SQL statement + pub query: Box, +} + +impl Display for CreateFlow { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "CREATE ")?; + if self.or_replace { + write!(f, "OR REPLACE ")?; + } + write!(f, "TASK ")?; + if self.if_not_exists { + write!(f, "IF NOT EXISTS ")?; + } + write!(f, "{} ", &self.flow_name)?; + write!(f, "OUTPUT AS {} ", &self.sink_table_name)?; + if let Some(expire_when) = &self.expire_when { + write!(f, "EXPIRE WHEN {} ", expire_when)?; + } + if let Some(comment) = &self.comment { + write!(f, "COMMENT '{}' ", comment)?; + } + write!(f, "AS {}", &self.query) + } +} + #[cfg(test)] mod tests { use std::assert_matches::assert_matches; diff --git a/src/sql/src/statements/statement.rs b/src/sql/src/statements/statement.rs index 70566893d65a..a3dcab916705 100644 --- a/src/sql/src/statements/statement.rs +++ b/src/sql/src/statements/statement.rs @@ -18,6 +18,7 @@ use datafusion_sql::parser::Statement as DfStatement; use sqlparser::ast::Statement as SpStatement; use sqlparser_derive::{Visit, VisitMut}; +use super::create::CreateFlow; use super::drop::DropDatabase; use super::show::ShowVariables; use crate::error::{ConvertToDfStatementSnafu, Error}; @@ -54,6 +55,8 @@ pub enum Statement { CreateExternalTable(CreateExternalTable), // CREATE TABLE ... LIKE CreateTableLike(CreateTableLike), + // CREATE FLOW + CreateFlow(CreateFlow), // DROP TABLE DropTable(DropTable), // DROP DATABASE @@ -100,6 +103,7 @@ impl Display for Statement { Statement::CreateTable(s) => s.fmt(f), Statement::CreateExternalTable(s) => s.fmt(f), Statement::CreateTableLike(s) => s.fmt(f), + Statement::CreateFlow(s) => s.fmt(f), Statement::DropTable(s) => s.fmt(f), Statement::DropDatabase(s) => s.fmt(f), Statement::CreateDatabase(s) => s.fmt(f), diff --git a/src/sql/src/util.rs b/src/sql/src/util.rs index f2d8a9aaab83..b81b5c7337a7 100644 --- a/src/sql/src/util.rs +++ b/src/sql/src/util.rs @@ -12,9 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; use std::fmt::{Display, Formatter}; -use sqlparser::ast::{Expr, ObjectName, SqlOption, Value}; +use sqlparser::ast::{Expr, ObjectName, Query, SetExpr, SqlOption, TableFactor, Value}; use crate::error::{InvalidTableOptionValueSnafu, Result}; @@ -50,3 +51,45 @@ pub fn parse_option_string(option: SqlOption) -> Result<(String, String)> { let k = key.value.to_lowercase(); Ok((k, v)) } + +/// Walk through a [Query] and extract all the tables referenced in it. +pub fn extract_tables_from_query(query: &Query) -> impl Iterator { + let mut names = HashSet::new(); + + extract_tables_from_set_expr(&query.body, &mut names); + + names.into_iter() +} + +/// Helper function for [extract_tables_from_query]. +/// +/// Handle [SetExpr]. +fn extract_tables_from_set_expr(set_expr: &SetExpr, names: &mut HashSet) { + match set_expr { + SetExpr::Select(select) => { + for from in &select.from { + table_factor_to_object_name(&from.relation, names); + for join in &from.joins { + table_factor_to_object_name(&join.relation, names); + } + } + } + SetExpr::Query(query) => { + extract_tables_from_set_expr(&query.body, names); + } + SetExpr::SetOperation { left, right, .. } => { + extract_tables_from_set_expr(left, names); + extract_tables_from_set_expr(right, names); + } + SetExpr::Values(_) | SetExpr::Insert(_) | SetExpr::Update(_) | SetExpr::Table(_) => {} + }; +} + +/// Helper function for [extract_tables_from_query]. +/// +/// Handle [TableFactor]. +fn table_factor_to_object_name(table_factor: &TableFactor, names: &mut HashSet) { + if let TableFactor::Table { name, .. } = table_factor { + names.insert(name.to_owned()); + } +}