Skip to content

Commit

Permalink
feat: implement drop flow parser (#3888)
Browse files Browse the repository at this point in the history
* feat: implement drop flow parser

* Update src/sql/src/parsers/drop_parser.rs

Co-authored-by: Ruihang Xia <[email protected]>

* fix: fmt code

---------

Co-authored-by: Ruihang Xia <[email protected]>
  • Loading branch information
WenyXu and waynexia authored May 9, 2024
1 parent f995f60 commit 5140d24
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 12 deletions.
6 changes: 4 additions & 2 deletions src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,8 +514,10 @@ pub fn check_permission(
// These are executed by query engine, and will be checked there.
Statement::Query(_) | Statement::Explain(_) | Statement::Tql(_) | Statement::Delete(_) => {}
// database ops won't be checked
Statement::CreateDatabase(_) | Statement::ShowDatabases(_) | Statement::DropDatabase(_) => {
}
Statement::CreateDatabase(_)
| Statement::ShowDatabases(_)
| Statement::DropDatabase(_)
| Statement::DropFlow(_) => {}

Statement::ShowCreateTable(stmt) => {
validate_param(&stmt.table_name, query_ctx)?;
Expand Down
4 changes: 4 additions & 0 deletions src/operator/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ impl StatementExecutor {
self.create_flow(stmt, query_ctx).await?;
Ok(Output::new_with_affected_rows(0))
}
Statement::DropFlow(_stmt) => {
// TODO(weny): implement it.
unimplemented!()
}
Statement::Alter(alter_table) => self.alter_table(alter_table, query_ctx).await,
Statement::DropTable(stmt) => {
let (catalog, schema, table) =
Expand Down
2 changes: 2 additions & 0 deletions src/sql/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ use crate::parsers::tql_parser;
use crate::statements::statement::Statement;
use crate::statements::transform_statements;

pub const FLOW: &str = "FLOW";

/// SQL Parser options.
#[derive(Clone, Debug, Default)]
pub struct ParseOptions {}
Expand Down
3 changes: 1 addition & 2 deletions src/sql/src/parsers/create_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::error::{
self, InvalidColumnOptionSnafu, InvalidTableOptionSnafu, InvalidTimeIndexSnafu,
MissingTimeIndexSnafu, Result, SyntaxSnafu, UnexpectedSnafu, UnsupportedSnafu,
};
use crate::parser::ParserContext;
use crate::parser::{ParserContext, FLOW};
use crate::statements::create::{
CreateDatabase, CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, Partitions,
TIME_INDEX,
Expand All @@ -41,7 +41,6 @@ use crate::util::parse_option_string;

pub const ENGINE: &str = "ENGINE";
pub const MAXVALUE: &str = "MAXVALUE";
pub const FLOW: &str = "FLOW";
pub const SINK: &str = "SINK";
pub const EXPIRE: &str = "EXPIRE";
pub const WHEN: &str = "WHEN";
Expand Down
82 changes: 80 additions & 2 deletions src/sql/src/parsers/drop_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use sqlparser::dialect::keywords::Keyword;
use sqlparser::tokenizer::Token;

use crate::error::{self, InvalidTableNameSnafu, Result};
use crate::parser::ParserContext;
use crate::statements::drop::{DropDatabase, DropTable};
use crate::parser::{ParserContext, FLOW};
use crate::statements::drop::{DropDatabase, DropFlow, DropTable};
use crate::statements::statement::Statement;

/// DROP statement parser implementation
Expand All @@ -29,12 +29,41 @@ impl<'a> ParserContext<'a> {
Token::Word(w) => match w.keyword {
Keyword::TABLE => self.parse_drop_table(),
Keyword::SCHEMA | Keyword::DATABASE => self.parse_drop_database(),
Keyword::NoKeyword => {
let uppercase = w.value.to_uppercase();
match uppercase.as_str() {
FLOW => self.parse_drop_flow(),
_ => self.unsupported(w.to_string()),
}
}
_ => self.unsupported(w.to_string()),
},
unexpected => self.unsupported(unexpected.to_string()),
}
}

fn parse_drop_flow(&mut self) -> Result<Statement> {
let _ = self.parser.next_token();

let if_exists = self.parser.parse_keywords(&[Keyword::IF, Keyword::EXISTS]);
let raw_flow_ident = self
.parse_object_name()
.with_context(|_| error::UnexpectedSnafu {
sql: self.sql,
expected: "a flow name",
actual: self.peek_token_as_string(),
})?;
let flow_ident = Self::canonicalize_object_name(raw_flow_ident);
ensure!(
!flow_ident.0.is_empty(),
InvalidTableNameSnafu {
name: flow_ident.to_string()
}
);

Ok(Statement::DropFlow(DropFlow::new(flow_ident, if_exists)))
}

fn parse_drop_table(&mut self) -> Result<Statement> {
let _ = self.parser.next_token();

Expand Down Expand Up @@ -172,4 +201,53 @@ mod tests {
))
);
}

#[test]
pub fn test_drop_flow() {
let sql = "DROP FLOW foo";
let result =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
let mut stmts: Vec<Statement> = result.unwrap();
assert_eq!(
stmts.pop().unwrap(),
Statement::DropFlow(DropFlow::new(ObjectName(vec![Ident::new("foo")]), false))
);

let sql = "DROP FLOW IF EXISTS foo";
let result =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
let mut stmts = result.unwrap();
assert_eq!(
stmts.pop().unwrap(),
Statement::DropFlow(DropFlow::new(ObjectName(vec![Ident::new("foo")]), true))
);

let sql = "DROP FLOW my_schema.foo";
let result =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
let mut stmts = result.unwrap();
assert_eq!(
stmts.pop().unwrap(),
Statement::DropFlow(DropFlow::new(
ObjectName(vec![Ident::new("my_schema"), Ident::new("foo")]),
false
))
);

let sql = "DROP FLOW my_catalog.my_schema.foo";
let result =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
let mut stmts = result.unwrap();
assert_eq!(
stmts.pop().unwrap(),
Statement::DropFlow(DropFlow::new(
ObjectName(vec![
Ident::new("my_catalog"),
Ident::new("my_schema"),
Ident::new("foo")
]),
false
))
)
}
}
84 changes: 84 additions & 0 deletions src/sql/src/statements/drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,45 @@ impl Display for DropDatabase {
}
}

