From 8c22f0bd8920a18773fcc55f4bb325b51501939a Mon Sep 17 00:00:00 2001 From: Xiangjin Date: Thu, 26 Oct 2023 13:21:26 +0800 Subject: [PATCH 1/3] refactor(sqlparser): rename `SourceSchemaV2`/`SinkSchema` to `ConnectorSchema` --- .../src/handler/alter_table_column.rs | 4 +- src/frontend/src/handler/create_sink.rs | 8 +- src/frontend/src/handler/create_source.rs | 8 +- src/frontend/src/handler/create_table.rs | 10 +-- src/sqlparser/src/ast/statement.rs | 77 +++++-------------- 5 files changed, 35 insertions(+), 72 deletions(-) diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs index 1a6d02b963e9d..716fb33460362 100644 --- a/src/frontend/src/handler/alter_table_column.rs +++ b/src/frontend/src/handler/alter_table_column.rs @@ -22,7 +22,7 @@ use risingwave_pb::catalog::Table; use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism; use risingwave_pb::stream_plan::StreamFragmentGraph; use risingwave_sqlparser::ast::{ - AlterTableOperation, ColumnOption, Encode, ObjectName, SourceSchemaV2, Statement, + AlterTableOperation, ColumnOption, ConnectorSchema, Encode, ObjectName, Statement, }; use risingwave_sqlparser::parser::Parser; @@ -262,7 +262,7 @@ pub async fn handle_alter_table_column( Ok(PgResponse::empty_result(StatementType::ALTER_TABLE)) } -fn schema_has_schema_registry(schema: &SourceSchemaV2) -> bool { +fn schema_has_schema_registry(schema: &ConnectorSchema) -> bool { match schema.row_encode { Encode::Avro | Encode::Protobuf => true, Encode::Json => { diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index ddb1d697b856d..2ab987308a4f5 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -27,8 +27,8 @@ use risingwave_connector::sink::{ }; use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism; use risingwave_sqlparser::ast::{ - CreateSink, CreateSinkStatement, EmitMode, Encode, Format, ObjectName, Query, Select, - SelectItem, SetExpr, SinkSchema, TableFactor, TableWithJoins, + ConnectorSchema, CreateSink, CreateSinkStatement, EmitMode, Encode, Format, ObjectName, Query, + Select, SelectItem, SetExpr, TableFactor, TableWithJoins, }; use super::create_mv::get_column_names; @@ -228,7 +228,7 @@ pub async fn handle_create_sink( /// Transforms the (format, encode, options) from sqlparser AST into an internal struct `SinkFormatDesc`. /// This is an analogy to (part of) [`crate::handler::create_source::try_bind_columns_from_source`] /// which transforms sqlparser AST `SourceSchemaV2` into `StreamSourceInfo`. -fn bind_sink_format_desc(value: SinkSchema) -> Result { +fn bind_sink_format_desc(value: ConnectorSchema) -> Result { use risingwave_connector::sink::catalog::{SinkEncode, SinkFormat}; use risingwave_sqlparser::ast::{Encode as E, Format as F}; @@ -288,7 +288,7 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock Result<()> { +pub fn validate_compatibility(connector: &str, format_desc: &ConnectorSchema) -> Result<()> { let compatible_formats = CONNECTORS_COMPATIBLE_FORMATS .get(connector) .ok_or_else(|| { diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 0c2398a608eb8..67d61601995d9 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -46,8 +46,8 @@ use risingwave_pb::catalog::{ }; use risingwave_pb::plan_common::{EncodeType, FormatType}; use risingwave_sqlparser::ast::{ - self, get_delimiter, AstString, AvroSchema, ColumnDef, ColumnOption, CreateSourceStatement, - DebeziumAvroSchema, Encode, Format, ProtobufSchema, SourceSchemaV2, SourceWatermark, + self, get_delimiter, AstString, AvroSchema, ColumnDef, ColumnOption, ConnectorSchema, + CreateSourceStatement, DebeziumAvroSchema, Encode, Format, ProtobufSchema, SourceWatermark, }; use super::RwPgResponse; @@ -278,7 +278,7 @@ fn get_name_strategy_or_default(name_strategy: Option) -> Result for more information. /// return `(columns, pk_names, source info)` pub(crate) async fn try_bind_columns_from_source( - source_schema: &SourceSchemaV2, + source_schema: &ConnectorSchema, sql_defined_pk_names: Vec, sql_defined_columns: &[ColumnDef], with_properties: &HashMap, @@ -904,7 +904,7 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock, ) -> Result<()> { let connector = get_connector(props) diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index bb02797c21395..e412658cb712f 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -33,7 +33,7 @@ use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn; use risingwave_pb::plan_common::{DefaultColumnDesc, GeneratedColumnDesc}; use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism; use risingwave_sqlparser::ast::{ - ColumnDef, ColumnOption, DataType as AstDataType, Format, ObjectName, SourceSchemaV2, + ColumnDef, ColumnOption, ConnectorSchema, DataType as AstDataType, Format, ObjectName, SourceWatermark, TableConstraint, }; @@ -436,7 +436,7 @@ pub(crate) async fn gen_create_table_plan_with_source( table_name: ObjectName, column_defs: Vec, constraints: Vec, - source_schema: SourceSchemaV2, + source_schema: ConnectorSchema, source_watermarks: Vec, mut col_id_gen: ColumnIdGenerator, append_only: bool, @@ -749,7 +749,7 @@ pub async fn handle_create_table( columns: Vec, constraints: Vec, if_not_exists: bool, - source_schema: Option, + source_schema: Option, source_watermarks: Vec, append_only: bool, notice: Option, @@ -826,8 +826,8 @@ pub async fn handle_create_table( pub fn check_create_table_with_source( with_options: &WithOptions, - source_schema: Option, -) -> Result> { + source_schema: Option, +) -> Result> { if with_options.inner().contains_key(UPSTREAM_SOURCE_KEY) { source_schema.as_ref().ok_or_else(|| { ErrorCode::InvalidInputSyntax("Please specify a source schema using FORMAT".to_owned()) diff --git a/src/sqlparser/src/ast/statement.rs b/src/sqlparser/src/ast/statement.rs index 58fb2d50c6287..4295a57411ef8 100644 --- a/src/sqlparser/src/ast/statement.rs +++ b/src/sqlparser/src/ast/statement.rs @@ -105,7 +105,7 @@ pub enum SourceSchema { } impl SourceSchema { - pub fn into_source_schema_v2(self) -> SourceSchemaV2 { + pub fn into_source_schema_v2(self) -> ConnectorSchema { let (format, row_encode) = match self { SourceSchema::Protobuf(_) => (Format::Plain, Encode::Protobuf), SourceSchema::Json => (Format::Plain, Encode::Json), @@ -205,7 +205,7 @@ impl SourceSchema { _ => vec![], }; - SourceSchemaV2 { + ConnectorSchema { format, row_encode, row_options, @@ -336,7 +336,7 @@ impl Encode { #[derive(Debug, Clone, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -pub struct SourceSchemaV2 { +pub struct ConnectorSchema { pub format: Format, pub row_encode: Encode, pub row_options: Vec, @@ -346,7 +346,7 @@ pub struct SourceSchemaV2 { #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub enum CompatibleSourceSchema { RowFormat(SourceSchema), - V2(SourceSchemaV2), + V2(ConnectorSchema), } impl fmt::Display for CompatibleSourceSchema { @@ -363,7 +363,7 @@ impl fmt::Display for CompatibleSourceSchema { } impl CompatibleSourceSchema { - pub fn into_source_schema_v2(self) -> (SourceSchemaV2, Option) { + pub fn into_source_schema_v2(self) -> (ConnectorSchema, Option) { match self { CompatibleSourceSchema::RowFormat(inner) => ( inner.into_source_schema_v2(), @@ -373,29 +373,15 @@ impl CompatibleSourceSchema { } } -impl From for CompatibleSourceSchema { - fn from(value: SourceSchemaV2) -> Self { +impl From for CompatibleSourceSchema { + fn from(value: ConnectorSchema) -> Self { Self::V2(value) } } fn parse_source_schema(p: &mut Parser) -> Result { - if p.peek_nth_any_of_keywords(0, &[Keyword::FORMAT]) { - p.expect_keyword(Keyword::FORMAT)?; - let id = p.parse_identifier()?; - let s = id.value.to_ascii_uppercase(); - let format = Format::from_keyword(&s)?; - p.expect_keyword(Keyword::ENCODE)?; - let id = p.parse_identifier()?; - let s = id.value.to_ascii_uppercase(); - let row_encode = Encode::from_keyword(&s)?; - let row_options = p.parse_options()?; - - Ok(CompatibleSourceSchema::V2(SourceSchemaV2 { - format, - row_encode, - row_options, - })) + if let Some(schema_v2) = p.parse_schema()? { + Ok(CompatibleSourceSchema::V2(schema_v2)) } else if p.peek_nth_any_of_keywords(0, &[Keyword::ROW]) && p.peek_nth_any_of_keywords(1, &[Keyword::FORMAT]) { @@ -465,7 +451,7 @@ impl Parser { // row format for nexmark source must be native // default row format for datagen source is native if connector.contains("-cdc") { - let expected = SourceSchemaV2::debezium_json(); + let expected = ConnectorSchema::debezium_json(); if self.peek_source_schema_format() { let schema = parse_source_schema(self)?.into_source_schema_v2().0; if schema != expected { @@ -477,7 +463,7 @@ impl Parser { } Ok(expected.into()) } else if connector.contains("nexmark") { - let expected = SourceSchemaV2::native(); + let expected = ConnectorSchema::native(); if self.peek_source_schema_format() { let schema = parse_source_schema(self)?.into_source_schema_v2().0; if schema != expected { @@ -492,17 +478,15 @@ impl Parser { Ok(if self.peek_source_schema_format() { parse_source_schema(self)? } else { - SourceSchemaV2::native().into() + ConnectorSchema::native().into() }) } else { Ok(parse_source_schema(self)?) } } - /// Parse `FORMAT ... ENCODE ... (...)` in `CREATE SINK`. - /// - /// TODO: After [`SourceSchemaV2`] and [`SinkSchema`] merge, call this in [`parse_source_schema`]. - pub fn parse_schema(&mut self) -> Result, ParserError> { + /// Parse `FORMAT ... ENCODE ... (...)` in `CREATE SOURCE` and `CREATE SINK`. + pub fn parse_schema(&mut self) -> Result, ParserError> { if !self.parse_keyword(Keyword::FORMAT) { return Ok(None); } @@ -516,7 +500,7 @@ impl Parser { let row_encode = Encode::from_keyword(&s)?; let row_options = self.parse_options()?; - Ok(Some(SinkSchema { + Ok(Some(ConnectorSchema { format, row_encode, row_options, @@ -524,10 +508,10 @@ impl Parser { } } -impl SourceSchemaV2 { +impl ConnectorSchema { /// Create a new source schema with `Debezium` format and `Json` encoding. pub const fn debezium_json() -> Self { - SourceSchemaV2 { + ConnectorSchema { format: Format::Debezium, row_encode: Encode::Json, row_options: Vec::new(), @@ -536,7 +520,7 @@ impl SourceSchemaV2 { /// Create a new source schema with `Native` format and encoding. pub const fn native() -> Self { - SourceSchemaV2 { + ConnectorSchema { format: Format::Native, row_encode: Encode::Native, row_options: Vec::new(), @@ -564,7 +548,7 @@ impl SourceSchemaV2 { } } -impl fmt::Display for SourceSchemaV2 { +impl fmt::Display for ConnectorSchema { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "FORMAT {} ENCODE {}", self.format, self.row_encode)?; @@ -823,27 +807,6 @@ impl fmt::Display for CreateSink { } } -/// Same as [`SourceSchemaV2`]. Will be merged in a dedicated rename PR. -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -pub struct SinkSchema { - pub format: Format, - pub row_encode: Encode, - pub row_options: Vec, -} - -impl fmt::Display for SinkSchema { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "FORMAT {} ENCODE {}", self.format, self.row_encode)?; - - if !self.row_options.is_empty() { - write!(f, " ({})", display_comma_separated(&self.row_options)) - } else { - Ok(()) - } - } -} - // sql_grammar!(CreateSinkStatement { // if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], // sink_name: Ident, @@ -860,7 +823,7 @@ pub struct CreateSinkStatement { pub sink_from: CreateSink, pub columns: Vec, pub emit_mode: Option, - pub sink_schema: Option, + pub sink_schema: Option, } impl ParseTo for CreateSinkStatement { From a7d202a064bfe1f345a12e49b3f199b22871f1d6 Mon Sep 17 00:00:00 2001 From: Xiangjin Date: Thu, 26 Oct 2023 13:38:29 +0800 Subject: [PATCH 2/3] `ConnectorSchema::gen_options` -> `WithOptions::try_from` --- src/frontend/src/handler/alter_table_column.rs | 6 +++--- src/frontend/src/handler/create_source.rs | 7 +++---- src/sqlparser/src/ast/statement.rs | 17 ----------------- 3 files changed, 6 insertions(+), 24 deletions(-) diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs index 716fb33460362..18313e0458a04 100644 --- a/src/frontend/src/handler/alter_table_column.rs +++ b/src/frontend/src/handler/alter_table_column.rs @@ -32,7 +32,7 @@ use super::{HandlerArgs, RwPgResponse}; use crate::catalog::root_catalog::SchemaPath; use crate::catalog::table_catalog::TableType; use crate::handler::create_table::gen_create_table_plan_with_source; -use crate::{build_graph, Binder, OptimizerContext, TableCatalog}; +use crate::{build_graph, Binder, OptimizerContext, TableCatalog, WithOptions}; /// Handle `ALTER TABLE [ADD|DROP] COLUMN` statements. The `operation` must be either `AddColumn` or /// `DropColumn`. @@ -266,8 +266,8 @@ fn schema_has_schema_registry(schema: &ConnectorSchema) -> bool { match schema.row_encode { Encode::Avro | Encode::Protobuf => true, Encode::Json => { - let mut options = schema.gen_options().unwrap(); - matches!(get_json_schema_location(&mut options), Ok(Some(_))) + let mut options = WithOptions::try_from(schema.row_options()).unwrap(); + matches!(get_json_schema_location(options.inner_mut()), Ok(Some(_))) } _ => false, } diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 67d61601995d9..0ce3e32ed584e 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -15,7 +15,6 @@ use std::collections::{BTreeMap, HashMap}; use std::sync::LazyLock; -use anyhow::anyhow; use itertools::Itertools; use maplit::{convert_args, hashmap}; use pgwire::pg_response::{PgResponse, StatementType}; @@ -290,7 +289,7 @@ pub(crate) async fn try_bind_columns_from_source( let sql_defined_pk = !sql_defined_pk_names.is_empty(); let sql_defined_schema = !sql_defined_columns.is_empty(); let is_kafka: bool = is_kafka_connector(with_properties); - let mut options = source_schema.gen_options().map_err(|e| anyhow!(e))?; + let mut options = WithOptions::try_from(source_schema.row_options())?.into_inner(); let get_key_message_name = |options: &mut BTreeMap| -> Option { consume_string_from_options(options, KEY_MESSAGE_NAME_KEY) @@ -922,8 +921,8 @@ pub fn validate_compatibility( if connector != KAFKA_CONNECTOR { let res = match (&source_schema.format, &source_schema.row_encode) { (Format::Plain, Encode::Protobuf) | (Format::Plain, Encode::Avro) => { - let mut options = source_schema.gen_options().map_err(|e| anyhow!(e))?; - let (_, use_schema_registry) = get_schema_location(&mut options)?; + let mut options = WithOptions::try_from(source_schema.row_options())?; + let (_, use_schema_registry) = get_schema_location(options.inner_mut())?; use_schema_registry } (Format::Debezium, Encode::Avro) => true, diff --git a/src/sqlparser/src/ast/statement.rs b/src/sqlparser/src/ast/statement.rs index 4295a57411ef8..3ff012c81b766 100644 --- a/src/sqlparser/src/ast/statement.rs +++ b/src/sqlparser/src/ast/statement.rs @@ -13,7 +13,6 @@ // limitations under the License. use core::fmt; -use std::collections::BTreeMap; use std::fmt::Write; use itertools::Itertools; @@ -527,22 +526,6 @@ impl ConnectorSchema { } } - pub fn gen_options(&self) -> Result, ParserError> { - self.row_options - .iter() - .cloned() - .map(|x| match x.value { - Value::CstyleEscapedString(s) => Ok((x.name.real_value(), s.value)), - Value::SingleQuotedString(s) => Ok((x.name.real_value(), s)), - Value::Number(n) => Ok((x.name.real_value(), n)), - Value::Boolean(b) => Ok((x.name.real_value(), b.to_string())), - _ => Err(ParserError::ParserError( - "`row format options` only support single quoted string value and C style escaped string".to_owned(), - )), - }) - .try_collect() - } - pub fn row_options(&self) -> &[SqlOption] { self.row_options.as_ref() } From ce9778d53384cde315fb9d7b947a21f4455ccc10 Mon Sep 17 00:00:00 2001 From: Xiangjin Date: Thu, 26 Oct 2023 14:12:45 +0800 Subject: [PATCH 3/3] fix AST Debug output in parser tests --- src/sqlparser/tests/testdata/create.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/sqlparser/tests/testdata/create.yaml b/src/sqlparser/tests/testdata/create.yaml index 5509ccad53a04..cde40d8a75da1 100644 --- a/src/sqlparser/tests/testdata/create.yaml +++ b/src/sqlparser/tests/testdata/create.yaml @@ -21,13 +21,13 @@ formatted_sql: CREATE SOURCE src FORMAT PLAIN ENCODE JSON - input: CREATE SOURCE IF NOT EXISTS src WITH (kafka.topic = 'abc', kafka.servers = 'localhost:1001') FORMAT PLAIN ENCODE PROTOBUF (message = 'Foo', schema.location = 'file://') formatted_sql: CREATE SOURCE IF NOT EXISTS src WITH (kafka.topic = 'abc', kafka.servers = 'localhost:1001') FORMAT PLAIN ENCODE PROTOBUF (message = 'Foo', schema.location = 'file://') - formatted_ast: 'CreateSource { stmt: CreateSourceStatement { if_not_exists: true, columns: [], constraints: [], source_name: ObjectName([Ident { value: "src", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "topic", quote_style: None }]), value: SingleQuotedString("abc") }, SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "servers", quote_style: None }]), value: SingleQuotedString("localhost:1001") }]), source_schema: V2(SourceSchemaV2 { format: Plain, row_encode: Protobuf, row_options: [SqlOption { name: ObjectName([Ident { value: "message", quote_style: None }]), value: SingleQuotedString("Foo") }, SqlOption { name: ObjectName([Ident { value: "schema", quote_style: None }, Ident { value: "location", quote_style: None }]), value: SingleQuotedString("file://") }] }), source_watermarks: [] } }' + formatted_ast: 'CreateSource { stmt: CreateSourceStatement { if_not_exists: true, columns: [], constraints: [], source_name: ObjectName([Ident { value: "src", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "topic", quote_style: None }]), value: SingleQuotedString("abc") }, SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "servers", quote_style: None }]), value: SingleQuotedString("localhost:1001") }]), source_schema: V2(ConnectorSchema { format: Plain, row_encode: Protobuf, row_options: [SqlOption { name: ObjectName([Ident { value: "message", quote_style: None }]), value: SingleQuotedString("Foo") }, SqlOption { name: ObjectName([Ident { value: "schema", quote_style: None }, Ident { value: "location", quote_style: None }]), value: SingleQuotedString("file://") }] }), source_watermarks: [] } }' - input: CREATE SOURCE IF NOT EXISTS src WITH (kafka.topic = 'abc', kafka.servers = 'localhost:1001') FORMAT PLAIN ENCODE PROTOBUF (message = 'Foo', schema.registry = 'http://') formatted_sql: CREATE SOURCE IF NOT EXISTS src WITH (kafka.topic = 'abc', kafka.servers = 'localhost:1001') FORMAT PLAIN ENCODE PROTOBUF (message = 'Foo', schema.registry = 'http://') - formatted_ast: 'CreateSource { stmt: CreateSourceStatement { if_not_exists: true, columns: [], constraints: [], source_name: ObjectName([Ident { value: "src", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "topic", quote_style: None }]), value: SingleQuotedString("abc") }, SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "servers", quote_style: None }]), value: SingleQuotedString("localhost:1001") }]), source_schema: V2(SourceSchemaV2 { format: Plain, row_encode: Protobuf, row_options: [SqlOption { name: ObjectName([Ident { value: "message", quote_style: None }]), value: SingleQuotedString("Foo") }, SqlOption { name: ObjectName([Ident { value: "schema", quote_style: None }, Ident { value: "registry", quote_style: None }]), value: SingleQuotedString("http://") }] }), source_watermarks: [] } }' + formatted_ast: 'CreateSource { stmt: CreateSourceStatement { if_not_exists: true, columns: [], constraints: [], source_name: ObjectName([Ident { value: "src", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "topic", quote_style: None }]), value: SingleQuotedString("abc") }, SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "servers", quote_style: None }]), value: SingleQuotedString("localhost:1001") }]), source_schema: V2(ConnectorSchema { format: Plain, row_encode: Protobuf, row_options: [SqlOption { name: ObjectName([Ident { value: "message", quote_style: None }]), value: SingleQuotedString("Foo") }, SqlOption { name: ObjectName([Ident { value: "schema", quote_style: None }, Ident { value: "registry", quote_style: None }]), value: SingleQuotedString("http://") }] }), source_watermarks: [] } }' - input: CREATE SOURCE bid (auction INTEGER, bidder INTEGER, price INTEGER, WATERMARK FOR auction AS auction - 1, "date_time" TIMESTAMP) with (connector = 'nexmark', nexmark.table.type = 'Bid', nexmark.split.num = '12', nexmark.min.event.gap.in.ns = '0') formatted_sql: CREATE SOURCE bid (auction INT, bidder INT, price INT, "date_time" TIMESTAMP, WATERMARK FOR auction AS auction - 1) WITH (connector = 'nexmark', nexmark.table.type = 'Bid', nexmark.split.num = '12', nexmark.min.event.gap.in.ns = '0') FORMAT NATIVE ENCODE NATIVE - formatted_ast: 'CreateSource { stmt: CreateSourceStatement { if_not_exists: false, columns: [ColumnDef { name: Ident { value: "auction", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "bidder", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "price", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "date_time", quote_style: Some(''"'') }, data_type: Some(Timestamp(false)), collation: None, options: [] }], constraints: [], source_name: ObjectName([Ident { value: "bid", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "connector", quote_style: None }]), value: SingleQuotedString("nexmark") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "table", quote_style: None }, Ident { value: "type", quote_style: None }]), value: SingleQuotedString("Bid") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "split", quote_style: None }, Ident { value: "num", quote_style: None }]), value: SingleQuotedString("12") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "min", quote_style: None }, Ident { value: "event", quote_style: None }, Ident { value: "gap", quote_style: None }, Ident { value: "in", quote_style: None }, Ident { value: "ns", quote_style: None }]), value: SingleQuotedString("0") }]), source_schema: V2(SourceSchemaV2 { format: Native, row_encode: Native, row_options: [] }), source_watermarks: [SourceWatermark { column: Ident { value: "auction", quote_style: None }, expr: BinaryOp { left: Identifier(Ident { value: "auction", quote_style: None }), op: Minus, right: Value(Number("1")) } }] } }' + formatted_ast: 'CreateSource { stmt: CreateSourceStatement { if_not_exists: false, columns: [ColumnDef { name: Ident { value: "auction", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "bidder", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "price", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "date_time", quote_style: Some(''"'') }, data_type: Some(Timestamp(false)), collation: None, options: [] }], constraints: [], source_name: ObjectName([Ident { value: "bid", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "connector", quote_style: None }]), value: SingleQuotedString("nexmark") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "table", quote_style: None }, Ident { value: "type", quote_style: None }]), value: SingleQuotedString("Bid") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "split", quote_style: None }, Ident { value: "num", quote_style: None }]), value: SingleQuotedString("12") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "min", quote_style: None }, Ident { value: "event", quote_style: None }, Ident { value: "gap", quote_style: None }, Ident { value: "in", quote_style: None }, Ident { value: "ns", quote_style: None }]), value: SingleQuotedString("0") }]), source_schema: V2(ConnectorSchema { format: Native, row_encode: Native, row_options: [] }), source_watermarks: [SourceWatermark { column: Ident { value: "auction", quote_style: None }, expr: BinaryOp { left: Identifier(Ident { value: "auction", quote_style: None }), op: Minus, right: Value(Number("1")) } }] } }' - input: CREATE TABLE T (v1 INT, v2 STRUCT) formatted_sql: CREATE TABLE T (v1 INT, v2 STRUCT) - input: CREATE TABLE T (v1 INT, v2 STRUCT>)