diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 587e7ecc87e9..096c3de247ea 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -514,8 +514,10 @@ pub fn check_permission( // These are executed by query engine, and will be checked there. Statement::Query(_) | Statement::Explain(_) | Statement::Tql(_) | Statement::Delete(_) => {} // database ops won't be checked - Statement::CreateDatabase(_) | Statement::ShowDatabases(_) | Statement::DropDatabase(_) => { - } + Statement::CreateDatabase(_) + | Statement::ShowDatabases(_) + | Statement::DropDatabase(_) + | Statement::DropFlow(_) => {} Statement::ShowCreateTable(stmt) => { validate_param(&stmt.table_name, query_ctx)?; diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs index b8fcc4111e42..4dadffd1240d 100644 --- a/src/operator/src/statement.rs +++ b/src/operator/src/statement.rs @@ -168,6 +168,10 @@ impl StatementExecutor { self.create_flow(stmt, query_ctx).await?; Ok(Output::new_with_affected_rows(0)) } + Statement::DropFlow(_stmt) => { + // TODO(weny): implement it. + unimplemented!() + } Statement::Alter(alter_table) => self.alter_table(alter_table, query_ctx).await, Statement::DropTable(stmt) => { let (catalog, schema, table) = diff --git a/src/sql/src/parser.rs b/src/sql/src/parser.rs index e873343bbdcb..2b90c792dc11 100644 --- a/src/sql/src/parser.rs +++ b/src/sql/src/parser.rs @@ -25,6 +25,8 @@ use crate::parsers::tql_parser; use crate::statements::statement::Statement; use crate::statements::transform_statements; +pub const FLOW: &str = "FLOW"; + /// SQL Parser options. #[derive(Clone, Debug, Default)] pub struct ParseOptions {} diff --git a/src/sql/src/parsers/create_parser.rs b/src/sql/src/parsers/create_parser.rs index 4876931eea56..ac9853c6f2eb 100644 --- a/src/sql/src/parsers/create_parser.rs +++ b/src/sql/src/parsers/create_parser.rs @@ -30,7 +30,7 @@ use crate::error::{ self, InvalidColumnOptionSnafu, InvalidTableOptionSnafu, InvalidTimeIndexSnafu, MissingTimeIndexSnafu, Result, SyntaxSnafu, UnexpectedSnafu, UnsupportedSnafu, }; -use crate::parser::ParserContext; +use crate::parser::{ParserContext, FLOW}; use crate::statements::create::{ CreateDatabase, CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, Partitions, TIME_INDEX, @@ -41,7 +41,6 @@ use crate::util::parse_option_string; pub const ENGINE: &str = "ENGINE"; pub const MAXVALUE: &str = "MAXVALUE"; -pub const FLOW: &str = "FLOW"; pub const SINK: &str = "SINK"; pub const EXPIRE: &str = "EXPIRE"; pub const WHEN: &str = "WHEN"; diff --git a/src/sql/src/parsers/drop_parser.rs b/src/sql/src/parsers/drop_parser.rs index 8cc62189d88d..6730e3887e29 100644 --- a/src/sql/src/parsers/drop_parser.rs +++ b/src/sql/src/parsers/drop_parser.rs @@ -17,8 +17,8 @@ use sqlparser::dialect::keywords::Keyword; use sqlparser::tokenizer::Token; use crate::error::{self, InvalidTableNameSnafu, Result}; -use crate::parser::ParserContext; -use crate::statements::drop::{DropDatabase, DropTable}; +use crate::parser::{ParserContext, FLOW}; +use crate::statements::drop::{DropDatabase, DropFlow, DropTable}; use crate::statements::statement::Statement; /// DROP statement parser implementation @@ -29,12 +29,41 @@ impl<'a> ParserContext<'a> { Token::Word(w) => match w.keyword { Keyword::TABLE => self.parse_drop_table(), Keyword::SCHEMA | Keyword::DATABASE => self.parse_drop_database(), + Keyword::NoKeyword => { + let uppercase = w.value.to_uppercase(); + match uppercase.as_str() { + FLOW => self.parse_drop_flow(), + _ => self.unsupported(w.to_string()), + } + } _ => self.unsupported(w.to_string()), }, unexpected => self.unsupported(unexpected.to_string()), } } + fn parse_drop_flow(&mut self) -> Result { + let _ = self.parser.next_token(); + + let if_exists = self.parser.parse_keywords(&[Keyword::IF, Keyword::EXISTS]); + let raw_flow_ident = self + .parse_object_name() + .with_context(|_| error::UnexpectedSnafu { + sql: self.sql, + expected: "a flow name", + actual: self.peek_token_as_string(), + })?; + let flow_ident = Self::canonicalize_object_name(raw_flow_ident); + ensure!( + !flow_ident.0.is_empty(), + InvalidTableNameSnafu { + name: flow_ident.to_string() + } + ); + + Ok(Statement::DropFlow(DropFlow::new(flow_ident, if_exists))) + } + fn parse_drop_table(&mut self) -> Result { let _ = self.parser.next_token(); @@ -172,4 +201,53 @@ mod tests { )) ); } + + #[test] + pub fn test_drop_flow() { + let sql = "DROP FLOW foo"; + let result = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()); + let mut stmts: Vec = result.unwrap(); + assert_eq!( + stmts.pop().unwrap(), + Statement::DropFlow(DropFlow::new(ObjectName(vec![Ident::new("foo")]), false)) + ); + + let sql = "DROP FLOW IF EXISTS foo"; + let result = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()); + let mut stmts = result.unwrap(); + assert_eq!( + stmts.pop().unwrap(), + Statement::DropFlow(DropFlow::new(ObjectName(vec![Ident::new("foo")]), true)) + ); + + let sql = "DROP FLOW my_schema.foo"; + let result = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()); + let mut stmts = result.unwrap(); + assert_eq!( + stmts.pop().unwrap(), + Statement::DropFlow(DropFlow::new( + ObjectName(vec![Ident::new("my_schema"), Ident::new("foo")]), + false + )) + ); + + let sql = "DROP FLOW my_catalog.my_schema.foo"; + let result = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()); + let mut stmts = result.unwrap(); + assert_eq!( + stmts.pop().unwrap(), + Statement::DropFlow(DropFlow::new( + ObjectName(vec![ + Ident::new("my_catalog"), + Ident::new("my_schema"), + Ident::new("foo") + ]), + false + )) + ) + } } diff --git a/src/sql/src/statements/drop.rs b/src/sql/src/statements/drop.rs index 4725f512816d..304e52ee523b 100644 --- a/src/sql/src/statements/drop.rs +++ b/src/sql/src/statements/drop.rs @@ -91,6 +91,45 @@ impl Display for DropDatabase { } } +/// DROP FLOW statement. +#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut)] +pub struct DropFlow { + flow_name: ObjectName, + /// drop flow if exists + drop_if_exists: bool, +} + +impl DropFlow { + /// Creates a statement for `DROP DATABASE` + pub fn new(flow_name: ObjectName, if_exists: bool) -> Self { + Self { + flow_name, + drop_if_exists: if_exists, + } + } + + /// Returns the flow name. + pub fn flow_name(&self) -> &ObjectName { + &self.flow_name + } + + /// Return the `drop_if_exists`. + pub fn drop_if_exists(&self) -> bool { + self.drop_if_exists + } +} + +impl Display for DropFlow { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("DROP FLOW")?; + if self.drop_if_exists() { + f.write_str(" IF EXISTS")?; + } + let flow_name = self.flow_name(); + write!(f, r#" {flow_name}"#) + } +} + #[cfg(test)] mod tests { use std::assert_matches::assert_matches; @@ -188,4 +227,49 @@ DROP TABLE IF EXISTS test"#, } } } + + #[test] + fn test_display_drop_flow() { + let sql = r"drop flow test;"; + let stmts = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap(); + assert_eq!(1, stmts.len()); + assert_matches!(&stmts[0], Statement::DropFlow { .. }); + + match &stmts[0] { + Statement::DropFlow(set) => { + let new_sql = format!("\n{}", set); + assert_eq!( + r#" +DROP FLOW test"#, + &new_sql + ); + } + _ => { + unreachable!(); + } + } + + let sql = r"drop flow if exists test;"; + let stmts = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap(); + assert_eq!(1, stmts.len()); + assert_matches!(&stmts[0], Statement::DropFlow { .. }); + + match &stmts[0] { + Statement::DropFlow(set) => { + let new_sql = format!("\n{}", set); + assert_eq!( + r#" +DROP FLOW IF EXISTS test"#, + &new_sql + ); + } + _ => { + unreachable!(); + } + } + } } diff --git a/src/sql/src/statements/statement.rs b/src/sql/src/statements/statement.rs index a3dcab916705..ef05db0f5352 100644 --- a/src/sql/src/statements/statement.rs +++ b/src/sql/src/statements/statement.rs @@ -18,23 +18,20 @@ use datafusion_sql::parser::Statement as DfStatement; use sqlparser::ast::Statement as SpStatement; use sqlparser_derive::{Visit, VisitMut}; -use super::create::CreateFlow; -use super::drop::DropDatabase; -use super::show::ShowVariables; use crate::error::{ConvertToDfStatementSnafu, Error}; use crate::statements::alter::AlterTable; use crate::statements::create::{ - CreateDatabase, CreateExternalTable, CreateTable, CreateTableLike, + CreateDatabase, CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, }; use crate::statements::delete::Delete; use crate::statements::describe::DescribeTable; -use crate::statements::drop::DropTable; +use crate::statements::drop::{DropDatabase, DropFlow, DropTable}; use crate::statements::explain::Explain; use crate::statements::insert::Insert; use crate::statements::query::Query; use crate::statements::set_variables::SetVariables; use crate::statements::show::{ - ShowColumns, ShowCreateTable, ShowDatabases, ShowIndex, ShowKind, ShowTables, + ShowColumns, ShowCreateTable, ShowDatabases, ShowIndex, ShowKind, ShowTables, ShowVariables, }; use crate::statements::tql::Tql; use crate::statements::truncate::TruncateTable; @@ -57,6 +54,8 @@ pub enum Statement { CreateTableLike(CreateTableLike), // CREATE FLOW CreateFlow(CreateFlow), + // DROP FLOW + DropFlow(DropFlow), // DROP TABLE DropTable(DropTable), // DROP DATABASE @@ -104,6 +103,7 @@ impl Display for Statement { Statement::CreateExternalTable(s) => s.fmt(f), Statement::CreateTableLike(s) => s.fmt(f), Statement::CreateFlow(s) => s.fmt(f), + Statement::DropFlow(s) => s.fmt(f), Statement::DropTable(s) => s.fmt(f), Statement::DropDatabase(s) => s.fmt(f), Statement::CreateDatabase(s) => s.fmt(f),