Skip to content

Commit

Permalink
fix(flow): minor fix about count(*)&sink keyword (#5061)
Browse files Browse the repository at this point in the history
* fix: SiNk

* fix: sink&count(*)

* tests: SiNk

* refactor: per review
  • Loading branch information
discord9 authored Nov 29, 2024
1 parent 8bdef77 commit d931389
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 3 deletions.
2 changes: 2 additions & 0 deletions src/flow/src/df_optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use common_error::ext::BoxedError;
use common_telemetry::debug;
use datafusion::config::ConfigOptions;
use datafusion::error::DataFusionError;
use datafusion::optimizer::analyzer::count_wildcard_rule::CountWildcardRule;
use datafusion::optimizer::analyzer::type_coercion::TypeCoercion;
use datafusion::optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
use datafusion::optimizer::optimize_projections::OptimizeProjections;
Expand Down Expand Up @@ -59,6 +60,7 @@ pub async fn apply_df_optimizer(
) -> Result<datafusion_expr::LogicalPlan, Error> {
let cfg = ConfigOptions::new();
let analyzer = Analyzer::with_rules(vec![
Arc::new(CountWildcardRule::new()),
Arc::new(AvgExpandRule::new()),
Arc::new(TumbleExpandRule::new()),
Arc::new(CheckGroupByRule::new()),
Expand Down
14 changes: 11 additions & 3 deletions src/sql/src/parsers/create_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,9 +259,17 @@ impl<'a> ParserContext<'a> {

let flow_name = self.intern_parse_table_name()?;

self.parser
.expect_token(&Token::make_keyword(SINK))
.context(SyntaxSnafu)?;
// make `SINK` case in-sensitive
if let Token::Word(word) = self.parser.peek_token().token
&& word.value.eq_ignore_ascii_case(SINK)
{
self.parser.next_token();
} else {
Err(ParserError::ParserError(
"Expect `SINK` keyword".to_string(),
))
.context(SyntaxSnafu)?
}
self.parser
.expect_keyword(Keyword::TO)
.context(SyntaxSnafu)?;
Expand Down
67 changes: 67 additions & 0 deletions tests/cases/standalone/common/flow/flow_basic.result
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,73 @@ DROP TABLE out_num_cnt_basic;

Affected Rows: 0

-- test count(*) rewrite
CREATE TABLE input_basic (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);

Affected Rows: 0

CREATE FLOW test_wildcard_basic SiNk TO out_basic AS
SELECT
COUNT(*) as wildcard
FROM
input_basic;

Affected Rows: 0

DROP FLOW test_wildcard_basic;

Affected Rows: 0

CREATE FLOW test_wildcard_basic sink TO out_basic AS
SELECT
COUNT(*) as wildcard
FROM
input_basic;

Affected Rows: 0

INSERT INTO
input_basic
VALUES
(23, "2021-07-01 00:00:01.000"),
(24, "2021-07-01 00:00:01.500");

Affected Rows: 2

-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic');

+-----------------------------------------+
| ADMIN FLUSH_FLOW('test_wildcard_basic') |
+-----------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------+

SELECT wildcard FROM out_basic;

+----------+
| wildcard |
+----------+
| 2 |
+----------+

DROP FLOW test_wildcard_basic;

Affected Rows: 0

DROP TABLE out_basic;

Affected Rows: 0

DROP TABLE input_basic;

Affected Rows: 0

-- test distinct
CREATE TABLE distinct_basic (
number INT,
Expand Down
37 changes: 37 additions & 0 deletions tests/cases/standalone/common/flow/flow_basic.sql
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,43 @@ DROP TABLE numbers_input_basic;

DROP TABLE out_num_cnt_basic;

-- test count(*) rewrite
CREATE TABLE input_basic (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);

CREATE FLOW test_wildcard_basic SiNk TO out_basic AS
SELECT
COUNT(*) as wildcard
FROM
input_basic;

DROP FLOW test_wildcard_basic;

CREATE FLOW test_wildcard_basic sink TO out_basic AS
SELECT
COUNT(*) as wildcard
FROM
input_basic;

INSERT INTO
input_basic
VALUES
(23, "2021-07-01 00:00:01.000"),
(24, "2021-07-01 00:00:01.500");

-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic');

SELECT wildcard FROM out_basic;

DROP FLOW test_wildcard_basic;
DROP TABLE out_basic;
DROP TABLE input_basic;

-- test distinct
CREATE TABLE distinct_basic (
number INT,
Expand Down

0 comments on commit d931389

Please sign in to comment.