From 882a65862a6a761b813bc8498bf67831cb1a8a39 Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Thu, 18 Jan 2024 14:48:26 +0800 Subject: [PATCH 1/5] wildcard --- e2e_test/sink/kafka/avro.slt | 7 ++-- src/frontend/planner_test/src/lib.rs | 2 ++ .../src/handler/alter_table_column.rs | 2 ++ src/frontend/src/handler/create_sink.rs | 2 ++ src/frontend/src/handler/create_source.rs | 30 ++++++++++++++-- src/frontend/src/handler/create_table.rs | 9 +++++ src/frontend/src/handler/explain.rs | 2 ++ src/frontend/src/handler/mod.rs | 2 ++ src/sqlparser/src/ast/mod.rs | 5 ++- src/sqlparser/src/ast/statement.rs | 34 ++++++++++++++++--- src/sqlparser/src/parser.rs | 25 +++++++++++--- 11 files changed, 105 insertions(+), 15 deletions(-) diff --git a/e2e_test/sink/kafka/avro.slt b/e2e_test/sink/kafka/avro.slt index 45ecf302f0ddd..3b1685c704e2d 100644 --- a/e2e_test/sink/kafka/avro.slt +++ b/e2e_test/sink/kafka/avro.slt @@ -1,5 +1,5 @@ statement ok -create table from_kafka ( primary key (some_key) ) +create table from_kafka ( *, gen_i32_field int as int32_field + 2, primary key (some_key) ) include key as some_key with ( connector = 'kafka', @@ -52,6 +52,7 @@ select float_field, double_field, int32_field, + gen_i32_field, int64_field, record_field, array_field, @@ -61,8 +62,8 @@ select time_micros_field, time_millis_field from from_kafka order by string_field; ---- -t Rising \x6130 3.5 4.25 22 23 NULL {{NULL,3},NULL,{7,NULL,2}} 2006-01-02 22:04:05+00:00 NULL NULL 12:34:56.123456 NULL -f Wave \x5a4446 1.5 NULL 11 12 (,foo) NULL NULL 2006-01-02 22:04:05+00:00 2021-04-01 NULL 23:45:16.654 +t Rising \x6130 3.5 4.25 22 24 23 NULL {{NULL,3},NULL,{7,NULL,2}} 2006-01-02 22:04:05+00:00 NULL NULL 12:34:56.123456 NULL +f Wave \x5a4446 1.5 NULL 11 13 12 (,foo) NULL NULL 2006-01-02 22:04:05+00:00 2021-04-01 NULL 23:45:16.654 statement error SchemaFetchError create sink sink_err from into_kafka with ( diff --git a/src/frontend/planner_test/src/lib.rs b/src/frontend/planner_test/src/lib.rs index 
e8c2030c74d08..9c1d1200c197a 100644 --- a/src/frontend/planner_test/src/lib.rs +++ b/src/frontend/planner_test/src/lib.rs @@ -425,6 +425,7 @@ impl TestCase { append_only, cdc_table_info, include_column_options, + wildcard_idx, .. } => { let source_schema = source_schema.map(|schema| schema.into_v2_with_warning()); @@ -433,6 +434,7 @@ impl TestCase { handler_args, name, columns, + wildcard_idx, constraints, if_not_exists, source_schema, diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs index cceeaf789f74f..b765c84b6e4fa 100644 --- a/src/frontend/src/handler/alter_table_column.rs +++ b/src/frontend/src/handler/alter_table_column.rs @@ -153,6 +153,7 @@ pub async fn handle_alter_table_column( constraints, source_watermarks, append_only, + wildcard_idx, .. } = definition else { @@ -167,6 +168,7 @@ pub async fn handle_alter_table_column( handler_args, col_id_gen, columns, + wildcard_idx, constraints, source_watermarks, append_only, diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index 767e4318ab261..f3d9053571558 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -471,6 +471,7 @@ pub(crate) async fn reparse_table_for_sink( let col_id_gen = ColumnIdGenerator::new_alter(table_catalog); let Statement::CreateTable { columns, + wildcard_idx, constraints, source_watermarks, append_only, @@ -488,6 +489,7 @@ pub(crate) async fn reparse_table_for_sink( handler_args, col_id_gen, columns, + wildcard_idx, constraints, source_watermarks, append_only, diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index f66bc33e32942..e070edcfaae98 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -24,7 +24,7 @@ use risingwave_common::catalog::{ is_column_ids_dedup, ColumnCatalog, ColumnDesc, TableId, INITIAL_SOURCE_VERSION_ID, 
KAFKA_TIMESTAMP_COLUMN_NAME, }; -use risingwave_common::error::ErrorCode::{self, InvalidInputSyntax, ProtocolError}; +use risingwave_common::error::ErrorCode::{self, InvalidInputSyntax, NotSupported, ProtocolError}; use risingwave_common::error::{Result, RwError}; use risingwave_common::types::DataType; use risingwave_connector::parser::{ @@ -556,17 +556,40 @@ pub(crate) fn bind_all_columns( cols_from_source: Option>, cols_from_sql: Vec, col_defs_from_sql: &[ColumnDef], + wildcard_idx: Option, ) -> Result> { if let Some(cols_from_source) = cols_from_source { if cols_from_sql.is_empty() { Ok(cols_from_source) + } else if let Some(wildcard_idx) = wildcard_idx { + if cols_from_sql.iter().any(|c| !c.is_generated()) { + Err(RwError::from(NotSupported( + "Only generated columns are allowed in user-defined schema from SQL" + .to_string(), + "Remove the non-generated columns".to_string(), + ))) + } else { + // Replace `*` with `cols_from_source` + let mut cols_from_sql = cols_from_sql; + let mut cols_from_source = cols_from_source; + let mut cols_from_sql_r = cols_from_sql.split_off(wildcard_idx); + cols_from_sql.append(&mut cols_from_source); + cols_from_sql.append(&mut cols_from_sql_r); + Ok(cols_from_sql) + } } else { // TODO(yuhao): https://github.com/risingwavelabs/risingwave/issues/12209 Err(RwError::from(ProtocolError( - format!("User-defined schema from SQL is not allowed with FORMAT {} ENCODE {}. \ - Please refer to https://www.risingwave.dev/docs/current/sql-create-source/ for more information.", source_schema.format, source_schema.row_encode)))) + format!("User-defined schema from SQL is not allowed with FORMAT {} ENCODE {}. 
\ + Please refer to https://www.risingwave.dev/docs/current/sql-create-source/ for more information.", source_schema.format, source_schema.row_encode)))) } } else { + if wildcard_idx.is_some() { + return Err(RwError::from(NotSupported( + "Wildcard in user-defined schema is only allowed when there exists columns from external schema".to_string(), + "Remove the wildcard or use a source with external schema".to_string(), + ))); + } // FIXME(yuhao): cols_from_sql should be None is no `()` is given. if cols_from_sql.is_empty() { return Err(RwError::from(ProtocolError( @@ -1147,6 +1170,7 @@ pub async fn handle_create_source( columns_from_resolve_source, columns_from_sql, &stmt.columns, + stmt.wildcard_idx, )?; // add additional columns before bind pk, because `format upsert` requires the key column handle_addition_columns(&with_properties, stmt.include_column_options, &mut columns)?; diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 85857a1526deb..f1923956c2ef1 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -452,6 +452,7 @@ pub(crate) async fn gen_create_table_plan_with_source( context: OptimizerContext, table_name: ObjectName, column_defs: Vec, + wildcard_idx: Option, constraints: Vec, source_schema: ConnectorSchema, source_watermarks: Vec, @@ -487,6 +488,7 @@ pub(crate) async fn gen_create_table_plan_with_source( columns_from_resolve_source, columns_from_sql, &column_defs, + wildcard_idx, )?; // add additional columns before bind pk, because `format upsert` requires the key column @@ -890,6 +892,7 @@ fn derive_connect_properties( Ok(connect_properties.into_iter().collect()) } +#[allow(clippy::too_many_arguments)] pub(super) async fn handle_create_table_plan( context: OptimizerContext, col_id_gen: ColumnIdGenerator, @@ -897,6 +900,7 @@ pub(super) async fn handle_create_table_plan( cdc_table_info: Option, table_name: ObjectName, column_defs: Vec, + wildcard_idx: 
Option, constraints: Vec, source_watermarks: Vec, append_only: bool, @@ -915,6 +919,7 @@ pub(super) async fn handle_create_table_plan( context, table_name.clone(), column_defs, + wildcard_idx, constraints, source_schema, source_watermarks, @@ -966,6 +971,7 @@ pub async fn handle_create_table( handler_args: HandlerArgs, table_name: ObjectName, column_defs: Vec, + wildcard_idx: Option, constraints: Vec, if_not_exists: bool, source_schema: Option, @@ -998,6 +1004,7 @@ pub async fn handle_create_table( cdc_table_info, table_name.clone(), column_defs, + wildcard_idx, constraints, source_watermarks, append_only, @@ -1059,6 +1066,7 @@ pub async fn generate_stream_graph_for_table( handler_args: HandlerArgs, col_id_gen: ColumnIdGenerator, columns: Vec, + wildcard_idx: Option, constraints: Vec, source_watermarks: Vec, append_only: bool, @@ -1072,6 +1080,7 @@ pub async fn generate_stream_graph_for_table( context, table_name, columns, + wildcard_idx, constraints, source_schema, source_watermarks, diff --git a/src/frontend/src/handler/explain.rs b/src/frontend/src/handler/explain.rs index ca76abe8e8d8a..b7981cf7aec6e 100644 --- a/src/frontend/src/handler/explain.rs +++ b/src/frontend/src/handler/explain.rs @@ -64,6 +64,7 @@ async fn do_handle_explain( append_only, cdc_table_info, include_column_options, + wildcard_idx, .. 
} => { let col_id_gen = ColumnIdGenerator::new_initial(); @@ -77,6 +78,7 @@ async fn do_handle_explain( cdc_table_info, name.clone(), columns, + wildcard_idx, constraints, source_watermarks, append_only, diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index b4227a765e856..806daa89ce026 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -244,6 +244,7 @@ pub async fn handle( Statement::CreateTable { name, columns, + wildcard_idx, constraints, query, with_options: _, // It is put in OptimizerContext @@ -279,6 +280,7 @@ pub async fn handle( handler_args, name, columns, + wildcard_idx, constraints, if_not_exists, source_schema, diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs index eef14722ee841..b11d4dc784bb4 100644 --- a/src/sqlparser/src/ast/mod.rs +++ b/src/sqlparser/src/ast/mod.rs @@ -1117,6 +1117,8 @@ pub enum Statement { name: ObjectName, /// Optional schema columns: Vec, + // The wildcard position in columns defined in sql. Only exists when using external schema.
+ wildcard_idx: Option, constraints: Vec, with_options: Vec, /// Optional schema of the external source with which the table is created @@ -1606,6 +1608,7 @@ impl fmt::Display for Statement { Statement::CreateTable { name, columns, + wildcard_idx, constraints, with_options, or_replace, @@ -1634,7 +1637,7 @@ impl fmt::Display for Statement { name = name, )?; if !columns.is_empty() || !constraints.is_empty() { - write!(f, " {}", fmt_create_items(columns, constraints, source_watermarks)?)?; + write!(f, " {}", fmt_create_items(columns, constraints, source_watermarks, *wildcard_idx)?)?; } else if query.is_none() { // PostgreSQL allows `CREATE TABLE t ();`, but requires empty parens write!(f, " ()")?; diff --git a/src/sqlparser/src/ast/statement.rs b/src/sqlparser/src/ast/statement.rs index 5624a74c621ed..e394697d45a9e 100644 --- a/src/sqlparser/src/ast/statement.rs +++ b/src/sqlparser/src/ast/statement.rs @@ -80,6 +80,8 @@ macro_rules! impl_fmt_display { pub struct CreateSourceStatement { pub if_not_exists: bool, pub columns: Vec, + // The wildcard position in columns defined in sql. Only exists when using external schema.
+ pub wildcard_idx: Option, pub constraints: Vec, pub source_name: ObjectName, pub with_properties: WithProperties, @@ -325,7 +327,8 @@ impl ParseTo for CreateSourceStatement { impl_parse_to!(source_name: ObjectName, p); // parse columns - let (columns, constraints, source_watermarks) = p.parse_columns_with_watermark()?; + let (columns, constraints, source_watermarks, wildcard_idx) = + p.parse_columns_with_watermark()?; let include_options = p.parse_include_options()?; let with_options = p.parse_with_properties()?; @@ -343,6 +346,7 @@ impl ParseTo for CreateSourceStatement { Ok(Self { if_not_exists, columns, + wildcard_idx, constraints, source_name, with_properties: WithProperties(with_options), @@ -357,11 +361,28 @@ pub(super) fn fmt_create_items( columns: &[ColumnDef], constraints: &[TableConstraint], watermarks: &[SourceWatermark], + wildcard_idx: Option, ) -> std::result::Result { let mut items = String::new(); - let has_items = !columns.is_empty() || !constraints.is_empty() || !watermarks.is_empty(); + let has_items = !columns.is_empty() + || !constraints.is_empty() + || !watermarks.is_empty() + || wildcard_idx.is_some(); has_items.then(|| write!(&mut items, "(")); - write!(&mut items, "{}", display_comma_separated(columns))?; + if let Some(wildcard_idx) = wildcard_idx { + let (columns_l, columns_r) = columns.split_at(wildcard_idx); + write!(&mut items, "{}", display_comma_separated(columns_l))?; + if !columns_l.is_empty() { + write!(&mut items, ", ")?; + } + write!(&mut items, "{}", Token::Mul)?; + if !columns_r.is_empty() { + write!(&mut items, ", ")?; + } + write!(&mut items, "{}", display_comma_separated(columns_r))?; + } else { + write!(&mut items, "{}", display_comma_separated(columns))?; + } if !columns.is_empty() && (!constraints.is_empty() || !watermarks.is_empty()) { write!(&mut items, ", ")?; } @@ -380,7 +401,12 @@ impl fmt::Display for CreateSourceStatement { impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self); 
impl_fmt_display!(source_name, v, self); - let items = fmt_create_items(&self.columns, &self.constraints, &self.source_watermarks)?; + let items = fmt_create_items( + &self.columns, + &self.constraints, + &self.source_watermarks, + self.wildcard_idx, + )?; if !items.is_empty() { v.push(items); } diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index 1eae3eccc03f5..3a81842317475 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -131,7 +131,12 @@ impl fmt::Display for ParserError { #[cfg(feature = "std")] impl std::error::Error for ParserError {} -type ColumnsDefTuple = (Vec, Vec, Vec); +type ColumnsDefTuple = ( + Vec, + Vec, + Vec, + Option, +); /// Reference: /// @@ -2455,7 +2460,8 @@ impl Parser { let if_not_exists = self.parse_keywords(&[Keyword::IF, Keyword::NOT, Keyword::EXISTS]); let table_name = self.parse_object_name()?; // parse optional column list (schema) and watermarks on source. - let (columns, constraints, source_watermarks) = self.parse_columns_with_watermark()?; + let (columns, constraints, source_watermarks, wildcard_idx) = + self.parse_columns_with_watermark()?; let append_only = if self.parse_keyword(Keyword::APPEND) { self.expect_keyword(Keyword::ONLY)?; @@ -2507,6 +2513,7 @@ impl Parser { name: table_name, temporary, columns, + wildcard_idx, constraints, with_options, or_replace, @@ -2538,11 +2545,21 @@ impl Parser { let mut columns = vec![]; let mut constraints = vec![]; let mut watermarks = vec![]; + let mut wildcard_idx = None; if !self.consume_token(&Token::LParen) || self.consume_token(&Token::RParen) { - return Ok((columns, constraints, watermarks)); + return Ok((columns, constraints, watermarks, wildcard_idx)); } loop { + if self.consume_token(&Token::Mul) { + if wildcard_idx.is_none() { + wildcard_idx = Some(columns.len()); + } else { + return Err(ParserError::ParserError( + "At most 1 wildcard is allowed in source definetion".to_string(), + )); + } + } if let Some(constraint) = 
self.parse_optional_table_constraint()? { constraints.push(constraint); } else if let Some(watermark) = self.parse_optional_watermark()? { @@ -2567,7 +2584,7 @@ impl Parser { } } - Ok((columns, constraints, watermarks)) + Ok((columns, constraints, watermarks, wildcard_idx)) } fn parse_column_def(&mut self) -> Result { From b4f874f6ef3e2c51c42ce382367f5a866e40c56d Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Thu, 18 Jan 2024 15:18:45 +0800 Subject: [PATCH 2/5] parser test --- src/sqlparser/tests/testdata/create.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/sqlparser/tests/testdata/create.yaml b/src/sqlparser/tests/testdata/create.yaml index 69167205591a2..94873541e1d81 100644 --- a/src/sqlparser/tests/testdata/create.yaml +++ b/src/sqlparser/tests/testdata/create.yaml @@ -35,13 +35,13 @@ Near "pad CHARACTER VARYING) FROM sbtest" - input: CREATE SOURCE IF NOT EXISTS src WITH (kafka.topic = 'abc', kafka.servers = 'localhost:1001') FORMAT PLAIN ENCODE PROTOBUF (message = 'Foo', schema.location = 'file://') formatted_sql: CREATE SOURCE IF NOT EXISTS src WITH (kafka.topic = 'abc', kafka.servers = 'localhost:1001') FORMAT PLAIN ENCODE PROTOBUF (message = 'Foo', schema.location = 'file://') - formatted_ast: 'CreateSource { stmt: CreateSourceStatement { if_not_exists: true, columns: [], constraints: [], source_name: ObjectName([Ident { value: "src", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "topic", quote_style: None }]), value: SingleQuotedString("abc") }, SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "servers", quote_style: None }]), value: SingleQuotedString("localhost:1001") }]), source_schema: V2(ConnectorSchema { format: Plain, row_encode: Protobuf, row_options: [SqlOption { name: ObjectName([Ident { value: "message", quote_style: None }]), value: SingleQuotedString("Foo") }, SqlOption { 
name: ObjectName([Ident { value: "schema", quote_style: None }, Ident { value: "location", quote_style: None }]), value: SingleQuotedString("file://") }] }), source_watermarks: [], include_column_options: [] } }' + formatted_ast: 'CreateSource { stmt: CreateSourceStatement { if_not_exists: true, columns: [], wildcard_idx: None, constraints: [], source_name: ObjectName([Ident { value: "src", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "topic", quote_style: None }]), value: SingleQuotedString("abc") }, SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "servers", quote_style: None }]), value: SingleQuotedString("localhost:1001") }]), source_schema: V2(ConnectorSchema { format: Plain, row_encode: Protobuf, row_options: [SqlOption { name: ObjectName([Ident { value: "message", quote_style: None }]), value: SingleQuotedString("Foo") }, SqlOption { name: ObjectName([Ident { value: "schema", quote_style: None }, Ident { value: "location", quote_style: None }]), value: SingleQuotedString("file://") }] }), source_watermarks: [], include_column_options: [] } }' - input: CREATE SOURCE IF NOT EXISTS src WITH (kafka.topic = 'abc', kafka.servers = 'localhost:1001') FORMAT PLAIN ENCODE PROTOBUF (message = 'Foo', schema.registry = 'http://') formatted_sql: CREATE SOURCE IF NOT EXISTS src WITH (kafka.topic = 'abc', kafka.servers = 'localhost:1001') FORMAT PLAIN ENCODE PROTOBUF (message = 'Foo', schema.registry = 'http://') - formatted_ast: 'CreateSource { stmt: CreateSourceStatement { if_not_exists: true, columns: [], constraints: [], source_name: ObjectName([Ident { value: "src", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "topic", quote_style: None }]), value: SingleQuotedString("abc") }, SqlOption { name: ObjectName([Ident { value: "kafka", 
quote_style: None }, Ident { value: "servers", quote_style: None }]), value: SingleQuotedString("localhost:1001") }]), source_schema: V2(ConnectorSchema { format: Plain, row_encode: Protobuf, row_options: [SqlOption { name: ObjectName([Ident { value: "message", quote_style: None }]), value: SingleQuotedString("Foo") }, SqlOption { name: ObjectName([Ident { value: "schema", quote_style: None }, Ident { value: "registry", quote_style: None }]), value: SingleQuotedString("http://") }] }), source_watermarks: [], include_column_options: [] } }' + formatted_ast: 'CreateSource { stmt: CreateSourceStatement { if_not_exists: true, columns: [], wildcard_idx: None, constraints: [], source_name: ObjectName([Ident { value: "src", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "topic", quote_style: None }]), value: SingleQuotedString("abc") }, SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "servers", quote_style: None }]), value: SingleQuotedString("localhost:1001") }]), source_schema: V2(ConnectorSchema { format: Plain, row_encode: Protobuf, row_options: [SqlOption { name: ObjectName([Ident { value: "message", quote_style: None }]), value: SingleQuotedString("Foo") }, SqlOption { name: ObjectName([Ident { value: "schema", quote_style: None }, Ident { value: "registry", quote_style: None }]), value: SingleQuotedString("http://") }] }), source_watermarks: [], include_column_options: [] } }' - input: CREATE SOURCE bid (auction INTEGER, bidder INTEGER, price INTEGER, WATERMARK FOR auction AS auction - 1, "date_time" TIMESTAMP) with (connector = 'nexmark', nexmark.table.type = 'Bid', nexmark.split.num = '12', nexmark.min.event.gap.in.ns = '0') formatted_sql: CREATE SOURCE bid (auction INT, bidder INT, price INT, "date_time" TIMESTAMP, WATERMARK FOR auction AS auction - 1) WITH (connector = 'nexmark', nexmark.table.type = 'Bid', 
nexmark.split.num = '12', nexmark.min.event.gap.in.ns = '0') FORMAT NATIVE ENCODE NATIVE - formatted_ast: 'CreateSource { stmt: CreateSourceStatement { if_not_exists: false, columns: [ColumnDef { name: Ident { value: "auction", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "bidder", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "price", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "date_time", quote_style: Some(''"'') }, data_type: Some(Timestamp(false)), collation: None, options: [] }], constraints: [], source_name: ObjectName([Ident { value: "bid", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "connector", quote_style: None }]), value: SingleQuotedString("nexmark") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "table", quote_style: None }, Ident { value: "type", quote_style: None }]), value: SingleQuotedString("Bid") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "split", quote_style: None }, Ident { value: "num", quote_style: None }]), value: SingleQuotedString("12") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "min", quote_style: None }, Ident { value: "event", quote_style: None }, Ident { value: "gap", quote_style: None }, Ident { value: "in", quote_style: None }, Ident { value: "ns", quote_style: None }]), value: SingleQuotedString("0") }]), source_schema: V2(ConnectorSchema { format: Native, row_encode: Native, row_options: [] }), source_watermarks: [SourceWatermark { column: Ident { value: "auction", quote_style: None }, expr: BinaryOp { left: Identifier(Ident { value: "auction", quote_style: None }), op: Minus, right: Value(Number("1")) } }], include_column_options: [] } }' 
+ formatted_ast: 'CreateSource { stmt: CreateSourceStatement { if_not_exists: false, columns: [ColumnDef { name: Ident { value: "auction", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "bidder", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "price", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "date_time", quote_style: Some(''"'') }, data_type: Some(Timestamp(false)), collation: None, options: [] }], wildcard_idx: None, constraints: [], source_name: ObjectName([Ident { value: "bid", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "connector", quote_style: None }]), value: SingleQuotedString("nexmark") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "table", quote_style: None }, Ident { value: "type", quote_style: None }]), value: SingleQuotedString("Bid") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "split", quote_style: None }, Ident { value: "num", quote_style: None }]), value: SingleQuotedString("12") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "min", quote_style: None }, Ident { value: "event", quote_style: None }, Ident { value: "gap", quote_style: None }, Ident { value: "in", quote_style: None }, Ident { value: "ns", quote_style: None }]), value: SingleQuotedString("0") }]), source_schema: V2(ConnectorSchema { format: Native, row_encode: Native, row_options: [] }), source_watermarks: [SourceWatermark { column: Ident { value: "auction", quote_style: None }, expr: BinaryOp { left: Identifier(Ident { value: "auction", quote_style: None }), op: Minus, right: Value(Number("1")) } }], include_column_options: [] } }' - input: CREATE TABLE T (v1 INT, v2 STRUCT) formatted_sql: CREATE 
TABLE T (v1 INT, v2 STRUCT) - input: CREATE TABLE T (v1 INT, v2 STRUCT>) From 0edd49840961e9da1e6d861ca7e847c53027f1f9 Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Thu, 18 Jan 2024 15:49:54 +0800 Subject: [PATCH 3/5] fix test --- src/tests/sqlsmith/src/lib.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/tests/sqlsmith/src/lib.rs b/src/tests/sqlsmith/src/lib.rs index 2d8c23a52b740..34ab90cb497a6 100644 --- a/src/tests/sqlsmith/src/lib.rs +++ b/src/tests/sqlsmith/src/lib.rs @@ -274,6 +274,7 @@ CREATE TABLE t3(v1 int, v2 bool, v3 smallint); options: [], }, ], + wildcard_idx: None, constraints: [], with_options: [], source_schema: None, @@ -319,6 +320,7 @@ CREATE TABLE t3(v1 int, v2 bool, v3 smallint); options: [], }, ], + wildcard_idx: None, constraints: [], with_options: [], source_schema: None, @@ -375,6 +377,7 @@ CREATE TABLE t3(v1 int, v2 bool, v3 smallint); options: [], }, ], + wildcard_idx: None, constraints: [], with_options: [], source_schema: None, @@ -507,6 +510,7 @@ CREATE TABLE t4(v1 int PRIMARY KEY, v2 smallint PRIMARY KEY, v3 bool PRIMARY KEY ], }, ], + wildcard_idx: None, constraints: [], with_options: [], source_schema: None, @@ -559,6 +563,7 @@ CREATE TABLE t4(v1 int PRIMARY KEY, v2 smallint PRIMARY KEY, v3 bool PRIMARY KEY ], }, ], + wildcard_idx: None, constraints: [], with_options: [], source_schema: None, @@ -618,6 +623,7 @@ CREATE TABLE t4(v1 int PRIMARY KEY, v2 smallint PRIMARY KEY, v3 bool PRIMARY KEY ], }, ], + wildcard_idx: None, constraints: [], with_options: [], source_schema: None, @@ -695,6 +701,7 @@ CREATE TABLE t4(v1 int PRIMARY KEY, v2 smallint PRIMARY KEY, v3 bool PRIMARY KEY ], }, ], + wildcard_idx: None, constraints: [], with_options: [], source_schema: None, From e41cd37cf9a4250c441382908f2abf0f838db11c Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Thu, 18 Jan 2024 16:12:40 +0800 Subject: [PATCH 4/5] fix --- src/sqlparser/src/parser.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git 
a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index 3a81842317475..e88a1df3157da 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -2559,8 +2559,7 @@ impl Parser { "At most 1 wildcard is allowed in source definetion".to_string(), )); } - } - if let Some(constraint) = self.parse_optional_table_constraint()? { + } else if let Some(constraint) = self.parse_optional_table_constraint()? { constraints.push(constraint); } else if let Some(watermark) = self.parse_optional_watermark()? { watermarks.push(watermark); From bbbb1b225b76a5d5c2655d80756eb9dc286878ce Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Thu, 18 Jan 2024 16:48:07 +0800 Subject: [PATCH 5/5] fix --- src/frontend/src/handler/create_source.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index e070edcfaae98..1cb91c25356bc 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -562,7 +562,7 @@ pub(crate) fn bind_all_columns( if cols_from_sql.is_empty() { Ok(cols_from_source) } else if let Some(wildcard_idx) = wildcard_idx { - if cols_from_sql.iter().any(|c| !c.is_generated()) { + if col_defs_from_sql.iter().any(|c| !c.is_generated()) { Err(RwError::from(NotSupported( "Only generated columns are allowed in user-defined schema from SQL" .to_string(),