Skip to content

Commit

Permalink
fix(sqlparser): Display multiple source INCLUDE columns correctly (
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangjinwu authored Oct 10, 2024
1 parent 71001d8 commit e0fc16c
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 33 deletions.
6 changes: 3 additions & 3 deletions src/sqlparser/src/ast/legacy_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use serde::{Deserialize, Serialize};
use winnow::PResult;

use crate::ast::{
AstString, AstVec, ConnectorSchema, Encode, Format, Ident, ObjectName, ParseTo, SqlOption,
Value,
display_separated, AstString, ConnectorSchema, Encode, Format, Ident, ObjectName, ParseTo,
SqlOption, Value,
};
use crate::keywords::Keyword;
use crate::parser::{Parser, StrError};
Expand Down Expand Up @@ -425,7 +425,7 @@ impl fmt::Display for CsvInfo {
if !self.has_header {
v.push(format!(
"{}",
AstVec([Keyword::WITHOUT, Keyword::HEADER].to_vec())
display_separated(&[Keyword::WITHOUT, Keyword::HEADER], " ")
));
}
impl_fmt_display!(delimiter, v, self);
Expand Down
50 changes: 32 additions & 18 deletions src/sqlparser/src/ast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1841,24 +1841,8 @@ impl fmt::Display for Statement {
if let Some(version_column) = with_version_column {
write!(f, " WITH VERSION COLUMN({})", version_column)?;
}
if !include_column_options.is_empty() { // (Ident, Option<Ident>)
write!(f, "{}", display_comma_separated(
include_column_options.iter().map(|option_item: &IncludeOptionItem| {
format!(" INCLUDE {}{}{}",
option_item.column_type,
if let Some(inner_field) = &option_item.inner_field {
format!(" {}", inner_field)
} else {
"".into()
}
, if let Some(alias) = &option_item.column_alias {
format!(" AS {}", alias)
} else {
"".into()
}
)
}).collect_vec().as_slice()
))?;
if !include_column_options.is_empty() {
write!(f, " {}", display_separated(include_column_options, " "))?;
}
if !with_options.is_empty() {
write!(f, " WITH ({})", display_comma_separated(with_options))?;
Expand Down Expand Up @@ -2187,6 +2171,36 @@ impl fmt::Display for Statement {
}
}

impl Display for IncludeOptionItem {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let Self {
column_type,
inner_field,
header_inner_expect_type,
column_alias,
} = self;
write!(f, "INCLUDE {}", column_type)?;
if let Some(inner_field) = inner_field {
write!(f, " '{}'", value::escape_single_quote_string(inner_field))?;
if let Some(expected_type) = header_inner_expect_type {
write!(
f,
" {}",
match expected_type {
DataType::Varchar => "varchar",
DataType::Bytea => "bytea",
t => unreachable!("unparse header expected type: {t}"),
}
)?;
}
}
if let Some(alias) = column_alias {
write!(f, " AS {}", alias)?;
}
Ok(())
}
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[non_exhaustive]
Expand Down
17 changes: 5 additions & 12 deletions src/sqlparser/src/ast/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ macro_rules! impl_fmt_display {
}};
($field:ident => [$($arr:tt)+], $v:ident, $self:ident) => {
if $self.$field {
$v.push(format!("{}", AstVec([$($arr)+].to_vec())));
$v.push(format!("{}", display_separated(&[$($arr)+], " ")));
}
};
([$($arr:tt)+], $v:ident) => {
$v.push(format!("{}", AstVec([$($arr)+].to_vec())));
$v.push(format!("{}", display_separated(&[$($arr)+], " ")));
};
}

Expand Down Expand Up @@ -463,6 +463,9 @@ impl fmt::Display for CreateSourceStatement {
v.push(items);
}

for item in &self.include_column_options {
v.push(format!("{}", item));
}
impl_fmt_display!(with_properties, v, self);
impl_fmt_display!(source_schema, v, self);
v.iter().join(" ").fmt(f)
Expand Down Expand Up @@ -867,16 +870,6 @@ impl fmt::Display for CreateSecretStatement {
}
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct AstVec<T>(pub Vec<T>);

impl<T: fmt::Display> fmt::Display for AstVec<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.iter().join(" ").fmt(f)
}
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct WithProperties(pub Vec<SqlOption>);
Expand Down
3 changes: 3 additions & 0 deletions src/sqlparser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2663,6 +2663,9 @@ impl Parser<'_> {
column_alias,
header_inner_expect_type,
});