/// DROP FLOW statement.
#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut)]
pub struct DropFlow {
flow_name: ObjectName,
/// drop flow if exists
drop_if_exists: bool,
}

impl DropFlow {
/// Creates a statement for `DROP DATABASE`
pub fn new(flow_name: ObjectName, if_exists: bool) -> Self {
Self {
flow_name,
drop_if_exists: if_exists,
}
}

/// Returns the flow name.
pub fn flow_name(&self) -> &ObjectName {
&self.flow_name
}

/// Return the `drop_if_exists`.
pub fn drop_if_exists(&self) -> bool {
self.drop_if_exists
}
}

impl Display for DropFlow {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("DROP FLOW")?;
if self.drop_if_exists() {
f.write_str(" IF EXISTS")?;
}
let flow_name = self.flow_name();
write!(f, r#" {flow_name}"#)
}
}

#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
Expand Down Expand Up @@ -188,4 +227,49 @@ DROP TABLE IF EXISTS test"#,
}
}
}

#[test]
fn test_display_drop_flow() {
let sql = r"drop flow test;";
let stmts =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap();
assert_eq!(1, stmts.len());
assert_matches!(&stmts[0], Statement::DropFlow { .. });

match &stmts[0] {
Statement::DropFlow(set) => {
let new_sql = format!("\n{}", set);
assert_eq!(
r#"
DROP FLOW test"#,
&new_sql
);
}
_ => {
unreachable!();
}
}

let sql = r"drop flow if exists test;";
let stmts =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap();
assert_eq!(1, stmts.len());
assert_matches!(&stmts[0], Statement::DropFlow { .. });

match &stmts[0] {
Statement::DropFlow(set) => {
let new_sql = format!("\n{}", set);
assert_eq!(
r#"
DROP FLOW IF EXISTS test"#,
&new_sql
);
}
_ => {
unreachable!();
}
}
}
}
12 changes: 6 additions & 6 deletions src/sql/src/statements/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,20 @@ use datafusion_sql::parser::Statement as DfStatement;
use sqlparser::ast::Statement as SpStatement;
use sqlparser_derive::{Visit, VisitMut};

use super::create::CreateFlow;
use super::drop::DropDatabase;
use super::show::ShowVariables;
use crate::error::{ConvertToDfStatementSnafu, Error};
use crate::statements::alter::AlterTable;
use crate::statements::create::{
CreateDatabase, CreateExternalTable, CreateTable, CreateTableLike,
CreateDatabase, CreateExternalTable, CreateFlow, CreateTable, CreateTableLike,
};
use crate::statements::delete::Delete;
use crate::statements::describe::DescribeTable;
use crate::statements::drop::DropTable;
use crate::statements::drop::{DropDatabase, DropFlow, DropTable};
use crate::statements::explain::Explain;
use crate::statements::insert::Insert;
use crate::statements::query::Query;
use crate::statements::set_variables::SetVariables;
use crate::statements::show::{
ShowColumns, ShowCreateTable, ShowDatabases, ShowIndex, ShowKind, ShowTables,
ShowColumns, ShowCreateTable, ShowDatabases, ShowIndex, ShowKind, ShowTables, ShowVariables,
};
use crate::statements::tql::Tql;
use crate::statements::truncate::TruncateTable;
Expand All @@ -57,6 +54,8 @@ pub enum Statement {
CreateTableLike(CreateTableLike),
// CREATE FLOW
CreateFlow(CreateFlow),
// DROP FLOW
DropFlow(DropFlow),
// DROP TABLE
DropTable(DropTable),
// DROP DATABASE
Expand Down Expand Up @@ -104,6 +103,7 @@ impl Display for Statement {
Statement::CreateExternalTable(s) => s.fmt(f),
Statement::CreateTableLike(s) => s.fmt(f),
Statement::CreateFlow(s) => s.fmt(f),
Statement::DropFlow(s) => s.fmt(f),
Statement::DropTable(s) => s.fmt(f),
Statement::DropDatabase(s) => s.fmt(f),
Statement::CreateDatabase(s) => s.fmt(f),
Expand Down

0 comments on commit 5140d24

Please sign in to comment.