Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(sqlparser): Display multiple source INCLUDE columns correctly (#18813) #18831

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/sqlparser/src/ast/legacy_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use serde::{Deserialize, Serialize};
use winnow::PResult;

use crate::ast::{
AstString, AstVec, ConnectorSchema, Encode, Format, Ident, ObjectName, ParseTo, SqlOption,
Value,
display_separated, AstString, ConnectorSchema, Encode, Format, Ident, ObjectName, ParseTo,
SqlOption, Value,
};
use crate::keywords::Keyword;
use crate::parser::{Parser, StrError};
Expand Down Expand Up @@ -425,7 +425,7 @@ impl fmt::Display for CsvInfo {
if !self.has_header {
v.push(format!(
"{}",
AstVec([Keyword::WITHOUT, Keyword::HEADER].to_vec())
display_separated(&[Keyword::WITHOUT, Keyword::HEADER], " ")
));
}
impl_fmt_display!(delimiter, v, self);
Expand Down
50 changes: 32 additions & 18 deletions src/sqlparser/src/ast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1841,24 +1841,8 @@ impl fmt::Display for Statement {
if let Some(version_column) = with_version_column {
write!(f, " WITH VERSION COLUMN({})", version_column)?;
}
if !include_column_options.is_empty() { // (Ident, Option<Ident>)
write!(f, "{}", display_comma_separated(
include_column_options.iter().map(|option_item: &IncludeOptionItem| {
format!(" INCLUDE {}{}{}",
option_item.column_type,
if let Some(inner_field) = &option_item.inner_field {
format!(" {}", inner_field)
} else {
"".into()
}
, if let Some(alias) = &option_item.column_alias {
format!(" AS {}", alias)
} else {
"".into()
}
)
}).collect_vec().as_slice()
))?;
if !include_column_options.is_empty() {
write!(f, " {}", display_separated(include_column_options, " "))?;
}
if !with_options.is_empty() {
write!(f, " WITH ({})", display_comma_separated(with_options))?;
Expand Down Expand Up @@ -2187,6 +2171,36 @@ impl fmt::Display for Statement {
}
}

impl Display for IncludeOptionItem {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let Self {
column_type,
inner_field,
header_inner_expect_type,
column_alias,
} = self;
write!(f, "INCLUDE {}", column_type)?;
if let Some(inner_field) = inner_field {
write!(f, " '{}'", value::escape_single_quote_string(inner_field))?;
if let Some(expected_type) = header_inner_expect_type {
write!(
f,
" {}",
match expected_type {
DataType::Varchar => "varchar",
DataType::Bytea => "bytea",
t => unreachable!("unparse header expected type: {t}"),
}
)?;
}
}
if let Some(alias) = column_alias {
write!(f, " AS {}", alias)?;
}
Ok(())
}
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[non_exhaustive]
Expand Down
17 changes: 5 additions & 12 deletions src/sqlparser/src/ast/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ macro_rules! impl_fmt_display {
}};
($field:ident => [$($arr:tt)+], $v:ident, $self:ident) => {
if $self.$field {
$v.push(format!("{}", AstVec([$($arr)+].to_vec())));
$v.push(format!("{}", display_separated(&[$($arr)+], " ")));
}
};
([$($arr:tt)+], $v:ident) => {
$v.push(format!("{}", AstVec([$($arr)+].to_vec())));
$v.push(format!("{}", display_separated(&[$($arr)+], " ")));
};
}

Expand Down Expand Up @@ -499,6 +499,9 @@ impl fmt::Display for CreateSourceStatement {
v.push(items);
}

for item in &self.include_column_options {
v.push(format!("{}", item));
}
impl_fmt_display!(with_properties, v, self);
impl_fmt_display!(source_schema, v, self);
v.iter().join(" ").fmt(f)
Expand Down Expand Up @@ -897,16 +900,6 @@ impl fmt::Display for CreateSecretStatement {
}
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct AstVec<T>(pub Vec<T>);

impl<T: fmt::Display> fmt::Display for AstVec<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.iter().join(" ").fmt(f)
}
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct WithProperties(pub Vec<SqlOption>);
Expand Down
3 changes: 3 additions & 0 deletions src/sqlparser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2625,6 +2625,9 @@ impl Parser<'_> {
column_alias,
header_inner_expect_type,
});

