From a70e09a8ba35a551e1d9863bd9989d9047f5657d Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 21 May 2024 16:29:35 +0800 Subject: [PATCH 1/6] feat: change EXPIRE WHEN to EXPIRE AFTER Signed-off-by: Ruihang Xia --- Cargo.lock | 2 +- Cargo.toml | 2 +- .../meta/src/cache/flow/table_flownode.rs | 2 +- src/common/meta/src/ddl/create_flow.rs | 6 +-- src/common/meta/src/ddl/tests/create_flow.rs | 2 +- src/common/meta/src/key/flow.rs | 4 +- src/common/meta/src/key/flow/flow_info.rs | 2 +- src/common/meta/src/rpc/ddl.rs | 10 ++-- src/flow/src/adapter/flownode_impl.rs | 4 +- src/operator/src/expr_factory.rs | 4 +- src/sql/src/parsers/create_parser.rs | 51 +++++-------------- src/sql/src/statements/create.rs | 6 +-- 12 files changed, 36 insertions(+), 59 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3572c04287aa..72de312c1fd2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4232,7 +4232,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=a11db14b8502f55ca5348917fd18e6fcf140f55e#a11db14b8502f55ca5348917fd18e6fcf140f55e" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=48ddbd96d0d9beadaa50e2dfe2066b01e80a268c#48ddbd96d0d9beadaa50e2dfe2066b01e80a268c" dependencies = [ "prost 0.12.4", "serde", diff --git a/Cargo.toml b/Cargo.toml index 52cb6b1306db..134ccc8ac22e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -120,7 +120,7 @@ etcd-client = { git = "https://github.com/MichaelScofield/etcd-client.git", rev fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a11db14b8502f55ca5348917fd18e6fcf140f55e" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "48ddbd96d0d9beadaa50e2dfe2066b01e80a268c" } humantime = "2.1" humantime-serde = "1.1" itertools = "0.10" diff --git a/src/common/meta/src/cache/flow/table_flownode.rs b/src/common/meta/src/cache/flow/table_flownode.rs index a15578a1b8f9..19ddc0364924 100644 --- a/src/common/meta/src/cache/flow/table_flownode.rs +++ b/src/common/meta/src/cache/flow/table_flownode.rs @@ -180,7 +180,7 @@ mod tests { catalog_name: DEFAULT_CATALOG_NAME.to_string(), flow_name: "my_flow".to_string(), raw_sql: "sql".to_string(), - expire_when: "expire".to_string(), + expire_after: "expire".to_string(), comment: "comment".to_string(), options: Default::default(), }, diff --git a/src/common/meta/src/ddl/create_flow.rs b/src/common/meta/src/ddl/create_flow.rs index a7fb59b05623..487ac130b3b6 100644 --- a/src/common/meta/src/ddl/create_flow.rs +++ b/src/common/meta/src/ddl/create_flow.rs @@ -283,7 +283,7 @@ impl From<&CreateFlowData> for CreateRequest { sink_table_name: Some(value.task.sink_table_name.clone().into()), // Always be true create_if_not_exists: true, - expire_when: value.task.expire_when.clone(), + expire_after: value.task.expire_after.clone(), comment: value.task.comment.clone(), sql: value.task.sql.clone(), flow_options: value.task.flow_options.clone(), @@ -297,7 +297,7 @@ impl From<&CreateFlowData> for FlowInfoValue { catalog_name, flow_name, sink_table_name, - expire_when, + expire_after, comment, sql, flow_options: options, @@ -318,7 +318,7 @@ impl From<&CreateFlowData> for FlowInfoValue { catalog_name, flow_name, raw_sql: sql, - expire_when, + expire_after, comment, options, } diff --git a/src/common/meta/src/ddl/tests/create_flow.rs b/src/common/meta/src/ddl/tests/create_flow.rs index 415fc12f62a5..030e40171ca0 100644 --- a/src/common/meta/src/ddl/tests/create_flow.rs +++ b/src/common/meta/src/ddl/tests/create_flow.rs @@ -44,7 +44,7 @@ pub(crate) fn test_create_flow_task( sink_table_name, or_replace: false, create_if_not_exists, - expire_when: "".to_string(), + expire_after: "".to_string(), comment: "".to_string(), sql: "raw_sql".to_string(), flow_options: Default::default(), diff --git a/src/common/meta/src/key/flow.rs b/src/common/meta/src/key/flow.rs index 1682922ab7ce..f94778ac444d 100644 --- a/src/common/meta/src/key/flow.rs +++ b/src/common/meta/src/key/flow.rs @@ -328,7 +328,7 @@ mod tests { sink_table_name, flownode_ids, raw_sql: "raw".to_string(), - expire_when: "expr".to_string(), + expire_after: "expr".to_string(), comment: "hi".to_string(), options: Default::default(), } @@ -420,7 +420,7 @@ mod tests { sink_table_name: another_sink_table_name, flownode_ids: [(0, 1u64)].into(), raw_sql: "raw".to_string(), - expire_when: "expr".to_string(), + expire_after: "expr".to_string(), comment: "hi".to_string(), options: Default::default(), }; diff --git a/src/common/meta/src/key/flow/flow_info.rs b/src/common/meta/src/key/flow/flow_info.rs index 0a2be4dea1a2..0284be19a3b6 100644 --- a/src/common/meta/src/key/flow/flow_info.rs +++ b/src/common/meta/src/key/flow/flow_info.rs @@ -123,7 +123,7 @@ pub struct FlowInfoValue { /// The raw sql. pub(crate) raw_sql: String, /// The expr of expire. - pub(crate) expire_when: String, + pub(crate) expire_after: String, /// The comment. pub(crate) comment: String, /// The options. diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index e42639c381a7..0ed61036110a 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -898,7 +898,7 @@ pub struct CreateFlowTask { pub sink_table_name: TableName, pub or_replace: bool, pub create_if_not_exists: bool, - pub expire_when: String, + pub expire_after: String, pub comment: String, pub sql: String, pub flow_options: HashMap, @@ -915,7 +915,7 @@ impl TryFrom for CreateFlowTask { sink_table_name, or_replace, create_if_not_exists, - expire_when, + expire_after, comment, sql, flow_options, @@ -934,7 +934,7 @@ impl TryFrom for CreateFlowTask { .into(), or_replace, create_if_not_exists, - expire_when, + expire_after, comment, sql, flow_options, @@ -951,7 +951,7 @@ impl From for PbCreateFlowTask { sink_table_name, or_replace, create_if_not_exists, - expire_when, + expire_after, comment, sql, flow_options, @@ -965,7 +965,7 @@ impl From for PbCreateFlowTask { sink_table_name: Some(sink_table_name.into()), or_replace, create_if_not_exists, - expire_when, + expire_after, comment, sql, flow_options, diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index 057d8f932ed3..c6e1f8fb1b15 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -45,7 +45,7 @@ impl Flownode for FlownodeManager { source_table_ids, sink_table_name: Some(sink_table_name), create_if_not_exists, - expire_when, + expire_after, comment, sql, flow_options, @@ -62,7 +62,7 @@ impl Flownode for FlownodeManager { sink_table_name, &source_table_ids, create_if_not_exists, - Some(expire_when), + Some(expire_after), Some(comment), sql, flow_options, diff --git a/src/operator/src/expr_factory.rs b/src/operator/src/expr_factory.rs index b34bedc7c90f..b509eb3f47c2 100644 --- a/src/operator/src/expr_factory.rs +++ b/src/operator/src/expr_factory.rs @@ -591,8 +591,8 @@ pub fn to_create_flow_task_expr( sink_table_name: Some(sink_table_name), or_replace: create_flow.or_replace, create_if_not_exists: create_flow.if_not_exists, - expire_when: create_flow - .expire_when + expire_after: create_flow + .expire_after .map(|e| e.to_string()) .unwrap_or_default(), comment: create_flow.comment.unwrap_or_default(), diff --git a/src/sql/src/parsers/create_parser.rs b/src/sql/src/parsers/create_parser.rs index 152e6a81f7ab..043017194b2f 100644 --- a/src/sql/src/parsers/create_parser.rs +++ b/src/sql/src/parsers/create_parser.rs @@ -44,7 +44,7 @@ pub const ENGINE: &str = "ENGINE"; pub const MAXVALUE: &str = "MAXVALUE"; pub const SINK: &str = "SINK"; pub const EXPIRE: &str = "EXPIRE"; -pub const WHEN: &str = "WHEN"; +pub const AFTER: &str = "AFTER"; const DB_OPT_KEY_TTL: &str = "ttl"; @@ -235,9 +235,9 @@ impl<'a> ParserContext<'a> { let output_table_name = self.intern_parse_table_name()?; - let expire_when = if self + let expire_after = if self .parser - .consume_tokens(&[Token::make_keyword(EXPIRE), Token::make_keyword(WHEN)]) + .consume_tokens(&[Token::make_keyword(EXPIRE), Token::make_keyword(AFTER)]) { Some(self.parser.parse_expr().context(error::SyntaxSnafu)?) } else { @@ -272,7 +272,7 @@ impl<'a> ParserContext<'a> { sink_table_name: output_table_name, or_replace, if_not_exists, - expire_when, + expire_after, comment, query, })) @@ -877,7 +877,7 @@ mod tests { use common_catalog::consts::FILE_ENGINE; use common_error::ext::ErrorExt; use sqlparser::ast::ColumnOption::NotNull; - use sqlparser::ast::{BinaryOperator, Expr, Function, Interval, ObjectName, Value}; + use sqlparser::ast::{BinaryOperator, Expr, Interval, ObjectName, Value}; use super::*; use crate::dialect::GreptimeDbDialect; @@ -1103,7 +1103,7 @@ mod tests { let sql = r" CREATE OR REPLACE FLOW IF NOT EXISTS task_1 SINK TO schema_1.table_1 -EXPIRE WHEN timestamp < now() - INTERVAL '5m' +EXPIRE AFTER INTERVAL '5m' COMMENT 'test comment' AS SELECT max(c1), min(c2) FROM schema_2.table_2;"; @@ -1133,36 +1133,13 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;"; ]), or_replace: true, if_not_exists: true, - expire_when: Some(Expr::BinaryOp { - left: Box::new(Expr::Identifier(Ident { - value: "timestamp".to_string(), - quote_style: None, - })), - op: BinaryOperator::Lt, - right: Box::new(Expr::BinaryOp { - left: Box::new(Expr::Function(Function { - name: ObjectName(vec![Ident { - value: "now".to_string(), - quote_style: None, - }]), - args: vec![], - filter: None, - null_treatment: None, - over: None, - distinct: false, - special: false, - order_by: vec![], - })), - op: BinaryOperator::Minus, - right: Box::new(Expr::Interval(Interval { - value: Box::new(Expr::Value(Value::SingleQuotedString("5m".to_string()))), - leading_field: None, - leading_precision: None, - last_field: None, - fractional_seconds_precision: None, - })), - }), - }), + expire_after: Some(Expr::Interval(Interval { + value: Box::new(Expr::Value(Value::SingleQuotedString("5m".to_string()))), + leading_field: None, + leading_precision: None, + last_field: None, + fractional_seconds_precision: None, + })), comment: Some("test comment".to_string()), // ignore query parse result query: create_task.query.clone(), @@ -1185,7 +1162,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;"; }; assert!(!create_task.or_replace); assert!(!create_task.if_not_exists); - assert!(create_task.expire_when.is_none()); + assert!(create_task.expire_after.is_none()); assert!(create_task.comment.is_none()); } diff --git a/src/sql/src/statements/create.rs b/src/sql/src/statements/create.rs index b4903a666c6d..0ae0c5ab773c 100644 --- a/src/sql/src/statements/create.rs +++ b/src/sql/src/statements/create.rs @@ -256,7 +256,7 @@ pub struct CreateFlow { /// Create if not exist pub if_not_exists: bool, /// `EXPIRE_WHEN` - pub expire_when: Option, + pub expire_after: Option, /// Comment string pub comment: Option, /// SQL statement @@ -275,8 +275,8 @@ impl Display for CreateFlow { } write!(f, "{} ", &self.flow_name)?; write!(f, "OUTPUT AS {} ", &self.sink_table_name)?; - if let Some(expire_when) = &self.expire_when { - write!(f, "EXPIRE WHEN {} ", expire_when)?; + if let Some(expire_when) = &self.expire_after { + write!(f, "EXPIRE AFTER {} ", expire_when)?; } if let Some(comment) = &self.comment { write!(f, "COMMENT '{}' ", comment)?; From aa7a831033459b3e43000551e3ed8d04fa3f167f Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 21 May 2024 20:25:38 +0800 Subject: [PATCH 2/6] change remaining Signed-off-by: Ruihang Xia --- src/flow/src/adapter.rs | 8 ++++---- src/flow/src/adapter/worker.rs | 12 ++++++------ src/sql/src/statements/create.rs | 6 +++--- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index f75288831041..81be9ef79f07 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -565,7 +565,7 @@ impl FlownodeManager { /// Return task id if a new task is created, otherwise return None /// /// steps to create task: - /// 1. parse query into typed plan(and optional parse expire_when expr) + /// 1. parse query into typed plan(and optional parse expire_after expr) /// 2. render source/sink with output table id and used input table id #[allow(clippy::too_many_arguments)] pub async fn create_flow( @@ -574,7 +574,7 @@ impl FlownodeManager { sink_table_name: TableName, source_table_ids: &[TableId], create_if_not_exist: bool, - expire_when: Option, + expire_after: Option, comment: Option, sql: String, flow_options: HashMap, @@ -608,7 +608,7 @@ impl FlownodeManager { debug!("Flow {:?}'s Plan is {:?}", flow_id, flow_plan); node_ctx.assign_table_schema(&sink_table_name, flow_plan.typ.clone())?; - let expire_when = expire_when + let expire_after = expire_after .and_then(|s| { if s.is_empty() || s.split_whitespace().join("").is_empty() { None @@ -656,7 +656,7 @@ impl FlownodeManager { sink_sender, source_ids, src_recvs: source_receivers, - expire_when, + expire_after, create_if_not_exist, err_collector, }; diff --git a/src/flow/src/adapter/worker.rs b/src/flow/src/adapter/worker.rs index 659c6fedf8ae..c8866470a0be 100644 --- a/src/flow/src/adapter/worker.rs +++ b/src/flow/src/adapter/worker.rs @@ -232,7 +232,7 @@ impl<'s> Worker<'s> { source_ids: &[GlobalId], src_recvs: Vec>, // TODO(discord9): set expire duration for all arrangement and compare to sys timestamp instead - expire_when: Option, + expire_after: Option, create_if_not_exist: bool, err_collector: ErrCollector, ) -> Result, Error> { @@ -247,7 +247,7 @@ impl<'s> Worker<'s> { err_collector, ..Default::default() }; - cur_task_state.state.set_expire_after(expire_when); + cur_task_state.state.set_expire_after(expire_after); { let mut ctx = cur_task_state.new_ctx(sink_id); @@ -319,7 +319,7 @@ impl<'s> Worker<'s> { sink_sender, source_ids, src_recvs, - expire_when, + expire_after, create_if_not_exist, err_collector, } => { @@ -330,7 +330,7 @@ impl<'s> Worker<'s> { sink_sender, &source_ids, src_recvs, - expire_when, + expire_after, create_if_not_exist, err_collector, ); @@ -368,7 +368,7 @@ pub enum Request { sink_sender: mpsc::UnboundedSender, source_ids: Vec, src_recvs: Vec>, - expire_when: Option, + expire_after: Option, create_if_not_exist: bool, err_collector: ErrCollector, }, @@ -524,7 +524,7 @@ mod test { sink_sender: sink_tx, source_ids: src_ids, src_recvs: vec![rx], - expire_when: None, + expire_after: None, create_if_not_exist: true, err_collector: ErrCollector::default(), }; diff --git a/src/sql/src/statements/create.rs b/src/sql/src/statements/create.rs index 0ae0c5ab773c..6a459ef1cd8d 100644 --- a/src/sql/src/statements/create.rs +++ b/src/sql/src/statements/create.rs @@ -255,7 +255,7 @@ pub struct CreateFlow { pub or_replace: bool, /// Create if not exist pub if_not_exists: bool, - /// `EXPIRE_WHEN` + /// `EXPIRE AFTER` pub expire_after: Option, /// Comment string pub comment: Option, @@ -275,8 +275,8 @@ impl Display for CreateFlow { } write!(f, "{} ", &self.flow_name)?; write!(f, "OUTPUT AS {} ", &self.sink_table_name)?; - if let Some(expire_when) = &self.expire_after { - write!(f, "EXPIRE AFTER {} ", expire_when)?; + if let Some(expire_after) = &self.expire_after { + write!(f, "EXPIRE AFTER {} ", expire_after)?; } if let Some(comment) = &self.comment { write!(f, "COMMENT '{}' ", comment)?; From fb4257a75d0b2dcbfe70dc6baf89661c98379e52 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 22 May 2024 11:40:11 +0800 Subject: [PATCH 3/6] rename create_if_not_exist to create_if_not_exists Signed-off-by: Ruihang Xia --- src/flow/src/adapter.rs | 6 +++--- src/flow/src/adapter/worker.rs | 12 ++++++------ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 81be9ef79f07..b87d726bd0a1 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -573,14 +573,14 @@ impl FlownodeManager { flow_id: FlowId, sink_table_name: TableName, source_table_ids: &[TableId], - create_if_not_exist: bool, + create_if_not_exists: bool, expire_after: Option, comment: Option, sql: String, flow_options: HashMap, query_ctx: Option, ) -> Result, Error> { - if create_if_not_exist { + if create_if_not_exists { // check if the task already exists for handle in self.worker_handles.iter() { if handle.lock().await.contains_flow(flow_id).await? { @@ -657,7 +657,7 @@ impl FlownodeManager { source_ids, src_recvs: source_receivers, expire_after, - create_if_not_exist, + create_if_not_exists, err_collector, }; handle.create_flow(create_request).await?; diff --git a/src/flow/src/adapter/worker.rs b/src/flow/src/adapter/worker.rs index c8866470a0be..ea5ecea507e2 100644 --- a/src/flow/src/adapter/worker.rs +++ b/src/flow/src/adapter/worker.rs @@ -233,11 +233,11 @@ impl<'s> Worker<'s> { src_recvs: Vec>, // TODO(discord9): set expire duration for all arrangement and compare to sys timestamp instead expire_after: Option, - create_if_not_exist: bool, + create_if_not_exists: bool, err_collector: ErrCollector, ) -> Result, Error> { let already_exist = self.task_states.contains_key(&flow_id); - match (already_exist, create_if_not_exist) { + match (already_exist, create_if_not_exists) { (true, true) => return Ok(None), (true, false) => FlowAlreadyExistSnafu { id: flow_id }.fail()?, (false, _) => (), @@ -320,7 +320,7 @@ impl<'s> Worker<'s> { source_ids, src_recvs, expire_after, - create_if_not_exist, + create_if_not_exists, err_collector, } => { let task_create_result = self.create_flow( @@ -331,7 +331,7 @@ impl<'s> Worker<'s> { &source_ids, src_recvs, expire_after, - create_if_not_exist, + create_if_not_exists, err_collector, ); Some(( @@ -369,7 +369,7 @@ pub enum Request { source_ids: Vec, src_recvs: Vec>, expire_after: Option, - create_if_not_exist: bool, + create_if_not_exists: bool, err_collector: ErrCollector, }, Remove { @@ -525,7 +525,7 @@ mod test { source_ids: src_ids, src_recvs: vec![rx], expire_after: None, - create_if_not_exist: true, + create_if_not_exists: true, err_collector: ErrCollector::default(), }; handle.create_flow(create_reqs).await.unwrap(); From 525fa192b9e99bac9f13c2017bc9a17bfe78fa7c Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Sat, 25 May 2024 00:26:12 +0800 Subject: [PATCH 4/6] parse interval expr Signed-off-by: Ruihang Xia --- Cargo.lock | 2 +- Cargo.toml | 2 +- .../meta/src/cache/flow/table_flownode.rs | 2 +- src/common/meta/src/ddl/create_flow.rs | 3 +- src/common/meta/src/ddl/tests/create_flow.rs | 2 +- src/common/meta/src/key/flow.rs | 4 +- src/common/meta/src/key/flow/flow_info.rs | 3 +- src/common/meta/src/rpc/ddl.rs | 11 +- src/flow/src/adapter.rs | 19 +-- src/flow/src/adapter/flownode_impl.rs | 3 +- src/operator/src/expr_factory.rs | 7 +- src/sql/src/error.rs | 29 +++- src/sql/src/parsers.rs | 1 + src/sql/src/parsers/create_parser.rs | 40 ++++-- src/sql/src/parsers/error.rs | 17 --- src/sql/src/parsers/tql_parser.rs | 130 +++--------------- src/sql/src/parsers/utils.rs | 112 +++++++++++++++ src/sql/src/statements/create.rs | 3 +- 18 files changed, 213 insertions(+), 177 deletions(-) create mode 100644 src/sql/src/parsers/utils.rs diff --git a/Cargo.lock b/Cargo.lock index 72de312c1fd2..b7eba16efeb8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4232,7 +4232,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=48ddbd96d0d9beadaa50e2dfe2066b01e80a268c#48ddbd96d0d9beadaa50e2dfe2066b01e80a268c" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=f5b24680022dd8fe6f17a13356fcc678d8d329ff#f5b24680022dd8fe6f17a13356fcc678d8d329ff" dependencies = [ "prost 0.12.4", "serde", diff --git a/Cargo.toml b/Cargo.toml index 134ccc8ac22e..6fe916f87ccc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -120,7 +120,7 @@ etcd-client = { git = "https://github.com/MichaelScofield/etcd-client.git", rev fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "48ddbd96d0d9beadaa50e2dfe2066b01e80a268c" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "f5b24680022dd8fe6f17a13356fcc678d8d329ff" } humantime = "2.1" humantime-serde = "1.1" itertools = "0.10" diff --git a/src/common/meta/src/cache/flow/table_flownode.rs b/src/common/meta/src/cache/flow/table_flownode.rs index 19ddc0364924..eeaa88128628 100644 --- a/src/common/meta/src/cache/flow/table_flownode.rs +++ b/src/common/meta/src/cache/flow/table_flownode.rs @@ -180,7 +180,7 @@ mod tests { catalog_name: DEFAULT_CATALOG_NAME.to_string(), flow_name: "my_flow".to_string(), raw_sql: "sql".to_string(), - expire_after: "expire".to_string(), + expire_after: Some(300), comment: "comment".to_string(), options: Default::default(), }, diff --git a/src/common/meta/src/ddl/create_flow.rs b/src/common/meta/src/ddl/create_flow.rs index 487ac130b3b6..afa437ed6ca4 100644 --- a/src/common/meta/src/ddl/create_flow.rs +++ b/src/common/meta/src/ddl/create_flow.rs @@ -18,6 +18,7 @@ use std::collections::BTreeMap; use api::v1::flow::flow_request::Body as PbFlowRequest; use api::v1::flow::{CreateRequest, FlowRequest, FlowRequestHeader}; +use api::v1::ExpireAfter; use async_trait::async_trait; use common_catalog::format_full_flow_name; use common_procedure::error::{FromJsonSnafu, ToJsonSnafu}; @@ -283,7 +284,7 @@ impl From<&CreateFlowData> for CreateRequest { sink_table_name: Some(value.task.sink_table_name.clone().into()), // Always be true create_if_not_exists: true, - expire_after: value.task.expire_after.clone(), + expire_after: value.task.expire_after.map(|value| ExpireAfter { value }), comment: value.task.comment.clone(), sql: value.task.sql.clone(), flow_options: value.task.flow_options.clone(), diff --git a/src/common/meta/src/ddl/tests/create_flow.rs b/src/common/meta/src/ddl/tests/create_flow.rs index 030e40171ca0..e79fe27b848f 100644 --- a/src/common/meta/src/ddl/tests/create_flow.rs +++ b/src/common/meta/src/ddl/tests/create_flow.rs @@ -44,7 +44,7 @@ pub(crate) fn test_create_flow_task( sink_table_name, or_replace: false, create_if_not_exists, - expire_after: "".to_string(), + expire_after: Some(300), comment: "".to_string(), sql: "raw_sql".to_string(), flow_options: Default::default(), diff --git a/src/common/meta/src/key/flow.rs b/src/common/meta/src/key/flow.rs index f94778ac444d..1f8db5585433 100644 --- a/src/common/meta/src/key/flow.rs +++ b/src/common/meta/src/key/flow.rs @@ -328,7 +328,7 @@ mod tests { sink_table_name, flownode_ids, raw_sql: "raw".to_string(), - expire_after: "expr".to_string(), + expire_after: Some(300), comment: "hi".to_string(), options: Default::default(), } @@ -420,7 +420,7 @@ mod tests { sink_table_name: another_sink_table_name, flownode_ids: [(0, 1u64)].into(), raw_sql: "raw".to_string(), - expire_after: "expr".to_string(), + expire_after: Some(300), comment: "hi".to_string(), options: Default::default(), }; diff --git a/src/common/meta/src/key/flow/flow_info.rs b/src/common/meta/src/key/flow/flow_info.rs index 0284be19a3b6..8a95441e223f 100644 --- a/src/common/meta/src/key/flow/flow_info.rs +++ b/src/common/meta/src/key/flow/flow_info.rs @@ -123,7 +123,8 @@ pub struct FlowInfoValue { /// The raw sql. pub(crate) raw_sql: String, /// The expr of expire. - pub(crate) expire_after: String, + /// Duration in second as `i64`. + pub(crate) expire_after: Option, /// The comment. pub(crate) comment: String, /// The options. diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index 0ed61036110a..8e977f1ca5bc 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -28,8 +28,8 @@ use api::v1::meta::{ }; use api::v1::{ AlterExpr, CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr, - DropDatabaseExpr, DropFlowExpr, DropTableExpr, DropViewExpr, QueryContext as PbQueryContext, - TruncateTableExpr, + DropDatabaseExpr, DropFlowExpr, DropTableExpr, DropViewExpr, ExpireAfter, + QueryContext as PbQueryContext, TruncateTableExpr, }; use base64::engine::general_purpose; use base64::Engine as _; @@ -898,7 +898,8 @@ pub struct CreateFlowTask { pub sink_table_name: TableName, pub or_replace: bool, pub create_if_not_exists: bool, - pub expire_after: String, + /// Duration in seconds. Data older than this duration will not be used. + pub expire_after: Option, pub comment: String, pub sql: String, pub flow_options: HashMap, @@ -934,7 +935,7 @@ impl TryFrom for CreateFlowTask { .into(), or_replace, create_if_not_exists, - expire_after, + expire_after: expire_after.map(|e| e.value), comment, sql, flow_options, @@ -965,7 +966,7 @@ impl From for PbCreateFlowTask { sink_table_name: Some(sink_table_name.into()), or_replace, create_if_not_exists, - expire_after, + expire_after: expire_after.map(|value| ExpireAfter { value }), comment, sql, flow_options, diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index b87d726bd0a1..25bb3cb2bf2c 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -42,7 +42,6 @@ use tokio::sync::{oneshot, watch, Mutex, RwLock}; use crate::adapter::error::{ExternalSnafu, TableNotFoundSnafu, UnexpectedSnafu}; pub(crate) use crate::adapter::node_context::FlownodeContext; -use crate::adapter::parse_expr::parse_fixed; use crate::adapter::table_source::TableSource; use crate::adapter::util::column_schemas_to_proto; use crate::adapter::worker::{create_worker, Worker, WorkerHandle}; @@ -574,7 +573,7 @@ impl FlownodeManager { sink_table_name: TableName, source_table_ids: &[TableId], create_if_not_exists: bool, - expire_after: Option, + expire_after: Option, comment: Option, sql: String, flow_options: HashMap, @@ -608,22 +607,6 @@ impl FlownodeManager { debug!("Flow {:?}'s Plan is {:?}", flow_id, flow_plan); node_ctx.assign_table_schema(&sink_table_name, flow_plan.typ.clone())?; - let expire_after = expire_after - .and_then(|s| { - if s.is_empty() || s.split_whitespace().join("").is_empty() { - None - } else { - Some(s) - } - }) - .map(|d| { - let d = d.as_ref(); - parse_fixed(d) - .map(|(_, n)| n) - .map_err(|err| err.to_string()) - }) - .transpose() - .map_err(|err| UnexpectedSnafu { reason: err }.build())?; let _ = comment; let _ = flow_options; diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index c6e1f8fb1b15..e770bb5e4cf1 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -56,13 +56,14 @@ impl Flownode for FlownodeManager { sink_table_name.schema_name, sink_table_name.table_name, ]; + let expire_after = expire_after.map(|e| e.value); let ret = self .create_flow( task_id.id as u64, sink_table_name, &source_table_ids, create_if_not_exists, - Some(expire_after), + expire_after, Some(comment), sql, flow_options, diff --git a/src/operator/src/expr_factory.rs b/src/operator/src/expr_factory.rs index b509eb3f47c2..6715a06e0363 100644 --- a/src/operator/src/expr_factory.rs +++ b/src/operator/src/expr_factory.rs @@ -19,7 +19,7 @@ use api::v1::alter_expr::Kind; use api::v1::{ AddColumn, AddColumns, AlterExpr, ChangeColumnType, ChangeColumnTypes, Column, ColumnDataType, ColumnDataTypeExtension, CreateFlowExpr, CreateTableExpr, CreateViewExpr, DropColumn, - DropColumns, RenameTable, SemanticType, TableName, + DropColumns, ExpireAfter, RenameTable, SemanticType, TableName, }; use common_error::ext::BoxedError; use common_grpc_expr::util::ColumnExpr; @@ -591,10 +591,7 @@ pub fn to_create_flow_task_expr( sink_table_name: Some(sink_table_name), or_replace: create_flow.or_replace, create_if_not_exists: create_flow.if_not_exists, - expire_after: create_flow - .expire_after - .map(|e| e.to_string()) - .unwrap_or_default(), + expire_after: create_flow.expire_after.map(|value| ExpireAfter { value }), comment: create_flow.comment.unwrap_or_default(), sql: create_flow.query.to_string(), flow_options: HashMap::new(), diff --git a/src/sql/src/error.rs b/src/sql/src/error.rs index 81a760f31787..d63f36411ff6 100644 --- a/src/sql/src/error.rs +++ b/src/sql/src/error.rs @@ -19,6 +19,7 @@ use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use common_time::timestamp::TimeUnit; use common_time::Timestamp; +use datafusion_common::DataFusionError; use datatypes::prelude::{ConcreteDataType, Value}; use snafu::{Location, Snafu}; use sqlparser::ast::Ident; @@ -123,6 +124,13 @@ pub enum Error { #[snafu(display("Invalid database name: {}", name))] InvalidDatabaseName { name: String }, + #[snafu(display("Invalid interval provided: {}", reason))] + InvalidInterval { + reason: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Unrecognized database option key: {}", key))] InvalidDatabaseOption { key: String, @@ -213,6 +221,22 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to convert to logical TQL expression"))] + ConvertToLogicalExpression { + #[snafu(source)] + error: DataFusionError, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to simplify TQL expression"))] + Simplification { + #[snafu(source)] + error: DataFusionError, + #[snafu(implicit)] + location: Location, + }, } impl ErrorExt for Error { @@ -241,7 +265,10 @@ impl ErrorExt for Error { | InvalidSqlValue { .. } | TimestampOverflow { .. } | InvalidTableOption { .. } - | InvalidCast { .. } => StatusCode::InvalidArguments, + | InvalidCast { .. } + | ConvertToLogicalExpression { .. } + | Simplification { .. } + | InvalidInterval { .. } => StatusCode::InvalidArguments, SerializeColumnDefaultConstraint { source, .. } => source.status_code(), ConvertToGrpcDataType { source, .. } => source.status_code(), diff --git a/src/sql/src/parsers.rs b/src/sql/src/parsers.rs index b7e5c8c44e84..721f41367784 100644 --- a/src/sql/src/parsers.rs +++ b/src/sql/src/parsers.rs @@ -26,3 +26,4 @@ pub(crate) mod set_var_parser; pub(crate) mod show_parser; pub(crate) mod tql_parser; pub(crate) mod truncate_parser; +pub(crate) mod utils; diff --git a/src/sql/src/parsers/create_parser.rs b/src/sql/src/parsers/create_parser.rs index 043017194b2f..5eb158364d8f 100644 --- a/src/sql/src/parsers/create_parser.rs +++ b/src/sql/src/parsers/create_parser.rs @@ -15,6 +15,8 @@ use std::collections::HashMap; use common_catalog::consts::default_engine; +use datafusion_common::ScalarValue; +use datatypes::arrow::datatypes::{DataType as ArrowDataType, IntervalUnit}; use itertools::Itertools; use snafu::{ensure, OptionExt, ResultExt}; use sqlparser::ast::{ColumnOption, ColumnOptionDef, DataType, Expr}; @@ -25,11 +27,12 @@ use sqlparser::parser::{Parser, ParserError}; use sqlparser::tokenizer::{Token, TokenWithLocation, Word}; use table::requests::validate_table_option; +use super::utils; use crate::ast::{ColumnDef, Ident, TableConstraint}; use crate::error::{ - self, InvalidColumnOptionSnafu, InvalidDatabaseOptionSnafu, InvalidTableOptionSnafu, - InvalidTimeIndexSnafu, MissingTimeIndexSnafu, Result, SyntaxSnafu, UnexpectedSnafu, - UnsupportedSnafu, + self, InvalidColumnOptionSnafu, InvalidDatabaseOptionSnafu, InvalidIntervalSnafu, + InvalidTableOptionSnafu, InvalidTimeIndexSnafu, MissingTimeIndexSnafu, Result, SyntaxSnafu, + UnexpectedSnafu, UnsupportedSnafu, }; use crate::parser::{ParserContext, FLOW}; use crate::statements::create::{ @@ -239,7 +242,24 @@ impl<'a> ParserContext<'a> { .parser .consume_tokens(&[Token::make_keyword(EXPIRE), Token::make_keyword(AFTER)]) { - Some(self.parser.parse_expr().context(error::SyntaxSnafu)?) + let expire_after_expr = self.parser.parse_expr().context(error::SyntaxSnafu)?; + let expire_after_lit = utils::parser_expr_to_scalar_value(expire_after_expr.clone())? + .cast_to(&ArrowDataType::Interval(IntervalUnit::MonthDayNano)) + .ok() + .with_context(|| InvalidIntervalSnafu { + reason: format!("cannot cast {} to interval type", expire_after_expr), + })?; + if let ScalarValue::IntervalMonthDayNano(Some(nanoseconds)) = expire_after_lit { + Some( + i64::try_from(nanoseconds / 1_000_000_000) + .ok() + .with_context(|| InvalidIntervalSnafu { + reason: format!("interval {} overflows", nanoseconds), + })?, + ) + } else { + unreachable!() + } } else { None }; @@ -877,7 +897,7 @@ mod tests { use common_catalog::consts::FILE_ENGINE; use common_error::ext::ErrorExt; use sqlparser::ast::ColumnOption::NotNull; - use sqlparser::ast::{BinaryOperator, Expr, Interval, ObjectName, Value}; + use sqlparser::ast::{BinaryOperator, Expr, ObjectName, Value}; use super::*; use crate::dialect::GreptimeDbDialect; @@ -1103,7 +1123,7 @@ mod tests { let sql = r" CREATE OR REPLACE FLOW IF NOT EXISTS task_1 SINK TO schema_1.table_1 -EXPIRE AFTER INTERVAL '5m' +EXPIRE AFTER INTERVAL '5 minutes' COMMENT 'test comment' AS SELECT max(c1), min(c2) FROM schema_2.table_2;"; @@ -1133,13 +1153,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;"; ]), or_replace: true, if_not_exists: true, - expire_after: Some(Expr::Interval(Interval { - value: Box::new(Expr::Value(Value::SingleQuotedString("5m".to_string()))), - leading_field: None, - leading_precision: None, - last_field: None, - fractional_seconds_precision: None, - })), + expire_after: Some(300), comment: Some("test comment".to_string()), // ignore query parse result query: create_task.query.clone(), diff --git a/src/sql/src/parsers/error.rs b/src/sql/src/parsers/error.rs index 988aa27f5493..8feb80b988e3 100644 --- a/src/sql/src/parsers/error.rs +++ b/src/sql/src/parsers/error.rs @@ -13,7 +13,6 @@ // limitations under the License. use common_macro::stack_trace_debug; -use datafusion_common::DataFusionError; use snafu::{Location, Snafu}; use sqlparser::parser::ParserError; @@ -30,22 +29,6 @@ pub enum TQLError { location: Location, }, - #[snafu(display("Failed to convert to logical TQL expression"))] - ConvertToLogicalExpression { - #[snafu(source)] - error: DataFusionError, - #[snafu(implicit)] - location: Location, - }, - - #[snafu(display("Failed to simplify TQL expression"))] - Simplification { - #[snafu(source)] - error: DataFusionError, - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Failed to evaluate TQL expression: {}", msg))] Evaluation { msg: String }, } diff --git a/src/sql/src/parsers/tql_parser.rs b/src/sql/src/parsers/tql_parser.rs index 13a754a9ca81..985f66720bed 100644 --- a/src/sql/src/parsers/tql_parser.rs +++ b/src/sql/src/parsers/tql_parser.rs @@ -12,16 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - -use chrono::Utc; -use datafusion::execution::context::SessionState; -use datafusion::optimizer::simplify_expressions::{ExprSimplifier, SimplifyContext}; -use datafusion_common::config::ConfigOptions; -use datafusion_common::{DFSchema, Result as DFResult, ScalarValue, TableReference}; -use datafusion_expr::{AggregateUDF, Expr, ScalarUDF, TableSource, WindowUDF}; -use datafusion_physical_expr::execution_props::ExecutionProps; -use datafusion_sql::planner::{ContextProvider, SqlToRel}; +use datafusion_common::ScalarValue; use snafu::{OptionExt, ResultExt}; use sqlparser::keywords::Keyword; use sqlparser::parser::ParserError; @@ -29,6 +20,7 @@ use sqlparser::tokenizer::Token; use crate::error::{self, Result}; use crate::parser::ParserContext; +use crate::parsers::utils; use crate::statements::statement::Statement; use crate::statements::tql::{Tql, TqlAnalyze, TqlEval, TqlExplain, TqlParameters}; @@ -37,13 +29,10 @@ const EVAL: &str = "EVAL"; const EVALUATE: &str = "EVALUATE"; const VERBOSE: &str = "VERBOSE"; -use datatypes::arrow::datatypes::DataType; use sqlparser::parser::Parser; use crate::dialect::GreptimeDbDialect; -use crate::parsers::error::{ - ConvertToLogicalExpressionSnafu, EvaluationSnafu, ParserSnafu, SimplificationSnafu, TQLError, -}; +use crate::parsers::error::{EvaluationSnafu, ParserSnafu, TQLError}; /// TQL extension parser, including: /// - `TQL EVAL ` @@ -192,10 +181,25 @@ impl<'a> ParserContext<'a> { } fn parse_tokens(tokens: Vec) -> std::result::Result { - Self::parse_to_expr(tokens) - .and_then(Self::parse_to_logical_expr) - .and_then(Self::simplify_expr) - .and_then(Self::evaluate_expr) + let parser_expr = Self::parse_to_expr(tokens)?; + let lit = utils::parser_expr_to_scalar_value(parser_expr).unwrap(); + + let second = match lit { + ScalarValue::TimestampNanosecond(ts_nanos, _) + | ScalarValue::DurationNanosecond(ts_nanos) => ts_nanos.map(|v| v / 1_000_000_000), + ScalarValue::TimestampMicrosecond(ts_micros, _) + | ScalarValue::DurationMicrosecond(ts_micros) => ts_micros.map(|v| v / 1_000_000), + ScalarValue::TimestampMillisecond(ts_millis, _) + | ScalarValue::DurationMillisecond(ts_millis) => ts_millis.map(|v| v / 1_000), + ScalarValue::TimestampSecond(ts_secs, _) | ScalarValue::DurationSecond(ts_secs) => { + ts_secs + } + _ => None, + }; + + second.map(|ts| ts.to_string()).context(EvaluationSnafu { + msg: format!("Failed to extract a timestamp value {lit:?}"), + }) } fn parse_to_expr(tokens: Vec) -> std::result::Result { @@ -205,46 +209,6 @@ impl<'a> ParserContext<'a> { .context(ParserSnafu) } - fn parse_to_logical_expr(expr: sqlparser::ast::Expr) -> std::result::Result { - let empty_df_schema = DFSchema::empty(); - SqlToRel::new(&StubContextProvider::default()) - .sql_to_expr(expr.into(), &empty_df_schema, &mut Default::default()) - .context(ConvertToLogicalExpressionSnafu) - } - - fn simplify_expr(logical_expr: Expr) -> std::result::Result { - let empty_df_schema = DFSchema::empty(); - let execution_props = ExecutionProps::new().with_query_execution_start_time(Utc::now()); - let info = SimplifyContext::new(&execution_props).with_schema(Arc::new(empty_df_schema)); - ExprSimplifier::new(info) - .simplify(logical_expr) - .context(SimplificationSnafu) - } - - fn evaluate_expr(simplified_expr: Expr) -> std::result::Result { - match simplified_expr { - Expr::Literal(ScalarValue::TimestampNanosecond(ts_nanos, _)) - | Expr::Literal(ScalarValue::DurationNanosecond(ts_nanos)) => { - ts_nanos.map(|v| v / 1_000_000_000) - } - Expr::Literal(ScalarValue::TimestampMicrosecond(ts_micros, _)) - | Expr::Literal(ScalarValue::DurationMicrosecond(ts_micros)) => { - ts_micros.map(|v| v / 1_000_000) - } - Expr::Literal(ScalarValue::TimestampMillisecond(ts_millis, _)) - | Expr::Literal(ScalarValue::DurationMillisecond(ts_millis)) => { - ts_millis.map(|v| v / 1_000) - } - Expr::Literal(ScalarValue::TimestampSecond(ts_secs, _)) - | Expr::Literal(ScalarValue::DurationSecond(ts_secs)) => ts_secs, - _ => None, - } - .map(|ts| ts.to_string()) - .context(EvaluationSnafu { - msg: format!("Failed to extract a timestamp value {simplified_expr:?}"), - }) - } - fn parse_tql_query(parser: &mut Parser, sql: &str) -> std::result::Result { while matches!(parser.peek_token().token, Token::Comma) { let _skip_token = parser.next_token(); @@ -264,56 +228,6 @@ impl<'a> ParserContext<'a> { } } -struct StubContextProvider { - state: SessionState, -} - -impl Default for StubContextProvider { - fn default() -> Self { - Self { - state: SessionState::new_with_config_rt(Default::default(), Default::default()), - } - } -} - -impl ContextProvider for StubContextProvider { - fn get_table_source(&self, _name: TableReference) -> DFResult> { - unimplemented!() - } - - fn get_function_meta(&self, name: &str) -> Option> { - self.state.scalar_functions().get(name).cloned() - } - - fn get_aggregate_meta(&self, name: &str) -> Option> { - self.state.aggregate_functions().get(name).cloned() - } - - fn get_window_meta(&self, _name: &str) -> Option> { - unimplemented!() - } - - fn get_variable_type(&self, _variable_names: &[String]) -> Option { - unimplemented!() - } - - fn options(&self) -> &ConfigOptions { - unimplemented!() - } - - fn udfs_names(&self) -> Vec { - self.state.scalar_functions().keys().cloned().collect() - } - - fn udafs_names(&self) -> Vec { - self.state.aggregate_functions().keys().cloned().collect() - } - - fn udwfs_names(&self) -> Vec { - self.state.window_functions().keys().cloned().collect() - } -} - #[cfg(test)] mod tests { use common_error::ext::ErrorExt; diff --git a/src/sql/src/parsers/utils.rs b/src/sql/src/parsers/utils.rs new file mode 100644 index 000000000000..97b317dd369a --- /dev/null +++ b/src/sql/src/parsers/utils.rs @@ -0,0 +1,112 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use chrono::Utc; +use datafusion::config::ConfigOptions; +use datafusion::error::Result as DfResult; +use datafusion::execution::context::SessionState; +use datafusion::optimizer::simplify_expressions::ExprSimplifier; +use datafusion_common::{DFSchema, ScalarValue}; +use datafusion_expr::execution_props::ExecutionProps; +use datafusion_expr::simplify::SimplifyContext; +use datafusion_expr::{AggregateUDF, ScalarUDF, TableSource, WindowUDF}; +use datafusion_sql::planner::{ContextProvider, SqlToRel}; +use datafusion_sql::TableReference; +use datatypes::arrow::datatypes::DataType; +use snafu::ResultExt; + +use crate::error::{ + ConvertToLogicalExpressionSnafu, ParseSqlValueSnafu, Result, SimplificationSnafu, +}; + +/// Convert a parser expression to a scalar value. This function will try the +/// best to resolve and reduce constants. Exprs like `1 + 1` or `now()` can be +/// handled properly. +pub fn parser_expr_to_scalar_value(expr: sqlparser::ast::Expr) -> Result { + // 1. convert parser expr to logical expr + let empty_df_schema = DFSchema::empty(); + let logical_expr = SqlToRel::new(&StubContextProvider::default()) + .sql_to_expr(expr.into(), &empty_df_schema, &mut Default::default()) + .context(ConvertToLogicalExpressionSnafu)?; + + // 2. simplify logical expr + let execution_props = ExecutionProps::new().with_query_execution_start_time(Utc::now()); + let info = SimplifyContext::new(&execution_props).with_schema(Arc::new(empty_df_schema)); + let simplified_expr = ExprSimplifier::new(info) + .simplify(logical_expr) + .context(SimplificationSnafu)?; + + if let datafusion::logical_expr::Expr::Literal(lit) = simplified_expr { + Ok(lit) + } else { + // Err(ParseSqlValue) + ParseSqlValueSnafu { + msg: format!("expected literal value, but found {:?}", simplified_expr), + } + .fail() + } +} + +/// Helper struct for [`parser_expr_to_scalar_value`]. +struct StubContextProvider { + state: SessionState, +} + +impl Default for StubContextProvider { + fn default() -> Self { + Self { + state: SessionState::new_with_config_rt(Default::default(), Default::default()), + } + } +} + +impl ContextProvider for StubContextProvider { + fn get_table_source(&self, _name: TableReference) -> DfResult> { + unimplemented!() + } + + fn get_function_meta(&self, name: &str) -> Option> { + self.state.scalar_functions().get(name).cloned() + } + + fn get_aggregate_meta(&self, name: &str) -> Option> { + self.state.aggregate_functions().get(name).cloned() + } + + fn get_window_meta(&self, _name: &str) -> Option> { + unimplemented!() + } + + fn get_variable_type(&self, _variable_names: &[String]) -> Option { + unimplemented!() + } + + fn options(&self) -> &ConfigOptions { + unimplemented!() + } + + fn udfs_names(&self) -> Vec { + self.state.scalar_functions().keys().cloned().collect() + } + + fn udafs_names(&self) -> Vec { + self.state.aggregate_functions().keys().cloned().collect() + } + + fn udwfs_names(&self) -> Vec { + self.state.window_functions().keys().cloned().collect() + } +} diff --git a/src/sql/src/statements/create.rs b/src/sql/src/statements/create.rs index 6a459ef1cd8d..4259d61cc501 100644 --- a/src/sql/src/statements/create.rs +++ b/src/sql/src/statements/create.rs @@ -256,7 +256,8 @@ pub struct CreateFlow { /// Create if not exist pub if_not_exists: bool, /// `EXPIRE AFTER` - pub expire_after: Option, + /// Duration in second as `i64` + pub expire_after: Option, /// Comment string pub comment: Option, /// SQL statement From 36712efeed71841be087475ab9468f63ba915fd5 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 27 May 2024 11:43:23 +0800 Subject: [PATCH 5/6] update comment Signed-off-by: Ruihang Xia --- src/sql/src/parsers/create_parser.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sql/src/parsers/create_parser.rs b/src/sql/src/parsers/create_parser.rs index 5eb158364d8f..8dc3f0c662c0 100644 --- a/src/sql/src/parsers/create_parser.rs +++ b/src/sql/src/parsers/create_parser.rs @@ -1160,7 +1160,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;"; }; assert_eq!(create_task, &expected); - // create flow without `OR REPLACE`, `IF NOT EXISTS`, `EXPIRE WHEN` and `COMMENT` + // create flow without `OR REPLACE`, `IF NOT EXISTS`, `EXPIRE AFTER` and `COMMENT` let sql = r" CREATE FLOW task_2 SINK TO schema_1.table_1 From aaf9a79f2a491e61152cdd8e1b878b98034233e7 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 27 May 2024 11:51:06 +0800 Subject: [PATCH 6/6] Apply suggestions from code review Co-authored-by: Jeremyhi --- src/common/meta/src/key/flow/flow_info.rs | 2 +- src/flow/src/adapter/worker.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/common/meta/src/key/flow/flow_info.rs b/src/common/meta/src/key/flow/flow_info.rs index 8a95441e223f..f08e7c5def56 100644 --- a/src/common/meta/src/key/flow/flow_info.rs +++ b/src/common/meta/src/key/flow/flow_info.rs @@ -123,7 +123,7 @@ pub struct FlowInfoValue { /// The raw sql. pub(crate) raw_sql: String, /// The expr of expire. - /// Duration in second as `i64`. + /// Duration in seconds as `i64`. pub(crate) expire_after: Option, /// The comment. pub(crate) comment: String, diff --git a/src/flow/src/adapter/worker.rs b/src/flow/src/adapter/worker.rs index ea5ecea507e2..9df68c6e5d1d 100644 --- a/src/flow/src/adapter/worker.rs +++ b/src/flow/src/adapter/worker.rs @@ -236,8 +236,8 @@ impl<'s> Worker<'s> { create_if_not_exists: bool, err_collector: ErrCollector, ) -> Result, Error> { - let already_exist = self.task_states.contains_key(&flow_id); - match (already_exist, create_if_not_exists) { + let already_exists = self.task_states.contains_key(&flow_id); + match (already_exists, create_if_not_exists) { (true, true) => return Ok(None), (true, false) => FlowAlreadyExistSnafu { id: flow_id }.fail()?, (false, _) => (),