// tolerate previous bug #18800 of displaying with comma separation
let _ = self.consume_token(&Token::Comma);
}
Ok(options)
}
Expand Down
24 changes: 24 additions & 0 deletions src/sqlparser/tests/testdata/create.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,30 @@
- input: CREATE SOURCE bid (auction INTEGER, bidder INTEGER, price INTEGER, WATERMARK FOR auction AS auction - 1, "date_time" TIMESTAMP) with (connector = 'nexmark', nexmark.table.type = 'Bid', nexmark.split.num = '12', nexmark.min.event.gap.in.ns = '0')
formatted_sql: CREATE SOURCE bid (auction INT, bidder INT, price INT, "date_time" TIMESTAMP, WATERMARK FOR auction AS auction - 1) WITH (connector = 'nexmark', nexmark.table.type = 'Bid', nexmark.split.num = '12', nexmark.min.event.gap.in.ns = '0') FORMAT NATIVE ENCODE NATIVE
formatted_ast: 'CreateSource { stmt: CreateSourceStatement { temporary: false, if_not_exists: false, columns: [ColumnDef { name: Ident { value: "auction", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "bidder", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "price", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "date_time", quote_style: Some(''"'') }, data_type: Some(Timestamp(false)), collation: None, options: [] }], wildcard_idx: None, constraints: [], source_name: ObjectName([Ident { value: "bid", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "connector", quote_style: None }]), value: SingleQuotedString("nexmark") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "table", quote_style: None }, Ident { value: "type", quote_style: None }]), value: SingleQuotedString("Bid") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "split", quote_style: None }, Ident { value: "num", quote_style: None }]), value: SingleQuotedString("12") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "min", quote_style: None }, Ident { value: "event", quote_style: None }, Ident { value: "gap", quote_style: None }, Ident { value: "in", quote_style: None }, Ident { value: "ns", quote_style: None }]), value: SingleQuotedString("0") }]), source_schema: V2(ConnectorSchema { format: Native, row_encode: Native, row_options: [], key_encode: None }), source_watermarks: [SourceWatermark { column: Ident { value: "auction", quote_style: None }, expr: BinaryOp { left: Identifier(Ident { value: "auction", quote_style: None }), op: Minus, right: Value(Number("1")) } }], include_column_options: [] } }'
- input: |-
CREATE SOURCE s
(raw BYTEA)
INCLUDE header AS all_headers
INCLUDE header 'foo' AS foo_bytea
INCLUDE header 'foo' VARCHAR AS foo_str
WITH (
connector = 'kafka',
kafka.brokers = '${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}',
topic = 'dummy')
FORMAT plain ENCODE bytes;
formatted_sql: CREATE SOURCE s (raw BYTEA) INCLUDE header AS all_headers INCLUDE header 'foo' bytea AS foo_bytea INCLUDE header 'foo' varchar AS foo_str WITH (connector = 'kafka', kafka.brokers = '${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}', topic = 'dummy') FORMAT PLAIN ENCODE BYTES
- input: |-
CREATE TABLE t
(raw BYTEA)
INCLUDE header AS all_headers
INCLUDE header 'foo' AS foo_bytea, -- tolerate extra comma due to previous bug #18800
INCLUDE header 'foo' VARCHAR AS foo_str
WITH (
connector = 'kafka',
kafka.brokers = '${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}',
topic = 'dummy')
FORMAT plain ENCODE bytes;
formatted_sql: CREATE TABLE t (raw BYTEA) INCLUDE header AS all_headers INCLUDE header 'foo' bytea AS foo_bytea INCLUDE header 'foo' varchar AS foo_str WITH (connector = 'kafka', kafka.brokers = '${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}', topic = 'dummy') FORMAT PLAIN ENCODE BYTES
- input: CREATE TABLE T (v1 INT, v2 STRUCT<v1 INT, v2 INT>)
formatted_sql: CREATE TABLE T (v1 INT, v2 STRUCT<v1 INT, v2 INT>)
- input: CREATE TABLE T (v1 INT, v2 STRUCT<v1 INT, v2 INT, v3 STRUCT<v1 INT, v2 INT>>)
Expand Down

0 comments on commit e0fc16c

Please sign in to comment.