Skip to content

Commit

Permalink
fix(sqlparser): display create items with comma properly (#18393)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangjinwu authored Sep 4, 2024
1 parent a7480e1 commit 0a4ccde
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 2 deletions.
12 changes: 10 additions & 2 deletions src/sqlparser/src/ast/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ pub(super) fn fmt_create_items(
|| !watermarks.is_empty()
|| wildcard_idx.is_some();
has_items.then(|| write!(&mut items, "("));

if let Some(wildcard_idx) = wildcard_idx {
let (columns_l, columns_r) = columns.split_at(wildcard_idx);
write!(&mut items, "{}", display_comma_separated(columns_l))?;
Expand All @@ -426,14 +427,21 @@ pub(super) fn fmt_create_items(
} else {
write!(&mut items, "{}", display_comma_separated(columns))?;
}
if !columns.is_empty() && (!constraints.is_empty() || !watermarks.is_empty()) {
let mut leading_items = !columns.is_empty() || wildcard_idx.is_some();

if leading_items && !constraints.is_empty() {
write!(&mut items, ", ")?;
}
write!(&mut items, "{}", display_comma_separated(constraints))?;
if !columns.is_empty() && !constraints.is_empty() && !watermarks.is_empty() {
leading_items |= !constraints.is_empty();

if leading_items && !watermarks.is_empty() {
write!(&mut items, ", ")?;
}
write!(&mut items, "{}", display_comma_separated(watermarks))?;
// uncomment this when adding more sections below
// leading_items |= !watermarks.is_empty();

has_items.then(|| write!(&mut items, ")"));
Ok(items)
}
Expand Down
6 changes: 6 additions & 0 deletions src/sqlparser/tests/testdata/create.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@
- input: CREATE SOURCE IF NOT EXISTS src WITH (kafka.topic = 'abc', kafka.brokers = 'localhost:1001') FORMAT PLAIN ENCODE PROTOBUF (message = 'Foo', schema.registry = 'http://')
formatted_sql: CREATE SOURCE IF NOT EXISTS src WITH (kafka.topic = 'abc', kafka.brokers = 'localhost:1001') FORMAT PLAIN ENCODE PROTOBUF (message = 'Foo', schema.registry = 'http://')
formatted_ast: 'CreateSource { stmt: CreateSourceStatement { temporary: false, if_not_exists: true, columns: [], wildcard_idx: None, constraints: [], source_name: ObjectName([Ident { value: "src", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "topic", quote_style: None }]), value: SingleQuotedString("abc") }, SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "brokers", quote_style: None }]), value: SingleQuotedString("localhost:1001") }]), source_schema: V2(ConnectorSchema { format: Plain, row_encode: Protobuf, row_options: [SqlOption { name: ObjectName([Ident { value: "message", quote_style: None }]), value: SingleQuotedString("Foo") }, SqlOption { name: ObjectName([Ident { value: "schema", quote_style: None }, Ident { value: "registry", quote_style: None }]), value: SingleQuotedString("http://") }], key_encode: None }), source_watermarks: [], include_column_options: [] } }'
- input: CREATE SOURCE IF NOT EXISTS src (*, WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND) WITH (kafka.topic = 'abc', kafka.brokers = 'localhost:1001') FORMAT PLAIN ENCODE PROTOBUF (message = 'Foo', schema.registry = 'http://')
formatted_sql: CREATE SOURCE IF NOT EXISTS src (*, WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND) WITH (kafka.topic = 'abc', kafka.brokers = 'localhost:1001') FORMAT PLAIN ENCODE PROTOBUF (message = 'Foo', schema.registry = 'http://')
formatted_ast: 'CreateSource { stmt: CreateSourceStatement { temporary: false, if_not_exists: true, columns: [], wildcard_idx: Some(0), constraints: [], source_name: ObjectName([Ident { value: "src", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "topic", quote_style: None }]), value: SingleQuotedString("abc") }, SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "brokers", quote_style: None }]), value: SingleQuotedString("localhost:1001") }]), source_schema: V2(ConnectorSchema { format: Plain, row_encode: Protobuf, row_options: [SqlOption { name: ObjectName([Ident { value: "message", quote_style: None }]), value: SingleQuotedString("Foo") }, SqlOption { name: ObjectName([Ident { value: "schema", quote_style: None }, Ident { value: "registry", quote_style: None }]), value: SingleQuotedString("http://") }], key_encode: None }), source_watermarks: [SourceWatermark { column: Ident { value: "event_time", quote_style: None }, expr: BinaryOp { left: Identifier(Ident { value: "event_time", quote_style: None }), op: Minus, right: Value(Interval { value: "60", leading_field: Some(Second), leading_precision: None, last_field: None, fractional_seconds_precision: None }) } }], include_column_options: [] } }'
- input: CREATE SOURCE IF NOT EXISTS src (PRIMARY KEY (event_id), WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND) WITH (kafka.topic = 'abc', kafka.brokers = 'localhost:1001') FORMAT PLAIN ENCODE PROTOBUF (message = 'Foo', schema.registry = 'http://')
formatted_sql: CREATE SOURCE IF NOT EXISTS src (PRIMARY KEY (event_id), WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND) WITH (kafka.topic = 'abc', kafka.brokers = 'localhost:1001') FORMAT PLAIN ENCODE PROTOBUF (message = 'Foo', schema.registry = 'http://')
formatted_ast: 'CreateSource { stmt: CreateSourceStatement { temporary: false, if_not_exists: true, columns: [], wildcard_idx: None, constraints: [Unique { name: None, columns: [Ident { value: "event_id", quote_style: None }], is_primary: true }], source_name: ObjectName([Ident { value: "src", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "topic", quote_style: None }]), value: SingleQuotedString("abc") }, SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "brokers", quote_style: None }]), value: SingleQuotedString("localhost:1001") }]), source_schema: V2(ConnectorSchema { format: Plain, row_encode: Protobuf, row_options: [SqlOption { name: ObjectName([Ident { value: "message", quote_style: None }]), value: SingleQuotedString("Foo") }, SqlOption { name: ObjectName([Ident { value: "schema", quote_style: None }, Ident { value: "registry", quote_style: None }]), value: SingleQuotedString("http://") }], key_encode: None }), source_watermarks: [SourceWatermark { column: Ident { value: "event_time", quote_style: None }, expr: BinaryOp { left: Identifier(Ident { value: "event_time", quote_style: None }), op: Minus, right: Value(Interval { value: "60", leading_field: Some(Second), leading_precision: None, last_field: None, fractional_seconds_precision: None }) } }], include_column_options: [] } }'
- input: CREATE SOURCE bid (auction INTEGER, bidder INTEGER, price INTEGER, WATERMARK FOR auction AS auction - 1, "date_time" TIMESTAMP) with (connector = 'nexmark', nexmark.table.type = 'Bid', nexmark.split.num = '12', nexmark.min.event.gap.in.ns = '0')
formatted_sql: CREATE SOURCE bid (auction INT, bidder INT, price INT, "date_time" TIMESTAMP, WATERMARK FOR auction AS auction - 1) WITH (connector = 'nexmark', nexmark.table.type = 'Bid', nexmark.split.num = '12', nexmark.min.event.gap.in.ns = '0') FORMAT NATIVE ENCODE NATIVE
formatted_ast: 'CreateSource { stmt: CreateSourceStatement { temporary: false, if_not_exists: false, columns: [ColumnDef { name: Ident { value: "auction", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "bidder", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "price", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "date_time", quote_style: Some(''"'') }, data_type: Some(Timestamp(false)), collation: None, options: [] }], wildcard_idx: None, constraints: [], source_name: ObjectName([Ident { value: "bid", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "connector", quote_style: None }]), value: SingleQuotedString("nexmark") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "table", quote_style: None }, Ident { value: "type", quote_style: None }]), value: SingleQuotedString("Bid") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "split", quote_style: None }, Ident { value: "num", quote_style: None }]), value: SingleQuotedString("12") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "min", quote_style: None }, Ident { value: "event", quote_style: None }, Ident { value: "gap", quote_style: None }, Ident { value: "in", quote_style: None }, Ident { value: "ns", quote_style: None }]), value: SingleQuotedString("0") }]), source_schema: V2(ConnectorSchema { format: Native, row_encode: Native, row_options: [], key_encode: None }), source_watermarks: [SourceWatermark { column: Ident { value: "auction", quote_style: None }, expr: BinaryOp { left: Identifier(Ident { value: "auction", quote_style: None }), op: Minus, right: Value(Number("1")) } }], include_column_options: [] } }'
Expand Down

0 comments on commit 0a4ccde

Please sign in to comment.