From bbd508f2d6ccea1626519220c746e2281be96941 Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 29 Oct 2024 17:41:24 +0800 Subject: [PATCH] refactor: rename ConnectorSchema/SourceSchema to FormatEncode Signed-off-by: xxchan --- src/frontend/planner_test/src/lib.rs | 6 +- .../src/handler/alter_source_with_sr.rs | 47 +++---- .../src/handler/alter_table_column.rs | 30 ++-- .../src/handler/alter_table_with_sr.rs | 20 +-- src/frontend/src/handler/create_sink.rs | 17 ++- src/frontend/src/handler/create_source.rs | 98 ++++++------- src/frontend/src/handler/create_table.rs | 60 ++++---- src/frontend/src/handler/explain.rs | 6 +- src/frontend/src/handler/mod.rs | 10 +- src/frontend/src/handler/util.rs | 16 +-- src/meta/src/manager/diagnose.rs | 20 +-- src/sqlparser/src/ast/ddl.rs | 8 +- src/sqlparser/src/ast/legacy_source.rs | 132 +++++++++--------- src/sqlparser/src/ast/mod.rs | 12 +- src/sqlparser/src/ast/statement.rs | 66 ++++----- src/sqlparser/src/parser.rs | 16 +-- src/sqlparser/tests/testdata/create.yaml | 10 +- src/tests/sqlsmith/src/lib.rs | 14 +- 18 files changed, 296 insertions(+), 292 deletions(-) diff --git a/src/frontend/planner_test/src/lib.rs b/src/frontend/planner_test/src/lib.rs index a6ec179011771..528fa88ef3506 100644 --- a/src/frontend/planner_test/src/lib.rs +++ b/src/frontend/planner_test/src/lib.rs @@ -427,7 +427,7 @@ impl TestCase { columns, constraints, if_not_exists, - source_schema, + format_encode, source_watermarks, append_only, on_conflict, @@ -437,7 +437,7 @@ impl TestCase { wildcard_idx, .. } => { - let source_schema = source_schema.map(|schema| schema.into_v2_with_warning()); + let format_encode = format_encode.map(|schema| schema.into_v2_with_warning()); create_table::handle_create_table( handler_args, @@ -446,7 +446,7 @@ impl TestCase { wildcard_idx, constraints, if_not_exists, - source_schema, + format_encode, source_watermarks, append_only, on_conflict, diff --git a/src/frontend/src/handler/alter_source_with_sr.rs b/src/frontend/src/handler/alter_source_with_sr.rs index bf8cf991d1a4f..5e889ef9f0d7e 100644 --- a/src/frontend/src/handler/alter_source_with_sr.rs +++ b/src/frontend/src/handler/alter_source_with_sr.rs @@ -24,7 +24,7 @@ use risingwave_connector::WithPropertiesExt; use risingwave_pb::catalog::StreamSourceInfo; use risingwave_pb::plan_common::{EncodeType, FormatType}; use risingwave_sqlparser::ast::{ - CompatibleSourceSchema, ConnectorSchema, CreateSourceStatement, Encode, Format, ObjectName, + CompatibleFormatEncode, CreateSourceStatement, Encode, Format, FormatEncodeOptions, ObjectName, SqlOption, Statement, }; use risingwave_sqlparser::parser::Parser; @@ -120,7 +120,7 @@ pub fn fetch_source_catalog_with_db_schema_id( /// and if the FORMAT and ENCODE are modified. pub fn check_format_encode( original_source: &SourceCatalog, - new_connector_schema: &ConnectorSchema, + new_format_encode: &FormatEncodeOptions, ) -> Result<()> { let StreamSourceInfo { format, row_encode, .. @@ -137,9 +137,7 @@ pub fn check_format_encode( .into()); }; - if new_connector_schema.format != old_format - || new_connector_schema.row_encode != old_row_encode - { + if new_format_encode.format != old_format || new_format_encode.row_encode != old_row_encode { bail_not_implemented!( "the original definition is FORMAT {:?} ENCODE {:?}, and altering them is not supported yet", &old_format, @@ -153,19 +151,18 @@ pub fn check_format_encode( /// Refresh the source registry and get the added/dropped columns. 
pub async fn refresh_sr_and_get_columns_diff(
     original_source: &SourceCatalog,
-    connector_schema: &ConnectorSchema,
+    format_encode: &FormatEncodeOptions,
     session: &Arc<SessionImpl>,
 ) -> Result<(StreamSourceInfo, Vec<ColumnCatalog>, Vec<ColumnCatalog>)> {
     let mut with_properties = original_source.with_properties.clone();
-    validate_compatibility(connector_schema, &mut with_properties)?;
+    validate_compatibility(format_encode, &mut with_properties)?;
 
     if with_properties.is_cdc_connector() {
         bail_not_implemented!("altering a cdc source is not supported");
     }
 
     let (Some(columns_from_resolve_source), source_info) =
-        bind_columns_from_source(session, connector_schema, Either::Right(&with_properties))
-            .await?
+        bind_columns_from_source(session, format_encode, Either::Right(&with_properties)).await?
     else {
         // Source without schema registry is rejected.
         unreachable!("source without schema registry is rejected")
@@ -189,18 +186,18 @@ pub async fn refresh_sr_and_get_columns_diff(
     Ok((source_info, added_columns, dropped_columns))
 }
 
-fn get_connector_schema_from_source(source: &SourceCatalog) -> Result<ConnectorSchema> {
+fn get_format_encode_from_source(source: &SourceCatalog) -> Result<FormatEncodeOptions> {
     let [stmt]: [_; 1] = Parser::parse_sql(&source.definition)
         .context("unable to parse original source definition")?
         .try_into()
         .unwrap();
     let Statement::CreateSource {
-        stmt: CreateSourceStatement { source_schema, .. },
+        stmt: CreateSourceStatement { format_encode, .. },
     } = stmt
     else {
         unreachable!()
     };
-    Ok(source_schema.into_v2_with_warning())
+    Ok(format_encode.into_v2_with_warning())
 }
 
 pub async fn handler_refresh_schema(
     handler_args: HandlerArgs,
     name: ObjectName,
 ) -> Result<RwPgResponse> {
     let (source, _, _) = fetch_source_catalog_with_db_schema_id(&handler_args.session, &name)?;
-    let connector_schema = get_connector_schema_from_source(&source)?;
-    handle_alter_source_with_sr(handler_args, name, connector_schema).await
+    let format_encode = get_format_encode_from_source(&source)?;
+    handle_alter_source_with_sr(handler_args, name, format_encode).await
 }
 
 pub async fn handle_alter_source_with_sr(
     handler_args: HandlerArgs,
     name: ObjectName,
-    connector_schema: ConnectorSchema,
+    format_encode: FormatEncodeOptions,
 ) -> Result<RwPgResponse> {
     let session = handler_args.session;
     let (source, database_id, schema_id) = fetch_source_catalog_with_db_schema_id(&session, &name)?;
@@ -232,9 +229,9 @@ pub async fn handle_alter_source_with_sr(
         bail_not_implemented!(issue = 16003, "alter shared source");
     }
 
-    check_format_encode(&source, &connector_schema)?;
+    check_format_encode(&source, &format_encode)?;
 
-    if !schema_has_schema_registry(&connector_schema) {
+    if !schema_has_schema_registry(&format_encode) {
         return Err(ErrorCode::NotSupported(
             "altering a source without schema registry".to_string(),
             "try `ALTER SOURCE .. ADD COLUMN ...` instead".to_string(),
         )
         .into());
     }
 
     let (source_info, added_columns, dropped_columns) =
-        refresh_sr_and_get_columns_diff(&source, &connector_schema, &session).await?;
+        refresh_sr_and_get_columns_diff(&source, &format_encode, &session).await?;
 
     if !dropped_columns.is_empty() {
         bail_not_implemented!(
@@ -258,10 +255,10 @@ pub async fn handle_alter_source_with_sr(
     source.info = source_info;
     source.columns.extend(added_columns);
     source.definition =
-        alter_definition_format_encode(&source.definition, connector_schema.row_options.clone())?;
+        alter_definition_format_encode(&source.definition, format_encode.row_options.clone())?;
 
     let (format_encode_options, format_encode_secret_ref) = resolve_secret_ref_in_with_options(
-        WithOptions::try_from(connector_schema.row_options())?,
+        WithOptions::try_from(format_encode.row_options())?,
         session.as_ref(),
     )?
     .into_parts();
@@ -299,19 +296,19 @@ pub fn alter_definition_format_encode(
 
     match &mut stmt {
         Statement::CreateSource {
-            stmt: CreateSourceStatement { source_schema, .. },
+            stmt: CreateSourceStatement { format_encode, .. },
         }
         | Statement::CreateTable {
-            source_schema: Some(source_schema),
+            format_encode: Some(format_encode),
             ..
         } => {
-            match source_schema {
-                CompatibleSourceSchema::V2(schema) => {
+            match format_encode {
+                CompatibleFormatEncode::V2(schema) => {
                     schema.row_options = format_encode_options;
                 }
                 // TODO: Confirm the behavior of legacy source schema.
                 // Legacy source schema should be rejected by the handler and never reaches here.
-                CompatibleSourceSchema::RowFormat(_schema) => unreachable!(),
+                CompatibleFormatEncode::RowFormat(_schema) => unreachable!(),
             }
         }
         _ => unreachable!(),
diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs
index 88e886ad667bf..1241553aff04a 100644
--- a/src/frontend/src/handler/alter_table_column.rs
+++ b/src/frontend/src/handler/alter_table_column.rs
@@ -29,8 +29,8 @@ use risingwave_pb::ddl_service::TableJobType;
 use risingwave_pb::stream_plan::stream_node::PbNodeBody;
 use risingwave_pb::stream_plan::{ProjectNode, StreamFragmentGraph};
 use risingwave_sqlparser::ast::{
-    AlterTableOperation, ColumnDef, ColumnOption, ConnectorSchema, DataType as AstDataType, Encode,
-    ObjectName, Statement, StructField,
+    AlterTableOperation, ColumnDef, ColumnOption, DataType as AstDataType, Encode,
+    FormatEncodeOptions, ObjectName, Statement, StructField,
 };
 use risingwave_sqlparser::parser::Parser;
 
@@ -51,14 +51,14 @@ pub async fn replace_table_with_definition(
     session: &Arc<SessionImpl>,
     table_name: ObjectName,
     definition: Statement,
     original_catalog: &Arc<TableCatalog>,
-    source_schema: Option<ConnectorSchema>,
+    format_encode: Option<FormatEncodeOptions>,
 ) -> Result<()> {
     let (source, table, graph, col_index_mapping, job_type) = get_replace_table_plan(
         session,
         table_name,
         definition,
         original_catalog,
-        source_schema,
+        format_encode,
         None,
     )
     .await?;
@@ -86,7 +86,7 @@ pub async fn get_new_table_definition_for_cdc_table(
         .unwrap();
     let Statement::CreateTable {
         columns: original_columns,
-        source_schema,
+        format_encode,
         ..
     } = &mut definition
     else {
         panic!("unexpected statement: {:?}", definition);
     };
 
     assert!(
-        source_schema.is_none(),
+        format_encode.is_none(),
         "source schema should be None for CDC table"
     );
 
@@ -165,7 +165,7 @@ pub async fn get_replace_table_plan(
     table_name: ObjectName,
     definition: Statement,
     original_catalog: &Arc<TableCatalog>,
-    source_schema: Option<ConnectorSchema>,
+    format_encode: Option<FormatEncodeOptions>,
     new_version_columns: Option<Vec<ColumnCatalog>>, // only provided in auto schema change
 ) -> Result<(
     Option<Source>,
@@ -196,7 +196,7 @@ pub async fn get_replace_table_plan(
         session,
         table_name,
         original_catalog,
-        source_schema,
+        format_encode,
         handler_args.clone(),
         col_id_gen,
         columns.clone(),
@@ -326,19 +326,19 @@ pub async fn handle_alter_table_column(
         .unwrap();
     let Statement::CreateTable {
         columns,
-        source_schema,
+        format_encode,
         ..
     } = &mut definition
     else {
         panic!("unexpected statement: {:?}", definition);
     };
-    let source_schema = source_schema
+    let format_encode = format_encode
         .clone()
-        .map(|source_schema| source_schema.into_v2_with_warning());
+        .map(|format_encode| format_encode.into_v2_with_warning());
 
     let fail_if_has_schema_registry = || {
-        if let Some(source_schema) = &source_schema
-            && schema_has_schema_registry(source_schema)
+        if let Some(format_encode) = &format_encode
+            && schema_has_schema_registry(format_encode)
         {
             Err(ErrorCode::NotSupported(
                 "alter table with schema registry".to_string(),
@@ -460,14 +460,14 @@ pub async fn handle_alter_table_column(
         table_name,
         definition,
         &original_catalog,
-        source_schema,
+        format_encode,
     )
     .await?;
 
     Ok(PgResponse::empty_result(StatementType::ALTER_TABLE))
 }
 
-pub fn schema_has_schema_registry(schema: &ConnectorSchema) -> bool {
+pub fn schema_has_schema_registry(schema: &FormatEncodeOptions) -> bool {
     match schema.row_encode {
         Encode::Avro | Encode::Protobuf => true,
         Encode::Json => {
diff --git a/src/frontend/src/handler/alter_table_with_sr.rs b/src/frontend/src/handler/alter_table_with_sr.rs
index d932246759e22..b5489b28b58f8 100644
--- a/src/frontend/src/handler/alter_table_with_sr.rs
+++ b/src/frontend/src/handler/alter_table_with_sr.rs
@@ -16,7 +16,7 @@ use anyhow::{anyhow, Context};
 use fancy_regex::Regex;
 use pgwire::pg_response::StatementType;
 use risingwave_common::bail_not_implemented;
-use risingwave_sqlparser::ast::{ConnectorSchema, ObjectName, Statement};
+use risingwave_sqlparser::ast::{FormatEncodeOptions, ObjectName, Statement};
 use risingwave_sqlparser::parser::Parser;
 use thiserror_ext::AsReport;
 
@@ -29,15 +29,15 @@ use super::{HandlerArgs, RwPgResponse};
 use crate::error::{ErrorCode, Result};
 use crate::TableCatalog;
 
-fn get_connector_schema_from_table(table: &TableCatalog) -> Result<Option<ConnectorSchema>> {
+fn get_format_encode_from_table(table: &TableCatalog) -> Result<Option<FormatEncodeOptions>> {
     let [stmt]: [_; 1] = Parser::parse_sql(&table.definition)
         .context("unable to parse original table definition")?
         .try_into()
         .unwrap();
-    let Statement::CreateTable { source_schema, .. } = stmt else {
+    let Statement::CreateTable { format_encode, .. } = stmt else {
         unreachable!()
     };
-    Ok(source_schema.map(|schema| schema.into_v2_with_warning()))
+    Ok(format_encode.map(|schema| schema.into_v2_with_warning()))
 }
 
 pub async fn handle_refresh_schema(
@@ -51,9 +51,9 @@ pub async fn handle_refresh_schema(
         bail_not_implemented!("alter table with incoming sinks");
     }
 
-    let connector_schema = {
-        let connector_schema = get_connector_schema_from_table(&original_table)?;
-        if !connector_schema
+    let format_encode = {
+        let format_encode = get_format_encode_from_table(&original_table)?;
+        if !format_encode
             .as_ref()
             .is_some_and(schema_has_schema_registry)
         {
             return Err(ErrorCode::NotSupported(
                 "refresh schema of table without schema registry".to_string(),
                 "try `ALTER TABLE .. ADD/DROP COLUMN ...` instead".to_string(),
             )
             .into());
         }
-        connector_schema.unwrap()
+        format_encode.unwrap()
     };
 
     let definition = alter_definition_format_encode(
         &original_table.definition,
-        connector_schema.row_options.clone(),
+        format_encode.row_options.clone(),
     )?;
 
     let [definition]: [_; 1] = Parser::parse_sql(&definition)
@@ -81,7 +81,7 @@ pub async fn handle_refresh_schema(
         table_name,
         definition,
         &original_table,
-        Some(connector_schema),
+        Some(format_encode),
     )
     .await;
 
diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs
index fb35c5efc2e99..37db8d9541ad8 100644
--- a/src/frontend/src/handler/create_sink.rs
+++ b/src/frontend/src/handler/create_sink.rs
@@ -38,7 +38,7 @@ use risingwave_pb::ddl_service::{ReplaceTablePlan, TableJobType};
 use risingwave_pb::stream_plan::stream_node::{NodeBody, PbNodeBody};
 use risingwave_pb::stream_plan::{MergeNode, StreamFragmentGraph, StreamNode};
 use risingwave_sqlparser::ast::{
-    ConnectorSchema, CreateSink, CreateSinkStatement, EmitMode, Encode, ExplainOptions, Format,
+    CreateSink, CreateSinkStatement, EmitMode, Encode, ExplainOptions, Format, FormatEncodeOptions,
     Query, Statement,
 };
 use risingwave_sqlparser::parser::Parser;
@@ -650,7 +650,7 @@ pub(crate) async fn reparse_table_for_sink(
         .unwrap();
     let Statement::CreateTable {
         name,
-        source_schema,
+        format_encode,
         ..
     } = &definition
     else {
     };
     let table_name = name.clone();
-    let source_schema = source_schema
+    let format_encode = format_encode
         .clone()
-        .map(|source_schema| source_schema.into_v2_with_warning());
+        .map(|format_encode| format_encode.into_v2_with_warning());
 
     // Create handler args as if we're creating a new table with the altered definition.
     let handler_args = HandlerArgs::new(session.clone(), &definition, Arc::from(""))?;
@@ -683,7 +683,7 @@ pub(crate) async fn reparse_table_for_sink(
         session,
         table_name,
         table_catalog,
-        source_schema,
+        format_encode,
         handler_args,
         col_id_gen,
         columns,
@@ -814,7 +814,7 @@ pub(crate) fn derive_default_column_project_for_sink(
 /// Transforms the (format, encode, options) from sqlparser AST into an internal struct `SinkFormatDesc`.
 /// This is an analogy to (part of) [`crate::handler::create_source::bind_columns_from_source`]
 /// which transforms sqlparser AST `SourceSchemaV2` into `StreamSourceInfo`.
-fn bind_sink_format_desc(session: &SessionImpl, value: ConnectorSchema) -> Result<SinkFormatDesc> {
+fn bind_sink_format_desc(
+    session: &SessionImpl,
+    value: FormatEncodeOptions,
+) -> Result<SinkFormatDesc> {
     use risingwave_connector::sink::catalog::{SinkEncode, SinkFormat};
     use risingwave_connector::sink::encoder::TimestamptzHandlingMode;
     use risingwave_sqlparser::ast::{Encode as E, Format as F};
@@ -929,7 +932,7 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format,
-pub fn validate_compatibility(connector: &str, format_desc: &ConnectorSchema) -> Result<()> {
+pub fn validate_compatibility(connector: &str, format_desc: &FormatEncodeOptions) -> Result<()> {
     let compatible_formats = CONNECTORS_COMPATIBLE_FORMATS
         .get(connector)
         .ok_or_else(|| {
diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs
index 3616997d384cb..c315abb358aa5 100644
--- a/src/frontend/src/handler/create_source.rs
+++ b/src/frontend/src/handler/create_source.rs
@@ -62,8 +62,8 @@ use risingwave_pb::catalog::{PbSchemaRegistryNameStrategy, StreamSourceInfo, Wat
 use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
 use risingwave_pb::plan_common::{EncodeType, FormatType};
 use risingwave_sqlparser::ast::{
-    get_delimiter, AstString, ColumnDef, ConnectorSchema, CreateSourceStatement, Encode, Format,
-    ObjectName, ProtobufSchema, SourceWatermark, TableConstraint,
+    get_delimiter, AstString, ColumnDef, CreateSourceStatement, Encode, Format,
+    FormatEncodeOptions, ObjectName, ProtobufSchema, SourceWatermark, TableConstraint,
 };
 use risingwave_sqlparser::parser::{IncludeOption, IncludeOptionItem};
 use thiserror_ext::AsReport;
@@ -299,7 +299,7 @@ fn get_name_strategy_or_default(name_strategy: Option<AstString>) -> Result<Opt
 pub(crate) async fn bind_columns_from_source(
     session: &SessionImpl,
-    source_schema: &ConnectorSchema,
+    format_encode: &FormatEncodeOptions,
     with_properties: Either<&WithOptionsSecResolved, &WithOptions>,
 ) -> Result<(Option<Vec<ColumnCatalog>>, StreamSourceInfo)> {
     const MESSAGE_NAME_KEY: &str = "message";
@@ -313,7 +313,7 @@ pub(crate) async fn bind_columns_from_source(
     let is_kafka: bool = options_with_secret.is_kafka_connector();
     let (format_encode_options, format_encode_secret_refs) = resolve_secret_ref_in_with_options(
-        WithOptions::try_from(source_schema.row_options())?,
+        WithOptions::try_from(format_encode.row_options())?,
         session,
     )?
     .into_parts();
@@ -345,18 +345,18 @@ pub(crate) async fn bind_columns_from_source(
     }
 
     let mut stream_source_info = StreamSourceInfo {
-        format: format_to_prost(&source_schema.format) as i32,
-        row_encode: row_encode_to_prost(&source_schema.row_encode) as i32,
+        format: format_to_prost(&format_encode.format) as i32,
+        row_encode: row_encode_to_prost(&format_encode.row_encode) as i32,
         format_encode_options,
         format_encode_secret_refs,
         ..Default::default()
     };
 
-    if source_schema.format == Format::Debezium {
+    if format_encode.format == Format::Debezium {
         try_consume_string_from_options(&mut format_encode_options_to_consume, DEBEZIUM_IGNORE_KEY);
     }
 
-    let columns = match (&source_schema.format, &source_schema.row_encode) {
+    let columns = match (&format_encode.format, &format_encode.row_encode) {
         (Format::Native, Encode::Native)
         | (Format::Plain, Encode::Bytes)
         | (Format::DebeziumMongo, Encode::Json) => None,
@@ -476,7 +476,7 @@ pub(crate) async fn bind_columns_from_source(
             Encode::Json,
         ) => {
             if matches!(
-                source_schema.format,
+                format_encode.format,
                 Format::Plain | Format::Upsert | Format::Debezium
             ) {
                 // Parse the value but throw it away.
@@ -524,8 +524,8 @@ pub(crate) async fn bind_columns_from_source(
     if !format_encode_options_to_consume.is_empty() {
         let err_string = format!(
             "Get unknown format_encode_options for {:?} {:?}: {}",
-            source_schema.format,
-            source_schema.row_encode,
+            format_encode.format,
+            format_encode.row_encode,
             format_encode_options_to_consume
                 .keys()
                 .map(|k| k.to_string())
@@ -539,10 +539,10 @@ pub(crate) async fn bind_columns_from_source(
 
 fn bind_columns_from_source_for_cdc(
     session: &SessionImpl,
-    source_schema: &ConnectorSchema,
+    format_encode: &FormatEncodeOptions,
 ) -> Result<(Option<Vec<ColumnCatalog>>, StreamSourceInfo)> {
     let (format_encode_options, format_encode_secret_refs) = resolve_secret_ref_in_with_options(
-        WithOptions::try_from(source_schema.row_options())?,
+        WithOptions::try_from(format_encode.row_options())?,
         session,
     )?
     .into_parts();
@@ -553,7 +553,7 @@ fn bind_columns_from_source_for_cdc(
         format_encode_secret_refs.clone(),
     )?;
 
-    match (&source_schema.format, &source_schema.row_encode) {
+    match (&format_encode.format, &format_encode.row_encode) {
         (Format::Plain, Encode::Json) => (),
         (format, encoding) => {
             // Note: parser will also check this. Just be extra safe here
@@ -568,8 +568,8 @@ fn bind_columns_from_source_for_cdc(
     let schema_config = get_json_schema_location(&mut format_encode_options_to_consume)?;
 
     let stream_source_info = StreamSourceInfo {
-        format: format_to_prost(&source_schema.format) as i32,
-        row_encode: row_encode_to_prost(&source_schema.row_encode) as i32,
+        format: format_to_prost(&format_encode.format) as i32,
+        row_encode: row_encode_to_prost(&format_encode.row_encode) as i32,
         format_encode_options,
         use_schema_registry: json_schema_infer_use_schema_registry(&schema_config),
         cdc_source_job: true,
@@ -580,8 +580,8 @@ fn bind_columns_from_source_for_cdc(
     if !format_encode_options_to_consume.is_empty() {
         let err_string = format!(
             "Get unknown format_encode_options for {:?} {:?}: {}",
-            source_schema.format,
-            source_schema.row_encode,
+            format_encode.format,
+            format_encode.row_encode,
             format_encode_options_to_consume
                 .keys()
                 .map(|k| k.to_string())
@@ -596,7 +596,7 @@ fn bind_columns_from_source_for_cdc(
 // check the additional column compatibility with the format and encode
 fn check_additional_column_compatibility(
     column_def: &IncludeOptionItem,
-    source_schema: Option<&ConnectorSchema>,
+    format_encode: Option<&FormatEncodeOptions>,
 ) -> Result<()> {
     // only allow header column have inner field
     if column_def.inner_field.is_some()
@@ -612,7 +612,7 @@ fn check_additional_column_compatibility(
     }
 
     // Payload column only allowed when encode is JSON
-    if let Some(schema) = source_schema
+    if let Some(schema) = format_encode
         && column_def
             .column_type
             .real_value()
@@ -629,7 +629,7 @@ fn check_additional_column_compatibility(
 
 /// add connector-spec columns to the end of column catalog
 pub fn handle_addition_columns(
-    source_schema: Option<&ConnectorSchema>,
+    format_encode: Option<&FormatEncodeOptions>,
     with_properties: &BTreeMap<String, String>,
     mut additional_columns: IncludeOption,
     columns: &mut Vec<ColumnCatalog>,
@@ -647,7 +647,7 @@ pub fn handle_addition_columns(
     }
 
     while let Some(item) = additional_columns.pop() {
-        check_additional_column_compatibility(&item, source_schema)?;
+        check_additional_column_compatibility(&item, format_encode)?;
 
         let data_type = item
             .header_inner_expect_type
@@ -678,7 +678,7 @@ pub fn handle_addition_columns(
 
 /// Bind columns from both source and sql defined.
 pub(crate) fn bind_all_columns(
-    source_schema: &ConnectorSchema,
+    format_encode: &FormatEncodeOptions,
     cols_from_source: Option<Vec<ColumnCatalog>>,
     cols_from_sql: Vec<ColumnCatalog>,
     col_defs_from_sql: &[ColumnDef],
@@ -707,7 +707,7 @@ pub(crate) fn bind_all_columns(
             // TODO(yuhao): https://github.com/risingwavelabs/risingwave/issues/12209
             Err(RwError::from(ProtocolError(
                 format!("User-defined schema from SQL is not allowed with FORMAT {} ENCODE {}. \
-                Please refer to https://www.risingwave.dev/docs/current/sql-create-source/ for more information.", source_schema.format, source_schema.row_encode))))
+                Please refer to https://www.risingwave.dev/docs/current/sql-create-source/ for more information.", format_encode.format, format_encode.row_encode))))
         }
     } else {
         if wildcard_idx.is_some() {
             return Err(RwError::from(ProtocolError(
                 "Wildcard is only allowed when there exists columns from external schema".to_string(),
             )));
         }
         let non_generated_sql_defined_columns = non_generated_sql_columns(col_defs_from_sql);
-        match (&source_schema.format, &source_schema.row_encode) {
+        match (&format_encode.format, &format_encode.row_encode) {
             (Format::DebeziumMongo, Encode::Json) => {
                 let mut columns = vec![
                     ColumnCatalog {
@@ -817,7 +817,7 @@ pub(crate) fn bind_all_columns(
 /// Bind column from source. Add key column to table columns if necessary.
 /// Return `pk_names`.
 pub(crate) async fn bind_source_pk(
-    source_schema: &ConnectorSchema,
+    format_encode: &FormatEncodeOptions,
     source_info: &StreamSourceInfo,
     columns: &mut [ColumnCatalog],
     sql_defined_pk_names: Vec<String>,
@@ -849,7 +849,7 @@ pub(crate) async fn bind_source_pk(
         })
         .collect_vec();
 
-    let res = match (&source_schema.format, &source_schema.row_encode) {
+    let res = match (&format_encode.format, &format_encode.row_encode) {
         (Format::Native, Encode::Native) | (Format::None, Encode::None) | (Format::Plain, _) => {
             sql_defined_pk_names
         }
@@ -1149,7 +1149,7 @@ pub fn validate_license(connector: &str) -> Result<()> {
 }
 
 pub fn validate_compatibility(
-    source_schema: &ConnectorSchema,
+    format_encode: &FormatEncodeOptions,
     props: &mut BTreeMap<String, String>,
 ) -> Result<()> {
     let mut connector = props
@@ -1183,9 +1183,9 @@ pub fn validate_compatibility(
     validate_license(&connector)?;
     if connector != KAFKA_CONNECTOR {
-        let res = match (&source_schema.format, &source_schema.row_encode) {
+        let res = match (&format_encode.format, &format_encode.row_encode) {
             (Format::Plain, Encode::Protobuf) | (Format::Plain, Encode::Avro) => {
-                let mut options = WithOptions::try_from(source_schema.row_options())?;
+                let mut options = WithOptions::try_from(format_encode.row_options())?;
                 let (_, use_schema_registry) = get_schema_location(options.inner_mut())?;
                 use_schema_registry
             }
@@ -1201,17 +1201,17 @@ pub fn validate_compatibility(
     }
 
     let compatible_encodes = compatible_formats
-        .get(&source_schema.format)
+        .get(&format_encode.format)
         .ok_or_else(|| {
             RwError::from(ProtocolError(format!(
                 "connector {} does not support format {:?}",
-                connector, source_schema.format
+                connector, format_encode.format
             )))
         })?;
-    if !compatible_encodes.contains(&source_schema.row_encode) {
+    if !compatible_encodes.contains(&format_encode.row_encode) {
         return Err(RwError::from(ProtocolError(format!(
             "connector {} does not support format {:?} with encode {:?}",
-            connector, source_schema.format, source_schema.row_encode
+            connector, format_encode.format, format_encode.row_encode
         ))));
    }
 
@@ -1267,7 +1267,7 @@ pub fn validate_compatibility(
 ///
 /// One should only call this function after all properties of all columns are resolved, like
 /// generated column descriptors.
-pub(super) async fn check_source_schema(
+pub(super) async fn check_format_encode(
     props: &WithOptionsSecResolved,
     row_id_index: Option<usize>,
     columns: &[ColumnCatalog],
@@ -1424,11 +1424,11 @@ pub async fn check_iceberg_source(
 
 pub fn bind_connector_props(
     handler_args: &HandlerArgs,
-    source_schema: &ConnectorSchema,
+    format_encode: &FormatEncodeOptions,
     is_create_source: bool,
 ) -> Result<WithOptions> {
     let mut with_properties = handler_args.with_options.clone().into_connector_props();
-    validate_compatibility(source_schema, &mut with_properties)?;
+    validate_compatibility(format_encode, &mut with_properties)?;
     let create_cdc_source_job = with_properties.is_shareable_cdc_connector();
 
     if !is_create_source && with_properties.is_shareable_only_cdc_connector() {
@@ -1472,7 +1472,7 @@ pub fn bind_connector_props(
 pub async fn bind_create_source_or_table_with_connector(
     handler_args: HandlerArgs,
     full_name: ObjectName,
-    source_schema: ConnectorSchema,
+    format_encode: FormatEncodeOptions,
     with_properties: WithOptions,
     sql_columns_defs: &[ColumnDef],
     constraints: Vec<TableConstraint>,
@@ -1500,11 +1500,11 @@ pub async fn bind_create_source_or_table_with_connector(
         .into());
     }
     if is_create_source {
-        match source_schema.format {
+        match format_encode.format {
             Format::Upsert => {
                 return Err(ErrorCode::BindError(format!(
                     "can't CREATE SOURCE with FORMAT UPSERT\n\nHint: use CREATE TABLE instead\n\n{}",
-                    hint_upsert(&source_schema.row_encode)
+                    hint_upsert(&format_encode.row_encode)
                 ))
                 .into());
             }
@@ -1520,7 +1520,7 @@ pub async fn bind_create_source_or_table_with_connector(
     let columns_from_sql = bind_sql_columns(sql_columns_defs)?;
 
     let mut columns = bind_all_columns(
-        &source_schema,
+        &format_encode,
         columns_from_resolve_source,
         columns_from_sql,
         sql_columns_defs,
@@ -1529,7 +1529,7 @@ pub async fn bind_create_source_or_table_with_connector(
 
     // add additional columns before bind pk, because `format upsert` requires the key column
     handle_addition_columns(
-        Some(&source_schema),
+        Some(&format_encode),
         &with_properties,
         include_column_options,
         &mut columns,
@@ -1556,7 +1556,7 @@ pub async fn bind_create_source_or_table_with_connector(
     let with_properties = resolve_secret_ref_in_with_options(with_properties, session)?;
 
     let pk_names = bind_source_pk(
-        &source_schema,
+        &format_encode,
         &source_info,
         &mut columns,
         sql_pk_names,
@@ -1605,7 +1605,7 @@ pub async fn bind_create_source_or_table_with_connector(
         sql_columns_defs.to_vec(),
         &pk_col_ids,
     )?;
-    check_source_schema(&with_properties, row_id_index, &columns).await?;
+    check_format_encode(&with_properties, row_id_index, &columns).await?;
 
     let definition = handler_args.normalized_sql.clone();
 
@@ -1659,8 +1659,8 @@ pub async fn handle_create_source(
         )));
     }
 
-    let source_schema = stmt.source_schema.into_v2_with_warning();
-    let with_properties = bind_connector_props(&handler_args, &source_schema, true)?;
+    let format_encode = stmt.format_encode.into_v2_with_warning();
+    let with_properties = bind_connector_props(&handler_args, &format_encode, true)?;
 
     let create_cdc_source_job = with_properties.is_shareable_cdc_connector();
     let is_shared = create_cdc_source_job
@@ -1673,9 +1673,9 @@ pub async fn handle_create_source(
         && session.config().streaming_use_shared_source());
 
     let (columns_from_resolve_source, mut source_info) = if create_cdc_source_job {
-        bind_columns_from_source_for_cdc(&session, &source_schema)?
+        bind_columns_from_source_for_cdc(&session, &format_encode)?
     } else {
-        bind_columns_from_source(&session, &source_schema, Either::Left(&with_properties)).await?
+        bind_columns_from_source(&session, &format_encode, Either::Left(&with_properties)).await?
     };
     if is_shared {
         // Note: this field should be called is_shared. Check field doc for more details.
@@ -1687,7 +1687,7 @@ pub async fn handle_create_source(
     let (source_catalog, database_id, schema_id) = bind_create_source_or_table_with_connector(
         handler_args.clone(),
         stmt.source_name,
-        source_schema,
+        format_encode,
         with_properties,
         &stmt.columns,
         stmt.constraints,
diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs
index 2c1916174e0b7..eab38a44c4ff4 100644
--- a/src/frontend/src/handler/create_table.rs
+++ b/src/frontend/src/handler/create_table.rs
@@ -43,8 +43,8 @@ use risingwave_pb::plan_common::{
 };
 use risingwave_pb::stream_plan::StreamFragmentGraph;
 use risingwave_sqlparser::ast::{
-    CdcTableInfo, ColumnDef, ColumnOption, ConnectorSchema, DataType as AstDataType,
-    ExplainOptions, Format, ObjectName, OnConflict, SourceWatermark, TableConstraint,
+    CdcTableInfo, ColumnDef, ColumnOption, DataType as AstDataType, ExplainOptions, Format,
+    FormatEncodeOptions, ObjectName, OnConflict, SourceWatermark, TableConstraint,
 };
 use risingwave_sqlparser::parser::IncludeOption;
 use thiserror_ext::AsReport;
@@ -468,7 +468,7 @@ pub(crate) async fn gen_create_table_plan_with_source(
     column_defs: Vec<ColumnDef>,
     wildcard_idx: Option<usize>,
     constraints: Vec<TableConstraint>,
-    source_schema: ConnectorSchema,
+    format_encode: FormatEncodeOptions,
     source_watermarks: Vec<SourceWatermark>,
     mut col_id_gen: ColumnIdGenerator,
     append_only: bool,
@@ -477,28 +477,28 @@ pub(crate) async fn gen_create_table_plan_with_source(
     include_column_options: IncludeOption,
 ) -> Result<(PlanRef, Option<PbSource>, PbTable)> {
     if append_only
-        && source_schema.format != Format::Plain
-        && source_schema.format != Format::Native
+        && format_encode.format != Format::Plain
+        && format_encode.format != Format::Native
     {
         return Err(ErrorCode::BindError(format!(
             "Append only table does not support format {}.",
-            source_schema.format
+            format_encode.format
         ))
         .into());
     }
 
     let session = &handler_args.session;
-    let with_properties = bind_connector_props(&handler_args, &source_schema, false)?;
+    let with_properties = bind_connector_props(&handler_args, &format_encode, false)?;
 
     let (columns_from_resolve_source, source_info) =
-        bind_columns_from_source(session, &source_schema, Either::Left(&with_properties)).await?;
+        bind_columns_from_source(session, &format_encode, Either::Left(&with_properties)).await?;
 
     let overwrite_options = OverwriteOptions::new(&mut handler_args);
     let rate_limit = overwrite_options.source_rate_limit;
     let (source_catalog, database_id, schema_id) = bind_create_source_or_table_with_connector(
         handler_args.clone(),
         table_name,
-        source_schema,
+        format_encode,
         with_properties,
         &column_defs,
         constraints,
@@ -940,7 +940,7 @@ fn derive_with_options_for_cdc_table(
 pub(super) async fn handle_create_table_plan(
     handler_args: HandlerArgs,
     explain_options: ExplainOptions,
-    source_schema: Option<ConnectorSchema>,
+    format_encode: Option<FormatEncodeOptions>,
     cdc_table_info: Option<CdcTableInfo>,
     table_name: ObjectName,
     column_defs: Vec<ColumnDef>,
@@ -953,16 +953,16 @@ pub(super) async fn handle_create_table_plan(
     include_column_options: IncludeOption,
 ) -> Result<(PlanRef, Option<PbSource>, PbTable, TableJobType)> {
     let col_id_gen = ColumnIdGenerator::new_initial();
-    let source_schema = check_create_table_with_source(
+    let format_encode = check_create_table_with_source(
         &handler_args.with_options,
-        source_schema,
+        format_encode,
         &include_column_options,
         &cdc_table_info,
     )?;
 
     let ((plan, source, table), job_type) =
-        match (source_schema, cdc_table_info.as_ref()) {
-            (Some(source_schema), None) => (
+        match (format_encode, cdc_table_info.as_ref()) {
+            (Some(format_encode), None) => (
                 gen_create_table_plan_with_source(
                     handler_args,
                     explain_options,
                     table_name,
                     column_defs,
                     wildcard_idx,
                     constraints,
-                    source_schema,
+                    format_encode,
                     source_watermarks,
                     col_id_gen,
                     append_only,
@@ -1015,12 +1015,12 @@ pub(super) async fn handle_create_table_plan(
             session.get_database_and_schema_id_for_create(schema_name.clone())?;
 
         // cdc table cannot be append-only
-        let (source_schema, source_name) =
+        let (format_encode, source_name) =
             Binder::resolve_schema_qualified_name(db_name, cdc_table.source_name.clone())?;
 
         let source = {
             let catalog_reader = session.env().catalog_reader().read_guard();
-            let schema_name = source_schema
+            let schema_name = format_encode
                 .clone()
                 .unwrap_or(DEFAULT_SCHEMA_NAME.to_string());
             let (source, _) = catalog_reader.get_source_by_name(
@@ -1235,7 +1235,7 @@ pub async fn handle_create_table(
     wildcard_idx: Option<usize>,
     constraints: Vec<TableConstraint>,
     if_not_exists: bool,
-    source_schema: Option<ConnectorSchema>,
+    format_encode: Option<FormatEncodeOptions>,
     source_watermarks: Vec<SourceWatermark>,
     append_only: bool,
     on_conflict: Option<OnConflict>,
@@ -1263,7 +1263,7 @@ pub async fn handle_create_table(
     let (plan, source, table, job_type) = handle_create_table_plan(
         handler_args,
         ExplainOptions::default(),
-        source_schema,
+        format_encode,
         cdc_table_info,
         table_name.clone(),
         column_defs,
@@ -1298,13 +1298,13 @@ pub async fn handle_create_table(
 
 pub fn check_create_table_with_source(
     with_options: &WithOptions,
-    source_schema: Option<ConnectorSchema>,
+    format_encode: Option<FormatEncodeOptions>,
     include_column_options: &IncludeOption,
     cdc_table_info: &Option<CdcTableInfo>,
-) -> Result<Option<ConnectorSchema>> {
+) -> Result<Option<FormatEncodeOptions>> {
     // skip check for cdc table
     if cdc_table_info.is_some() {
-        return Ok(source_schema);
+        return Ok(format_encode);
     }
     let defined_source = with_options.contains_key(UPSTREAM_SOURCE_KEY);
     if !include_column_options.is_empty() && !defined_source {
         return Err(ErrorCode::InvalidInputSyntax(
             "INCLUDE should be used with a connector".to_owned(),
         )
         .into());
     }
     if defined_source {
-        source_schema.as_ref().ok_or_else(|| {
+        format_encode.as_ref().ok_or_else(|| {
             ErrorCode::InvalidInputSyntax("Please specify a source schema using FORMAT".to_owned())
         })?;
     }
-    Ok(source_schema)
+    Ok(format_encode)
 }
 
 #[allow(clippy::too_many_arguments)]
 pub async fn generate_stream_graph_for_table(
     _session: &Arc<SessionImpl>,
     table_name: ObjectName,
     original_catalog: &Arc<TableCatalog>,
-    source_schema: Option<ConnectorSchema>,
+    format_encode: Option<FormatEncodeOptions>,
     handler_args: HandlerArgs,
     col_id_gen: ColumnIdGenerator,
     column_defs: Vec<ColumnDef>,
@@ -1341,8 +1341,8 @@ pub async fn generate_stream_graph_for_table(
 ) -> Result<(StreamFragmentGraph, Table, Option<PbSource>, TableJobType)> {
     use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
 
-    let ((plan, source, table), job_type) = match (source_schema, cdc_table_info.as_ref()) {
-        (Some(source_schema), None) => (
+    let ((plan, source, table), job_type) = match (format_encode, cdc_table_info.as_ref()) {
+        (Some(format_encode), None) => (
             gen_create_table_plan_with_source(
                 handler_args,
                 ExplainOptions::default(),
@@ -1350,7 +1350,7 @@ pub async fn generate_stream_graph_for_table(
                 column_defs,
                 wildcard_idx,
                 constraints,
-                source_schema,
+                format_encode,
                 source_watermarks,
                 col_id_gen,
                 append_only,
@@ -1463,12 +1463,12 @@ fn get_source_and_resolved_table_name(
     let (database_id, schema_id) =
         session.get_database_and_schema_id_for_create(schema_name.clone())?;
 
-    let (source_schema, source_name) =
+    let (format_encode, source_name) =
Binder::resolve_schema_qualified_name(db_name, cdc_table.source_name.clone())?; let source = { let catalog_reader = session.env().catalog_reader().read_guard(); - let schema_name = source_schema.unwrap_or(DEFAULT_SCHEMA_NAME.to_string()); + let schema_name = format_encode.unwrap_or(DEFAULT_SCHEMA_NAME.to_string()); let (source, _) = catalog_reader.get_source_by_name( db_name, SchemaPath::Name(schema_name.as_str()), diff --git a/src/frontend/src/handler/explain.rs b/src/frontend/src/handler/explain.rs index f9bcc19379256..1740c161c3fbe 100644 --- a/src/frontend/src/handler/explain.rs +++ b/src/frontend/src/handler/explain.rs @@ -57,7 +57,7 @@ async fn do_handle_explain( name, columns, constraints, - source_schema, + format_encode, source_watermarks, append_only, on_conflict, @@ -67,12 +67,12 @@ async fn do_handle_explain( wildcard_idx, .. } => { - let source_schema = source_schema.map(|s| s.into_v2_with_warning()); + let format_encode = format_encode.map(|s| s.into_v2_with_warning()); let (plan, _source, _table, _job_type) = handle_create_table_plan( handler_args, explain_options, - source_schema, + format_encode, cdc_table_info, name.clone(), columns, diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index e0bd5a5efae2e..d9a190ca319e7 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -336,7 +336,7 @@ pub async fn handle( or_replace, temporary, if_not_exists, - source_schema, + format_encode, source_watermarks, append_only, on_conflict, @@ -363,7 +363,7 @@ pub async fn handle( ) .await; } - let source_schema = source_schema.map(|s| s.into_v2_with_warning()); + let format_encode = format_encode.map(|s| s.into_v2_with_warning()); create_table::handle_create_table( handler_args, name, @@ -371,7 +371,7 @@ pub async fn handle( wildcard_idx, constraints, if_not_exists, - source_schema, + format_encode, source_watermarks, append_only, on_conflict, @@ -948,9 +948,9 @@ pub async fn handle( } Statement::AlterSource { name, - operation: AlterSourceOperation::FormatEncode { connector_schema }, + operation: AlterSourceOperation::FormatEncode { format_encode }, } => { - alter_source_with_sr::handle_alter_source_with_sr(handler_args, name, connector_schema) + alter_source_with_sr::handle_alter_source_with_sr(handler_args, name, format_encode) .await } Statement::AlterSource { diff --git a/src/frontend/src/handler/util.rs b/src/frontend/src/handler/util.rs index 9ff2cc92b5525..169716cd504a2 100644 --- a/src/frontend/src/handler/util.rs +++ b/src/frontend/src/handler/util.rs @@ -35,8 +35,8 @@ use risingwave_common::types::{ use risingwave_common::util::epoch::Epoch; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_sqlparser::ast::{ - CompatibleSourceSchema, ConnectorSchema, Expr, Ident, ObjectName, OrderByExpr, Query, Select, - SelectItem, SetExpr, TableFactor, TableWithJoins, + CompatibleFormatEncode, Expr, FormatEncodeOptions, Ident, ObjectName, OrderByExpr, Query, + Select, SelectItem, SetExpr, TableFactor, TableWithJoins, }; use thiserror_ext::AsReport; @@ -194,16 +194,16 @@ pub fn to_pg_field(f: &Field) -> PgFieldDescriptor { } #[easy_ext::ext(SourceSchemaCompatExt)] -impl CompatibleSourceSchema { - /// Convert `self` to [`ConnectorSchema`] and warn the user if the syntax is deprecated. - pub fn into_v2_with_warning(self) -> ConnectorSchema { +impl CompatibleFormatEncode { + /// Convert `self` to [`FormatEncodeOptions`] and warn the user if the syntax is deprecated. 
+    pub fn into_v2_with_warning(self) -> FormatEncodeOptions {
         match self {
-            CompatibleSourceSchema::RowFormat(inner) => {
+            CompatibleFormatEncode::RowFormat(inner) => {
                 // TODO: should be warning
                 current::notice_to_user("RisingWave will stop supporting the syntax \"ROW FORMAT\" in future versions, which will be changed to \"FORMAT ... ENCODE ...\" syntax.");
-                inner.into_source_schema_v2()
+                inner.into_format_encode_v2()
             }
-            CompatibleSourceSchema::V2(inner) => inner,
+            CompatibleFormatEncode::V2(inner) => inner,
         }
     }
 }
diff --git a/src/meta/src/manager/diagnose.rs b/src/meta/src/manager/diagnose.rs
index de8f983056a6a..f2d2cd58dd494 100644
--- a/src/meta/src/manager/diagnose.rs
+++ b/src/meta/src/manager/diagnose.rs
@@ -28,7 +28,7 @@ use risingwave_pb::meta::event_log::Event;
 use risingwave_pb::meta::EventLog;
 use risingwave_pb::monitor_service::StackTraceResponse;
 use risingwave_rpc_client::ComputeClientPool;
-use risingwave_sqlparser::ast::{CompatibleSourceSchema, Statement, Value};
+use risingwave_sqlparser::ast::{CompatibleFormatEncode, Statement, Value};
 use risingwave_sqlparser::parser::Parser;
 use serde_json::json;
 use thiserror_ext::AsReport;
@@ -731,28 +731,28 @@ fn redact_all_sql_options(sql: &str) -> Option<String> {
     let options = match statement {
         Statement::CreateTable {
             with_options,
-            source_schema,
+            format_encode,
             ..
         } => {
-            let connector_schema = match source_schema {
-                Some(CompatibleSourceSchema::V2(cs)) => Some(&mut cs.row_options),
+            let format_encode = match format_encode {
+                Some(CompatibleFormatEncode::V2(cs)) => Some(&mut cs.row_options),
                 _ => None,
             };
-            (Some(with_options), connector_schema)
+            (Some(with_options), format_encode)
         }
         Statement::CreateSource { stmt } => {
-            let connector_schema = match &mut stmt.source_schema {
-                CompatibleSourceSchema::V2(cs) => Some(&mut cs.row_options),
+            let format_encode = match &mut stmt.format_encode {
+                CompatibleFormatEncode::V2(cs) => Some(&mut cs.row_options),
                 _ => None,
             };
-            (Some(&mut stmt.with_properties.0), connector_schema)
+            (Some(&mut stmt.with_properties.0), format_encode)
         }
         Statement::CreateSink { stmt } => {
-            let connector_schema = match &mut stmt.sink_schema {
+            let format_encode = match &mut stmt.sink_schema {
                 Some(cs) => Some(&mut cs.row_options),
                 _ => None,
             };
-            (Some(&mut stmt.with_properties.0), connector_schema)
+            (Some(&mut stmt.with_properties.0), format_encode)
         }
         _ => (None, None),
     };
diff --git a/src/sqlparser/src/ast/ddl.rs b/src/sqlparser/src/ast/ddl.rs
index 89e8f24bf5922..451e8ddf99908 100644
--- a/src/sqlparser/src/ast/ddl.rs
+++ b/src/sqlparser/src/ast/ddl.rs
@@ -20,7 +20,7 @@ use core::fmt;
 #[cfg(feature = "serde")]
 use serde::{Deserialize, Serialize};
 
-use super::ConnectorSchema;
+use super::FormatEncodeOptions;
 use crate::ast::{
     display_comma_separated, display_separated, DataType, Expr, Ident, ObjectName, SetVariableValue,
 };
@@ -182,7 +182,7 @@ pub enum AlterSourceOperation {
     AddColumn { column_def: ColumnDef },
     ChangeOwner { new_owner_name: Ident },
     SetSchema { new_schema_name: ObjectName },
-    FormatEncode { connector_schema: ConnectorSchema },
+    FormatEncode { format_encode: FormatEncodeOptions },
     RefreshSchema,
     SetSourceRateLimit { rate_limit: i32 },
 }
@@ -413,8 +413,8 @@ impl fmt::Display for AlterSourceOperation {
             AlterSourceOperation::SetSchema { new_schema_name } => {
                 write!(f, "SET SCHEMA {}", new_schema_name)
             }
-            AlterSourceOperation::FormatEncode { connector_schema } => {
-                write!(f, "{connector_schema}")
+            AlterSourceOperation::FormatEncode { format_encode } => {
+                write!(f, "{format_encode}")
             }
             AlterSourceOperation::RefreshSchema => {
                 write!(f, "REFRESH SCHEMA")
diff --git a/src/sqlparser/src/ast/legacy_source.rs b/src/sqlparser/src/ast/legacy_source.rs
index 5fb2e233a67df..7a5abf35a2df8 100644
--- a/src/sqlparser/src/ast/legacy_source.rs
+++ b/src/sqlparser/src/ast/legacy_source.rs
@@ -23,7 +23,7 @@ use serde::{Deserialize, Serialize};
 use winnow::PResult;
 
 use crate::ast::{
-    display_separated, AstString, ConnectorSchema, Encode, Format, Ident, ObjectName, ParseTo,
+    display_separated, AstString, Encode, Format, FormatEncodeOptions, Ident, ObjectName, ParseTo,
     SqlOption, Value,
 };
 use crate::keywords::Keyword;
@@ -32,45 +32,45 @@ use crate::{impl_fmt_display, impl_parse_to, parser_err};
 
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
-pub enum CompatibleSourceSchema {
-    RowFormat(SourceSchema),
-    V2(ConnectorSchema),
+pub enum CompatibleFormatEncode {
+    RowFormat(LegacyRowFormat),
+    V2(FormatEncodeOptions),
 }
 
-impl fmt::Display for CompatibleSourceSchema {
+impl fmt::Display for CompatibleFormatEncode {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         match self {
-            CompatibleSourceSchema::RowFormat(inner) => {
+            CompatibleFormatEncode::RowFormat(inner) => {
                 write!(f, "{}", inner)
             }
-            CompatibleSourceSchema::V2(inner) => {
+            CompatibleFormatEncode::V2(inner) => {
                 write!(f, "{}", inner)
             }
         }
     }
 }
 
-impl CompatibleSourceSchema {
-    pub(crate) fn into_v2(self) -> ConnectorSchema {
+impl CompatibleFormatEncode {
+    pub(crate) fn into_v2(self) -> FormatEncodeOptions {
         match self {
-            CompatibleSourceSchema::RowFormat(inner) => inner.into_source_schema_v2(),
-            CompatibleSourceSchema::V2(inner) => inner,
+            CompatibleFormatEncode::RowFormat(inner) => inner.into_format_encode_v2(),
+            CompatibleFormatEncode::V2(inner) => inner,
         }
     }
 }
 
-impl From<ConnectorSchema> for CompatibleSourceSchema {
-    fn from(value: ConnectorSchema) -> Self {
+impl From<FormatEncodeOptions> for CompatibleFormatEncode {
+    fn from(value: FormatEncodeOptions) -> Self {
         Self::V2(value)
     }
 }
 
-pub fn parse_source_schema(p: &mut Parser<'_>) -> PResult<CompatibleSourceSchema> {
+pub fn parse_format_encode(p: &mut Parser<'_>) -> PResult<CompatibleFormatEncode> {
     if let Some(schema_v2) = p.parse_schema()? {
         if schema_v2.key_encode.is_some() {
             parser_err!("key encode clause is not supported in source schema");
         }
-        Ok(CompatibleSourceSchema::V2(schema_v2))
+        Ok(CompatibleFormatEncode::V2(schema_v2))
     } else if p.peek_nth_any_of_keywords(0, &[Keyword::ROW])
         && p.peek_nth_any_of_keywords(1, &[Keyword::FORMAT])
     {
@@ -79,34 +79,34 @@ pub fn parse_source_schema(p: &mut Parser<'_>) -> PResult<CompatibleSourceSchem
         let id = p.parse_identifier()?;
         let value = id.value.to_ascii_uppercase();
         let schema = match &value[..] {
-            "JSON" => SourceSchema::Json,
-            "UPSERT_JSON" => SourceSchema::UpsertJson,
+            "JSON" => LegacyRowFormat::Json,
+            "UPSERT_JSON" => LegacyRowFormat::UpsertJson,
             "PROTOBUF" => {
                 impl_parse_to!(protobuf_schema: ProtobufSchema, p);
-                SourceSchema::Protobuf(protobuf_schema)
+                LegacyRowFormat::Protobuf(protobuf_schema)
             }
-            "DEBEZIUM_JSON" => SourceSchema::DebeziumJson,
-            "DEBEZIUM_MONGO_JSON" => SourceSchema::DebeziumMongoJson,
+            "DEBEZIUM_JSON" => LegacyRowFormat::DebeziumJson,
+            "DEBEZIUM_MONGO_JSON" => LegacyRowFormat::DebeziumMongoJson,
             "AVRO" => {
                 impl_parse_to!(avro_schema: AvroSchema, p);
-                SourceSchema::Avro(avro_schema)
+                LegacyRowFormat::Avro(avro_schema)
             }
             "UPSERT_AVRO" => {
                 impl_parse_to!(avro_schema: AvroSchema, p);
-                SourceSchema::UpsertAvro(avro_schema)
+                LegacyRowFormat::UpsertAvro(avro_schema)
             }
-            "MAXWELL" => SourceSchema::Maxwell,
-            "CANAL_JSON" => SourceSchema::CanalJson,
+            "MAXWELL" => LegacyRowFormat::Maxwell,
+            "CANAL_JSON" => LegacyRowFormat::CanalJson,
             "CSV" => {
                 impl_parse_to!(csv_info: CsvInfo, p);
-                SourceSchema::Csv(csv_info)
+                LegacyRowFormat::Csv(csv_info)
             }
-            "NATIVE" => SourceSchema::Native, // used internally by schema change
+            "NATIVE" => LegacyRowFormat::Native, // used internally by schema change
             "DEBEZIUM_AVRO" => {
                 impl_parse_to!(avro_schema: DebeziumAvroSchema, p);
-                SourceSchema::DebeziumAvro(avro_schema)
+                LegacyRowFormat::DebeziumAvro(avro_schema)
             }
-            "BYTES" => SourceSchema::Bytes,
+            "BYTES" => LegacyRowFormat::Bytes,
             _ => {
                 parser_err!(
                     "expected JSON | UPSERT_JSON | PROTOBUF | DEBEZIUM_JSON | DEBEZIUM_AVRO \
@@ -114,7 +114,7 @@ pub fn parse_source_schema(p: &mut Parser<'_>) -> PResult<CompatibleSourceSchem
             }
         };
-        Ok(CompatibleSourceSchema::RowFormat(schema))
+        Ok(CompatibleFormatEncode::RowFormat(schema))
     } else {
 
-impl SourceSchema {
-    pub fn into_source_schema_v2(self) -> ConnectorSchema {
+impl LegacyRowFormat {
+    pub fn into_format_encode_v2(self) -> FormatEncodeOptions {
         let (format, row_encode) = match self {
-            SourceSchema::Protobuf(_) => (Format::Plain, Encode::Protobuf),
-            SourceSchema::Json => (Format::Plain, Encode::Json),
-            SourceSchema::DebeziumJson => (Format::Debezium, Encode::Json),
-            SourceSchema::DebeziumMongoJson => (Format::DebeziumMongo, Encode::Json),
-            SourceSchema::UpsertJson => (Format::Upsert, Encode::Json),
-            SourceSchema::Avro(_) => (Format::Plain, Encode::Avro),
-            SourceSchema::UpsertAvro(_) => (Format::Upsert, Encode::Avro),
-            SourceSchema::Maxwell => (Format::Maxwell, Encode::Json),
-            SourceSchema::CanalJson => (Format::Canal, Encode::Json),
-            SourceSchema::Csv(_) => (Format::Plain, Encode::Csv),
-            SourceSchema::DebeziumAvro(_) => (Format::Debezium, Encode::Avro),
-            SourceSchema::Bytes => (Format::Plain, Encode::Bytes),
-            SourceSchema::Native => (Format::Native, Encode::Native),
+            LegacyRowFormat::Protobuf(_) => (Format::Plain, Encode::Protobuf),
+            LegacyRowFormat::Json => (Format::Plain, Encode::Json),
+            LegacyRowFormat::DebeziumJson => (Format::Debezium, Encode::Json),
+            LegacyRowFormat::DebeziumMongoJson => (Format::DebeziumMongo, Encode::Json),
+            LegacyRowFormat::UpsertJson => (Format::Upsert, Encode::Json),
+            LegacyRowFormat::Avro(_) => (Format::Plain, Encode::Avro),
+            LegacyRowFormat::UpsertAvro(_) => (Format::Upsert, Encode::Avro),
+            LegacyRowFormat::Maxwell => (Format::Maxwell, Encode::Json),
+            LegacyRowFormat::CanalJson => (Format::Canal, Encode::Json),
+            LegacyRowFormat::Csv(_) => (Format::Plain, Encode::Csv),
+            LegacyRowFormat::DebeziumAvro(_) => (Format::Debezium, Encode::Avro),
+            LegacyRowFormat::Bytes => (Format::Plain, Encode::Bytes),
+            LegacyRowFormat::Native => (Format::Native, Encode::Native),
         };
 
         let row_options = match self {
-            SourceSchema::Protobuf(schema) => {
+            LegacyRowFormat::Protobuf(schema) => {
                 let mut options = vec![SqlOption {
                     name: ObjectName(vec![Ident {
                         value: "message".into(),
@@ -184,7 +184,7 @@ impl SourceSchema {
                 }
                 options
             }
-            SourceSchema::Avro(schema) | SourceSchema::UpsertAvro(schema) => {
+            LegacyRowFormat::Avro(schema) | LegacyRowFormat::UpsertAvro(schema) => {
                 if schema.use_schema_registry {
                     vec![SqlOption {
                         name: ObjectName(vec![Ident {
@@ -203,7 +203,7 @@ impl SourceSchema {
                     }]
                 }
             }
-            SourceSchema::DebeziumAvro(schema) => {
+            LegacyRowFormat::DebeziumAvro(schema) => {
                 vec![SqlOption {
                     name: ObjectName(vec![Ident {
                         value: "schema.registry".into(),
@@ -212,7 +212,7 @@ impl SourceSchema {
                     value: Value::SingleQuotedString(schema.row_schema_location.0),
                 }]
             }
-            SourceSchema::Csv(schema) => {
+            LegacyRowFormat::Csv(schema) => {
                 vec![
                     SqlOption {
                         name: ObjectName(vec![Ident {
@@ -239,7 +239,7 @@ impl SourceSchema {
             _ => vec![],
         };
 
-        ConnectorSchema {
+        FormatEncodeOptions {
            format,
            row_encode,
            row_options,
@@ -248,23 +248,27 @@ impl SourceSchema {
     }
 }
 
-impl fmt::Display for SourceSchema {
+impl fmt::Display for LegacyRowFormat {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         write!(f, "ROW FORMAT ")?;
         match self {
-            SourceSchema::Protobuf(protobuf_schema) => write!(f, "PROTOBUF {}", protobuf_schema),
-            SourceSchema::Json => write!(f, "JSON"),
-            SourceSchema::UpsertJson => write!(f, "UPSERT_JSON"),
-            SourceSchema::Maxwell => write!(f, "MAXWELL"),
-            SourceSchema::DebeziumJson => write!(f, "DEBEZIUM_JSON"),
-            SourceSchema::DebeziumMongoJson => write!(f, "DEBEZIUM_MONGO_JSON"),
-            SourceSchema::Avro(avro_schema) => write!(f, "AVRO {}", avro_schema),
-            SourceSchema::UpsertAvro(avro_schema) => write!(f, "UPSERT_AVRO {}", avro_schema),
-            SourceSchema::CanalJson => write!(f, "CANAL_JSON"),
-            SourceSchema::Csv(csv_info) => write!(f, "CSV {}", csv_info),
-            SourceSchema::Native => write!(f, "NATIVE"),
-            SourceSchema::DebeziumAvro(avro_schema) => write!(f, "DEBEZIUM_AVRO {}", avro_schema),
-            SourceSchema::Bytes => write!(f, "BYTES"),
+            LegacyRowFormat::Protobuf(protobuf_schema) => {
+                write!(f, "PROTOBUF {}", protobuf_schema)
+            }
+            LegacyRowFormat::Json => write!(f, "JSON"),
+            LegacyRowFormat::UpsertJson => write!(f, "UPSERT_JSON"),
+            LegacyRowFormat::Maxwell => write!(f, "MAXWELL"),
+            LegacyRowFormat::DebeziumJson => write!(f, "DEBEZIUM_JSON"),
+            LegacyRowFormat::DebeziumMongoJson => write!(f, "DEBEZIUM_MONGO_JSON"),
+            LegacyRowFormat::Avro(avro_schema) => write!(f, "AVRO {}", avro_schema),
+            LegacyRowFormat::UpsertAvro(avro_schema) => write!(f, "UPSERT_AVRO {}", avro_schema),
+            LegacyRowFormat::CanalJson => write!(f, "CANAL_JSON"),
+            LegacyRowFormat::Csv(csv_info) => write!(f, "CSV {}", csv_info),
+            LegacyRowFormat::Native => write!(f, "NATIVE"),
+            LegacyRowFormat::DebeziumAvro(avro_schema) => {
+                write!(f, "DEBEZIUM_AVRO {}", avro_schema)
+            }
+            LegacyRowFormat::Bytes => write!(f, "BYTES"),
         }
     }
 }
diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs
index 5dcea9c339d87..1523a1bbb2e1d 100644
--- a/src/sqlparser/src/ast/mod.rs
+++ b/src/sqlparser/src/ast/mod.rs
@@ -42,7 +42,7 @@ pub use self::ddl::{
     ReferentialAction, SourceWatermark, TableConstraint,
 };
 pub use self::legacy_source::{
-    get_delimiter, AvroSchema, CompatibleSourceSchema, DebeziumAvroSchema, ProtobufSchema,
+    get_delimiter, AvroSchema, CompatibleFormatEncode, DebeziumAvroSchema, ProtobufSchema,
 };
 pub use self::operator::{BinaryOperator, QualifiedOperator, UnaryOperator};
 pub use self::query::{
@@ -1286,8 +1286,8 @@ pub enum Statement {
         wildcard_idx: Option<usize>,
         constraints: Vec<TableConstraint>,
         with_options: Vec<SqlOption>,
-        /// Optional schema of the external source with which the table is created
-        source_schema: Option<CompatibleSourceSchema>,
+        /// `FORMAT ... ENCODE ...` for table with connector
+        format_encode: Option<CompatibleFormatEncode>,
         /// The watermark defined on source.
         source_watermarks: Vec<SourceWatermark>,
         /// Append only table.
@@ -1827,7 +1827,7 @@ impl fmt::Display for Statement {
                 or_replace,
                 if_not_exists,
                 temporary,
-                source_schema,
+                format_encode,
                 source_watermarks,
                 append_only,
                 on_conflict,
@@ -1874,8 +1874,8 @@ impl fmt::Display for Statement {
                 if !with_options.is_empty() {
                     write!(f, " WITH ({})", display_comma_separated(with_options))?;
                 }
-                if let Some(source_schema) = source_schema {
-                    write!(f, " {}", source_schema)?;
+                if let Some(format_encode) = format_encode {
+                    write!(f, " {}", format_encode)?;
                 }
                 if let Some(query) = query {
                     write!(f, " AS {}", query)?;
diff --git a/src/sqlparser/src/ast/statement.rs b/src/sqlparser/src/ast/statement.rs
index a0f919091b56d..72680161defea 100644
--- a/src/sqlparser/src/ast/statement.rs
+++ b/src/sqlparser/src/ast/statement.rs
@@ -22,7 +22,7 @@ use serde::{Deserialize, Serialize};
 use winnow::PResult;
 
 use super::ddl::SourceWatermark;
-use super::legacy_source::{parse_source_schema, CompatibleSourceSchema};
+use super::legacy_source::{parse_format_encode, CompatibleFormatEncode};
 use super::{EmitMode, Ident, ObjectType, Query, Value};
 use crate::ast::{
     display_comma_separated, display_separated, ColumnDef, ObjectName, SqlOption, TableConstraint,
@@ -76,7 +76,7 @@ macro_rules! impl_fmt_display {
 //     source_name: Ident,
 //     with_properties: AstOption<WithProperties>,
 //     [Keyword::ROW, Keyword::FORMAT],
-//     source_schema: SourceSchema,
+//     format_encode: SourceSchema,
 //     [Keyword::WATERMARK, Keyword::FOR] column [Keyword::AS] <expr>
 // });
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
 pub struct CreateSourceStatement {
     pub temporary: bool,
     pub if_not_exists: bool,
     pub columns: Vec<ColumnDef>,
     pub wildcard_idx: Option<usize>,
     pub constraints: Vec<TableConstraint>,
     pub source_name: ObjectName,
     pub with_properties: WithProperties,
-    pub source_schema: CompatibleSourceSchema,
+    pub format_encode: CompatibleFormatEncode,
     pub source_watermarks: Vec<SourceWatermark>,
     pub include_column_options: IncludeOption,
 }
@@ -222,7 +222,7 @@ impl Encode {
 /// `FORMAT ... ENCODE ... [(a=b, ...)] [KEY ENCODE ...]`
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
-pub struct ConnectorSchema {
+pub struct FormatEncodeOptions {
     pub format: Format,
     pub row_encode: Encode,
     pub row_options: Vec<SqlOption>,
     pub key_encode: Option<Encode>,
 }
@@ -232,33 +232,33 @@ impl Parser<'_> {
     /// Peek the next tokens to see if it is `FORMAT` or `ROW FORMAT` (for compatibility).
-    fn peek_source_schema_format(&mut self) -> bool {
+    fn peek_format_encode_format(&mut self) -> bool {
         (self.peek_nth_any_of_keywords(0, &[Keyword::ROW])
             && self.peek_nth_any_of_keywords(1, &[Keyword::FORMAT])) // ROW FORMAT
             || self.peek_nth_any_of_keywords(0, &[Keyword::FORMAT]) // FORMAT
     }
 
     /// Parse the source schema. The behavior depends on the `connector` type.
-    pub fn parse_source_schema_with_connector(
+    pub fn parse_format_encode_with_connector(
         &mut self,
         connector: &str,
         cdc_source_job: bool,
-    ) -> PResult<CompatibleSourceSchema> {
+    ) -> PResult<CompatibleFormatEncode> {
         // row format for cdc source must be debezium json
         // row format for nexmark source must be native
         // default row format for datagen source is native
         // FIXME: parse input `connector` to enum type instead using string here
         if connector.contains("-cdc") {
             let expected = if cdc_source_job {
-                ConnectorSchema::plain_json()
+                FormatEncodeOptions::plain_json()
             } else if connector.contains("mongodb") {
-                ConnectorSchema::debezium_mongo_json()
+                FormatEncodeOptions::debezium_mongo_json()
             } else {
-                ConnectorSchema::debezium_json()
+                FormatEncodeOptions::debezium_json()
             };
 
-            if self.peek_source_schema_format() {
-                let schema = parse_source_schema(self)?.into_v2();
+            if self.peek_format_encode_format() {
+                let schema = parse_format_encode(self)?.into_v2();
                 if schema != expected {
                     parser_err!(
                         "Row format for CDC connectors should be \
@@ -268,9 +268,9 @@ impl Parser<'_> {
                 }
             }
             Ok(expected.into())
         } else if connector.contains("nexmark") {
-            let expected = ConnectorSchema::native();
-            if self.peek_source_schema_format() {
-                let schema = parse_source_schema(self)?.into_v2();
+            let expected = FormatEncodeOptions::native();
+            if self.peek_format_encode_format() {
+                let schema = parse_format_encode(self)?.into_v2();
                 if schema != expected {
                     parser_err!(
                         "Row format for nexmark connectors should be \
@@ -280,15 +280,15 @@ impl Parser<'_> {
                 }
             }
             Ok(expected.into())
         } else if connector.contains("datagen") {
-            Ok(if self.peek_source_schema_format() {
-                parse_source_schema(self)?
+            Ok(if self.peek_format_encode_format() {
+                parse_format_encode(self)?
             } else {
-                ConnectorSchema::native().into()
+                FormatEncodeOptions::native().into()
             })
         } else if connector.contains("iceberg") {
-            let expected = ConnectorSchema::none();
-            if self.peek_source_schema_format() {
-                let schema = parse_source_schema(self)?.into_v2();
+            let expected = FormatEncodeOptions::none();
+            if self.peek_format_encode_format() {
+                let schema = parse_format_encode(self)?.into_v2();
                 if schema != expected {
                     parser_err!(
                         "Row format for iceberg connectors should be \
@@ -298,12 +298,12 @@ impl Parser<'_> {
                 }
             }
             Ok(expected.into())
         } else {
-            Ok(parse_source_schema(self)?)
+            Ok(parse_format_encode(self)?)
         }
     }
 
     /// Parse `FORMAT ... ENCODE ... (...)`.
-    pub fn parse_schema(&mut self) -> PResult<Option<ConnectorSchema>> {
+    pub fn parse_schema(&mut self) -> PResult<Option<FormatEncodeOptions>> {
         if !self.parse_keyword(Keyword::FORMAT) {
             return Ok(None);
         }
@@ -325,7 +325,7 @@ impl Parser<'_> {
             None
         };
 
-        Ok(Some(ConnectorSchema {
+        Ok(Some(FormatEncodeOptions {
             format,
             row_encode,
             row_options,
@@ -334,9 +334,9 @@ impl Parser<'_> {
     }
 }
 
-impl ConnectorSchema {
+impl FormatEncodeOptions {
     pub const fn plain_json() -> Self {
-        ConnectorSchema {
+        FormatEncodeOptions {
             format: Format::Plain,
             row_encode: Encode::Json,
             row_options: Vec::new(),
@@ -346,7 +346,7 @@ impl FormatEncodeOptions {
 
     /// Create a new source schema with `Debezium` format and `Json` encoding.
     pub const fn debezium_json() -> Self {
-        ConnectorSchema {
+        FormatEncodeOptions {
             format: Format::Debezium,
             row_encode: Encode::Json,
             row_options: Vec::new(),
@@ -355,7 +355,7 @@ impl FormatEncodeOptions {
     }
 
     pub const fn debezium_mongo_json() -> Self {
-        ConnectorSchema {
+        FormatEncodeOptions {
             format: Format::DebeziumMongo,
             row_encode: Encode::Json,
             row_options: Vec::new(),
@@ -365,7 +365,7 @@ impl FormatEncodeOptions {
 
     /// Create a new source schema with `Native` format and encoding.
     pub const fn native() -> Self {
-        ConnectorSchema {
+        FormatEncodeOptions {
             format: Format::Native,
             row_encode: Encode::Native,
             row_options: Vec::new(),
@@ -376,7 +376,7 @@
 
     /// Create a new source schema with `None` format and encoding.
     /// Used for self-explanatory source like iceberg.
     pub const fn none() -> Self {
-        ConnectorSchema {
+        FormatEncodeOptions {
             format: Format::None,
             row_encode: Encode::None,
             row_options: Vec::new(),
@@ -389,7 +389,7 @@
     }
 }
 
-impl fmt::Display for ConnectorSchema {
+impl fmt::Display for FormatEncodeOptions {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         write!(f, "FORMAT {} ENCODE {}", self.format, self.row_encode)?;
 
@@ -467,7 +467,7 @@ impl fmt::Display for CreateSourceStatement {
             v.push(format!("{}", item));
         }
         impl_fmt_display!(with_properties, v, self);
-        impl_fmt_display!(source_schema, v, self);
+        impl_fmt_display!(format_encode, v, self);
         v.iter().join(" ").fmt(f)
     }
 }
@@ -503,7 +503,7 @@ pub struct CreateSinkStatement {
     pub sink_from: CreateSink,
     pub columns: Vec<Ident>,
     pub emit_mode: Option<EmitMode>,
-    pub sink_schema: Option<ConnectorSchema>,
+    pub sink_schema: Option<FormatEncodeOptions>,
     pub into_table_name: Option<ObjectName>,
 }
 
diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs
index 4874e5320056d..b383869c0d3d0 100644
--- a/src/sqlparser/src/parser.rs
+++ b/src/sqlparser/src/parser.rs
@@ -2143,7 +2143,7 @@ impl Parser<'_> {
         // row format for nexmark source must be native
         // default row format for datagen source is native
 
-        let source_schema = self.parse_source_schema_with_connector(&connector, cdc_source_job)?;
+        let format_encode = self.parse_format_encode_with_connector(&connector, cdc_source_job)?;
 
         let stmt = CreateSourceStatement {
             temporary,
@@ -2153,7 +2153,7 @@ impl Parser<'_> {
             constraints,
             source_name,
             with_properties: WithProperties(with_options),
-            source_schema,
+            format_encode,
             source_watermarks,
             include_column_options: include_options,
         };
@@ -2585,8 +2585,8 @@ impl Parser<'_> {
             .find(|&opt| opt.name.real_value() == UPSTREAM_SOURCE_KEY);
         let connector = option.map(|opt| opt.value.to_string());
-        let source_schema = if let Some(connector) = connector {
-            Some(self.parse_source_schema_with_connector(&connector, false)?)
+        let format_encode = if let Some(connector) = connector {
+            Some(self.parse_format_encode_with_connector(&connector, false)?)
         } else {
             None // Table is NOT created with an external connector.
         };
@@ -2621,7 +2621,7 @@ impl Parser<'_> {
             with_options,
             or_replace,
             if_not_exists,
-            source_schema,
+            format_encode,
             source_watermarks,
             append_only,
             on_conflict,
@@ -3475,11 +3475,11 @@ impl Parser<'_> {
                 return self.expected("SCHEMA after SET");
             }
         } else if self.peek_nth_any_of_keywords(0, &[Keyword::FORMAT]) {
-            let connector_schema = self.parse_schema()?.unwrap();
-            if connector_schema.key_encode.is_some() {
+            let format_encode = self.parse_schema()?.unwrap();
+            if format_encode.key_encode.is_some() {
                 parser_err!("key encode clause is not supported in source schema");
             }
-            AlterSourceOperation::FormatEncode { connector_schema }
+            AlterSourceOperation::FormatEncode { format_encode }
         } else if self.parse_keywords(&[Keyword::REFRESH, Keyword::SCHEMA]) {
             AlterSourceOperation::RefreshSchema
         } else {
diff --git a/src/sqlparser/tests/testdata/create.yaml b/src/sqlparser/tests/testdata/create.yaml
index 1c1f80818478b..8390cc980cc25 100644
--- a/src/sqlparser/tests/testdata/create.yaml
+++ b/src/sqlparser/tests/testdata/create.yaml
@@ -43,19 +43,19 @@
       ^
 - input: CREATE SOURCE IF NOT EXISTS src WITH (kafka.topic = 'abc', kafka.brokers = 'localhost:1001') FORMAT PLAIN ENCODE PROTOBUF (message = 'Foo', schema.location = 'file://')
   formatted_sql: CREATE SOURCE IF NOT EXISTS src WITH (kafka.topic = 'abc', kafka.brokers = 'localhost:1001') FORMAT PLAIN ENCODE PROTOBUF (message = 'Foo', schema.location = 'file://')
-  formatted_ast: 'CreateSource { stmt: CreateSourceStatement { temporary: false, if_not_exists: true, columns: [], wildcard_idx: None, constraints: [], source_name: ObjectName([Ident { value: "src", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "topic", quote_style: None }]), value: SingleQuotedString("abc") }, SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "brokers", quote_style: None }]), value: SingleQuotedString("localhost:1001") }]), source_schema: V2(ConnectorSchema { format: Plain, row_encode: Protobuf, row_options: [SqlOption { name: ObjectName([Ident { value: "message", quote_style: None }]), value: SingleQuotedString("Foo") }, SqlOption { name: ObjectName([Ident { value: "schema", quote_style: None }, Ident { value: "location", quote_style: None }]), value: SingleQuotedString("file://") }], key_encode: None }), source_watermarks: [], include_column_options: [] } }'
+  formatted_ast: 'CreateSource { stmt: CreateSourceStatement { temporary: false, if_not_exists: true, columns: [], wildcard_idx: None, constraints: [], source_name: ObjectName([Ident { value: "src", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "topic", quote_style: None }]), value: SingleQuotedString("abc") }, SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "brokers", quote_style: None }]), value: SingleQuotedString("localhost:1001") }]), format_encode: V2(FormatEncodeOptions { format: Plain, row_encode: Protobuf, row_options: [SqlOption { name: ObjectName([Ident { value: "message", quote_style: None }]), value: SingleQuotedString("Foo") }, SqlOption { name: ObjectName([Ident { value: "schema", quote_style: None }, Ident { value: "location", quote_style: None }]), value: SingleQuotedString("file://") }], key_encode: None }), source_watermarks: [], include_column_options: [] } }'
 - input: CREATE SOURCE IF NOT EXISTS src WITH (kafka.topic = 'abc', kafka.brokers = 'localhost:1001') FORMAT PLAIN ENCODE PROTOBUF (message = 'Foo', schema.registry = 'http://')
   formatted_sql: CREATE SOURCE IF NOT EXISTS src WITH (kafka.topic = 'abc', kafka.brokers = 'localhost:1001') FORMAT PLAIN ENCODE PROTOBUF (message = 'Foo', schema.registry = 'http://')
-  formatted_ast: 'CreateSource { stmt: CreateSourceStatement { temporary: false, if_not_exists: true, columns: [], wildcard_idx: None, constraints: [], source_name: ObjectName([Ident { value: "src", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "topic", quote_style: None }]), value: SingleQuotedString("abc") }, SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "brokers", quote_style: None }]), value: SingleQuotedString("localhost:1001") }]), source_schema: V2(ConnectorSchema { format: Plain, row_encode: Protobuf, row_options: [SqlOption { name: ObjectName([Ident { value: "message", quote_style: None }]), value: SingleQuotedString("Foo") }, SqlOption { name: ObjectName([Ident { value: "schema", quote_style: None }, Ident { value: "registry", quote_style: None }]), value: SingleQuotedString("http://") }], key_encode: None }), source_watermarks: [], include_column_options: [] } }'
+  formatted_ast: 'CreateSource { stmt: CreateSourceStatement { temporary: false, if_not_exists: true, columns: [], wildcard_idx: None, constraints: [], source_name: ObjectName([Ident { value: "src", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "topic", quote_style: None }]), value: SingleQuotedString("abc") }, SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "brokers", quote_style: None }]), value: SingleQuotedString("localhost:1001") }]), format_encode: V2(FormatEncodeOptions { format: Plain, row_encode: Protobuf, row_options: [SqlOption { name: ObjectName([Ident { value: "message", quote_style: None }]), value: SingleQuotedString("Foo") }, SqlOption { name: ObjectName([Ident { value: "schema", quote_style: None }, Ident { value: "registry", quote_style: None }]), value: SingleQuotedString("http://") }], key_encode: None }), source_watermarks: [], include_column_options: [] } }'
 - input: CREATE SOURCE IF NOT EXISTS src (*, WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND) WITH (kafka.topic = 'abc', kafka.brokers = 'localhost:1001') FORMAT PLAIN ENCODE PROTOBUF (message = 'Foo', schema.registry = 'http://')
   formatted_sql: CREATE SOURCE IF NOT EXISTS src (*, WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND) WITH (kafka.topic = 'abc', kafka.brokers = 'localhost:1001') FORMAT PLAIN ENCODE PROTOBUF (message = 'Foo', schema.registry = 'http://')
-  formatted_ast: 'CreateSource { stmt: CreateSourceStatement { temporary: false, if_not_exists: true, columns: [], wildcard_idx: Some(0), constraints: [], source_name: ObjectName([Ident { value: "src", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "topic", quote_style: None }]), value: SingleQuotedString("abc") }, SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "brokers", quote_style: None }]), value: SingleQuotedString("localhost:1001") }]), source_schema: V2(ConnectorSchema { format: Plain, row_encode: Protobuf, row_options: [SqlOption { name: ObjectName([Ident { value: "message", quote_style: None }]), value: SingleQuotedString("Foo") }, SqlOption { name: ObjectName([Ident { value: "schema", quote_style: None }, Ident { value: "registry", quote_style: None }]), value: SingleQuotedString("http://") }], key_encode: None }), source_watermarks: [SourceWatermark { column: Ident { value: "event_time", quote_style: None }, expr: BinaryOp { left: Identifier(Ident { value: "event_time", quote_style: None }), op: Minus, right: Value(Interval { value: "60", leading_field: Some(Second), leading_precision: None, last_field: None, fractional_seconds_precision: None }) } }], include_column_options: [] } }'
+  formatted_ast: 'CreateSource { stmt: CreateSourceStatement { temporary: false, if_not_exists: true, columns: [], wildcard_idx: Some(0), constraints: [], source_name: ObjectName([Ident { value: "src", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "topic", quote_style: None }]), value: SingleQuotedString("abc") }, SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "brokers", quote_style: None }]), value: SingleQuotedString("localhost:1001") }]), format_encode: V2(FormatEncodeOptions { format: Plain, row_encode: Protobuf, row_options: [SqlOption { name: ObjectName([Ident { value: "message", quote_style: None }]), value: SingleQuotedString("Foo") }, SqlOption { name: ObjectName([Ident { value: "schema", quote_style: None }, Ident { value: "registry", quote_style: None }]), value: SingleQuotedString("http://") }], key_encode: None }), source_watermarks: [SourceWatermark { column: Ident { value: "event_time", quote_style: None }, expr: BinaryOp { left: Identifier(Ident { value: "event_time", quote_style: None }), op: Minus, right: Value(Interval { value: "60", leading_field: Some(Second), leading_precision: None, last_field: None, fractional_seconds_precision: None }) } }], include_column_options: [] } }'
 - input: CREATE SOURCE IF NOT EXISTS src (PRIMARY KEY (event_id), WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND) WITH (kafka.topic = 'abc', kafka.brokers = 'localhost:1001') FORMAT PLAIN ENCODE PROTOBUF (message = 'Foo', schema.registry = 'http://')
   formatted_sql: CREATE SOURCE IF NOT EXISTS src (PRIMARY KEY (event_id), WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND) WITH (kafka.topic = 'abc', kafka.brokers = 'localhost:1001') FORMAT PLAIN ENCODE PROTOBUF (message = 'Foo', schema.registry = 'http://')
-  formatted_ast: 'CreateSource { stmt: CreateSourceStatement { temporary: false, if_not_exists: true, columns: [], wildcard_idx: None, constraints: [Unique { name: None, columns: [Ident { value: "event_id", quote_style: None }], is_primary: true }], source_name: ObjectName([Ident { value: "src", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "topic", quote_style: None }]), value: SingleQuotedString("abc") }, SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "brokers", quote_style: None }]), value: SingleQuotedString("localhost:1001") }]), source_schema: V2(ConnectorSchema { format: Plain, row_encode: Protobuf, row_options: [SqlOption { name: ObjectName([Ident { value: "message", quote_style: None }]), value: SingleQuotedString("Foo") }, SqlOption { name: ObjectName([Ident { value: "schema", quote_style: None }, Ident { value: "registry", quote_style: None }]), value: SingleQuotedString("http://") }], key_encode: None }), source_watermarks: [SourceWatermark { column: Ident { value: "event_time", quote_style: None }, expr: BinaryOp { left: Identifier(Ident { value: "event_time", quote_style: None }), op: Minus, right: Value(Interval { value: "60", leading_field: Some(Second), leading_precision: None, last_field: None, fractional_seconds_precision: None }) } }], include_column_options: [] } }'
+  formatted_ast: 'CreateSource { stmt: CreateSourceStatement { temporary: false, if_not_exists: true, columns: [], wildcard_idx: None, constraints: [Unique { name: None, columns: [Ident { value: "event_id", quote_style: None }], is_primary: true }], source_name: ObjectName([Ident { value: "src", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "topic", quote_style: None }]), value: SingleQuotedString("abc") }, SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "brokers", quote_style: None }]), value: SingleQuotedString("localhost:1001") }]), format_encode: V2(FormatEncodeOptions { format: Plain, row_encode: Protobuf, row_options: [SqlOption { name: ObjectName([Ident { value: "message", quote_style: None }]), value: SingleQuotedString("Foo") }, SqlOption { name: ObjectName([Ident { value: "schema", quote_style: None }, Ident { value: "registry", quote_style: None }]), value: SingleQuotedString("http://") }], key_encode: None }), source_watermarks: [SourceWatermark { column: Ident { value: "event_time", quote_style: None }, expr: BinaryOp { left: Identifier(Ident { value: "event_time", quote_style: None }), op: Minus, right: Value(Interval { value: "60", leading_field: Some(Second), leading_precision: None, last_field: None, fractional_seconds_precision: None }) } }], include_column_options: [] } }'
 - input: CREATE SOURCE bid (auction INTEGER, bidder INTEGER, price INTEGER, WATERMARK FOR auction AS auction - 1, "date_time" TIMESTAMP) with (connector = 'nexmark', nexmark.table.type = 'Bid', nexmark.split.num = '12', nexmark.min.event.gap.in.ns = '0')
   formatted_sql: CREATE SOURCE bid (auction INT, bidder INT, price INT, "date_time" TIMESTAMP, WATERMARK FOR auction AS auction - 1) WITH (connector = 'nexmark', nexmark.table.type = 'Bid', nexmark.split.num = '12', nexmark.min.event.gap.in.ns = '0') FORMAT NATIVE ENCODE NATIVE
-  formatted_ast: 'CreateSource { stmt: CreateSourceStatement { temporary: false, if_not_exists: false, columns: [ColumnDef { name: Ident { value: "auction", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "bidder", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "price", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "date_time", quote_style: Some(''"'') }, data_type: Some(Timestamp(false)), collation: None, options: [] }], wildcard_idx: None, constraints: [], source_name: ObjectName([Ident { value: "bid", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "connector", quote_style: None }]), value: SingleQuotedString("nexmark") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "table", quote_style: None }, Ident { value: "type", quote_style: None }]), value: SingleQuotedString("Bid") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "split", quote_style: None }, Ident { value: "num", quote_style: None }]), value: SingleQuotedString("12") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "min", quote_style: None }, Ident { value: "event", quote_style: None }, Ident { value: "gap", quote_style: None }, Ident { value: "in", quote_style: None }, Ident { value: "ns", quote_style: None }]), value: SingleQuotedString("0") }]), source_schema: V2(ConnectorSchema { format: Native, row_encode: Native, row_options: [], key_encode: None }), source_watermarks: [SourceWatermark { column: Ident { value: "auction", quote_style: None }, expr: BinaryOp { left: Identifier(Ident { value: "auction", quote_style: None }), op: Minus, right: Value(Number("1")) } }], include_column_options: [] } }'
+  formatted_ast: 'CreateSource { stmt: CreateSourceStatement { temporary: false, if_not_exists: false, columns: [ColumnDef { name: Ident { value: "auction", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "bidder", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "price", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "date_time", quote_style: Some(''"'') }, data_type: Some(Timestamp(false)), collation: None, options: [] }], wildcard_idx: None, constraints: [], source_name: ObjectName([Ident { value: "bid", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "connector", quote_style: None }]), value: SingleQuotedString("nexmark") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "table", quote_style: None }, Ident { value: "type", quote_style: None }]), value: SingleQuotedString("Bid") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "split", quote_style: None }, Ident { value: "num", quote_style: None }]), value: SingleQuotedString("12") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "min", quote_style: None }, Ident { value: "event", quote_style: None }, Ident { value: "gap", quote_style: None }, Ident { value: "in", quote_style: None }, Ident { value: "ns", quote_style: None }]), value: SingleQuotedString("0") }]), format_encode: V2(FormatEncodeOptions { format: Native, row_encode: Native, row_options: [], key_encode: None }), source_watermarks: [SourceWatermark { column: Ident { value: "auction", quote_style: None }, expr: BinaryOp { left: Identifier(Ident { value: "auction", quote_style: None }), op: Minus, right: Value(Number("1")) } }], include_column_options: [] } }'
 - input: |-
     CREATE SOURCE s (raw BYTEA)
diff --git a/src/tests/sqlsmith/src/lib.rs b/src/tests/sqlsmith/src/lib.rs
index 1c31041a7e484..cc16414f73cef 100644
--- a/src/tests/sqlsmith/src/lib.rs
+++ b/src/tests/sqlsmith/src/lib.rs
@@ -276,7 +276,7 @@ CREATE TABLE t3(v1 int, v2 bool, v3 smallint);
             wildcard_idx: None,
             constraints: [],
             with_options: [],
-            source_schema: None,
+            format_encode: None,
             source_watermarks: [],
             append_only: false,
             on_conflict: None,
@@ -324,7 +324,7 @@ CREATE TABLE t3(v1 int, v2 bool, v3 smallint);
             wildcard_idx: None,
             constraints: [],
             with_options: [],
-            source_schema: None,
+            format_encode: None,
             source_watermarks: [],
             append_only: false,
             on_conflict: None,
@@ -383,7 +383,7 @@ CREATE TABLE t3(v1 int, v2 bool, v3 smallint);
             wildcard_idx: None,
             constraints: [],
             with_options: [],
-            source_schema: None,
+            format_encode: None,
             source_watermarks: [],
             append_only: false,
             on_conflict: None,
@@ -518,7 +518,7 @@ CREATE TABLE t4(v1 int PRIMARY KEY, v2 smallint PRIMARY KEY, v3 bool PRIMARY KEY
             wildcard_idx: None,
             constraints: [],
             with_options: [],
-            source_schema: None,
+            format_encode: None,
             source_watermarks: [],
             append_only: false,
             on_conflict: None,
@@ -573,7 +573,7 @@ CREATE TABLE t4(v1 int PRIMARY KEY, v2 smallint PRIMARY KEY, v3 bool PRIMARY KEY
             wildcard_idx: None,
             constraints: [],
             with_options: [],
-            source_schema: None,
+            format_encode: None,
             source_watermarks: [],
             append_only: false,
             on_conflict: None,
@@ -635,7 +635,7 @@ CREATE TABLE t4(v1 int PRIMARY KEY, v2 smallint PRIMARY KEY, v3 bool PRIMARY KEY
             wildcard_idx: None,
             constraints: [],
             with_options: [],
-            source_schema: None,
+            format_encode: None,
             source_watermarks: [],
             append_only: false,
             on_conflict: None,
@@ -715,7 +715,7 @@ CREATE TABLE t4(v1 int PRIMARY KEY, v2 smallint PRIMARY KEY, v3 bool PRIMARY KEY
             wildcard_idx: None,
             constraints: [],
             with_options: [],
-            source_schema: None,
+            format_encode: None,
             source_watermarks: [],
             append_only: false,
             on_conflict: None,
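A quick end-to-end check of the rename (a minimal sketch, not part of the patch: it assumes the `risingwave_sqlparser` crate as diffed above, and the expected strings follow the `Display` impl and the `formatted_sql` expectations in `create.yaml`):

    use risingwave_sqlparser::ast::Statement;
    use risingwave_sqlparser::parser::Parser;

    fn main() {
        // A connector with an explicit clause: the parser stores it in the
        // field now named `format_encode` (formerly `source_schema`).
        let sql = "CREATE SOURCE src WITH (connector = 'kafka', kafka.topic = 'abc') \
                   FORMAT PLAIN ENCODE JSON";
        let [stmt]: [_; 1] = Parser::parse_sql(sql).unwrap().try_into().unwrap();
        let Statement::CreateSource { stmt } = stmt else {
            unreachable!()
        };
        // `into_v2_with_warning` converts a legacy `ROW FORMAT` clause, if any,
        // into the V2 `FormatEncodeOptions` form.
        let format_encode = stmt.format_encode.into_v2_with_warning();
        assert_eq!(format_encode.to_string(), "FORMAT PLAIN ENCODE JSON");

        // A nexmark source may omit the clause entirely;
        // `parse_format_encode_with_connector` then defaults it to native.
        let sql = "CREATE SOURCE bid WITH (connector = 'nexmark')";
        let [stmt]: [_; 1] = Parser::parse_sql(sql).unwrap().try_into().unwrap();
        let Statement::CreateSource { stmt } = stmt else {
            unreachable!()
        };
        let format_encode = stmt.format_encode.into_v2_with_warning();
        assert_eq!(format_encode.to_string(), "FORMAT NATIVE ENCODE NATIVE");
    }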