// tolerate previous bug #18800 of displaying with comma separation
let _ = self.consume_token(&Token::Comma);
}
Ok(options)
}
Expand Down
24 changes: 24 additions & 0 deletions src/sqlparser/tests/testdata/create.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,30 @@
- input: CREATE SOURCE bid (auction INTEGER, bidder INTEGER, price INTEGER, WATERMARK FOR auction AS auction - 1, "date_time" TIMESTAMP) with (connector = 'nexmark', nexmark.table.type = 'Bid', nexmark.split.num = '12', nexmark.min.event.gap.in.ns = '0')
formatted_sql: CREATE SOURCE bid (auction INT, bidder INT, price INT, "date_time" TIMESTAMP, WATERMARK FOR auction AS auction - 1) WITH (connector = 'nexmark', nexmark.table.type = 'Bid', nexmark.split.num = '12', nexmark.min.event.gap.in.ns = '0') FORMAT NATIVE ENCODE NATIVE
formatted_ast: 'CreateSource { stmt: CreateSourceStatement { if_not_exists: false, columns: [ColumnDef { name: Ident { value: "auction", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "bidder", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "price", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "date_time", quote_style: Some(''"'') }, data_type: Some(Timestamp(false)), collation: None, options: [] }], wildcard_idx: None, constraints: [], source_name: ObjectName([Ident { value: "bid", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "connector", quote_style: None }]), value: SingleQuotedString("nexmark") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "table", quote_style: None }, Ident { value: "type", quote_style: None }]), value: SingleQuotedString("Bid") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "split", quote_style: None }, Ident { value: "num", quote_style: None }]), value: SingleQuotedString("12") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "min", quote_style: None }, Ident { value: "event", quote_style: None }, Ident { value: "gap", quote_style: None }, Ident { value: "in", quote_style: None }, Ident { value: "ns", quote_style: None }]), value: SingleQuotedString("0") }]), source_schema: V2(ConnectorSchema { format: Native, row_encode: Native, row_options: [], key_encode: None }), source_watermarks: [SourceWatermark { column: Ident { value: "auction", quote_style: None }, expr: BinaryOp { left: Identifier(Ident { value: "auction", quote_style: None }), op: Minus, right: Value(Number("1")) } }], include_column_options: [] } }'
- input: |-
CREATE SOURCE s
(raw BYTEA)
INCLUDE header AS all_headers
INCLUDE header 'foo' AS foo_bytea
INCLUDE header 'foo' VARCHAR AS foo_str
WITH (
connector = 'kafka',
kafka.brokers = '${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}',
topic = 'dummy')
FORMAT plain ENCODE bytes;
formatted_sql: CREATE SOURCE s (raw BYTEA) INCLUDE header AS all_headers INCLUDE header 'foo' bytea AS foo_bytea INCLUDE header 'foo' varchar AS foo_str WITH (connector = 'kafka', kafka.brokers = '${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}', topic = 'dummy') FORMAT PLAIN ENCODE BYTES
- input: |-
CREATE TABLE t
(raw BYTEA)
INCLUDE header AS all_headers
INCLUDE header 'foo' AS foo_bytea, -- tolerate extra comma due to previous bug #18800
INCLUDE header 'foo' VARCHAR AS foo_str
WITH (
connector = 'kafka',
kafka.brokers = '${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}',
topic = 'dummy')
FORMAT plain ENCODE bytes;
formatted_sql: CREATE TABLE t (raw BYTEA) INCLUDE header AS all_headers INCLUDE header 'foo' bytea AS foo_bytea INCLUDE header 'foo' varchar AS foo_str WITH (connector = 'kafka', kafka.brokers = '${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}', topic = 'dummy') FORMAT PLAIN ENCODE BYTES
- input: CREATE TABLE T (v1 INT, v2 STRUCT<v1 INT, v2 INT>)
formatted_sql: CREATE TABLE T (v1 INT, v2 STRUCT<v1 INT, v2 INT>)
- input: CREATE TABLE T (v1 INT, v2 STRUCT<v1 INT, v2 INT, v3 STRUCT<v1 INT, v2 INT>>)
Expand Down
Loading