Skip to content

Commit

Permalink
fix(sink): fix create sink error message (#13839)
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW authored Dec 7, 2023
1 parent 744df85 commit 2beb973
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 1 deletion.
2 changes: 1 addition & 1 deletion e2e_test/ddl/invalid_operation.slt
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ CREATE SINK sink FROM mv WITH (connector='blackhole');
statement error not found
drop table sink;

statement error sink properties not provided
statement error sql parser error
CREATE SINK sink FROM mv;

# FIXME: improve the error message
Expand Down
6 changes: 6 additions & 0 deletions src/frontend/planner_test/tests/testdata/input/sink.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,9 @@
);
expected_outputs:
- explain_output
- id: create_emit_on_close_sink
sql: |
create table t2 (a int, b int, watermark for b as b - 4) append only;
explain create sink sk1 from t2 emit on window close with (connector='blackhole');
expected_outputs:
- explain_output
8 changes: 8 additions & 0 deletions src/frontend/planner_test/tests/testdata/output/sink.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,11 @@
└─StreamHashAgg { group_key: [t1.v1, t1.v2], aggs: [count] }
└─StreamExchange { dist: HashShard(t1.v1, t1.v2) }
└─StreamTableScan { table: t1, columns: [v1, v2, v3, v4] }
- id: create_emit_on_close_sink
sql: |
create table t2 (a int, b int, watermark for b as b - 4) append only;
explain create sink sk1 from t2 emit on window close with (connector='blackhole');
explain_output: |
StreamSink { type: upsert, columns: [a, b, t2._row_id(hidden)], pk: [t2._row_id] }
└─StreamEowcSort { sort_column: t2.b }
└─StreamTableScan { table: t2, columns: [a, b, _row_id] }
5 changes: 5 additions & 0 deletions src/sqlparser/src/ast/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -841,6 +841,11 @@ impl ParseTo for CreateSinkStatement {

let emit_mode = p.parse_emit_mode()?;

// This check cannot be put into the `WithProperties::parse_to`, since other
// statements may not need the with properties.
if !p.peek_nth_any_of_keywords(0, &[Keyword::WITH]) {
p.expected("WITH", p.peek_token())?
}
impl_parse_to!(with_properties: WithProperties, p);
if with_properties.0.is_empty() {
return Err(ParserError::ParserError(
Expand Down
4 changes: 4 additions & 0 deletions src/sqlparser/tests/testdata/create.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@
error_msg: |-
sql parser error: Expected identifier, found: ; at line:1, column:123
Near " topic = 'test_topic') format;"
- input: create sink sk1 from tt where v1 % 10 = 0 with (connector='blackhole')
error_msg: |-
sql parser error: Expected WITH, found: where at line:1, column:30
Near "create sink sk1 from tt"
- input: CREATE SINK snk FROM mv WITH (connector = 'kafka', properties.bootstrap.server = '127.0.0.1:9092', topic = 'test_topic') format debezium;
error_msg: |-
sql parser error: Expected ENCODE, found: ; at line:1, column:132
Expand Down

0 comments on commit 2beb973

Please sign in to comment.