From 7b3cfe0ce0e606644a2b8fd1de0d13673a13216f Mon Sep 17 00:00:00 2001 From: Bohan Zhang Date: Thu, 9 May 2024 00:50:31 +0800 Subject: [PATCH] feat: impl `key encode` clause for sink (#16377) Signed-off-by: tabVersion --- ci/scripts/e2e-kafka-sink-test.sh | 1 + e2e_test/sink/kafka/create_sink.slt | 74 +++++++++- e2e_test/sink/kafka/drop_sink.slt | 3 + proto/catalog.proto | 1 + proto/plan_common.proto | 1 + src/bench/sink_bench/main.rs | 1 + src/connector/src/sink/catalog/mod.rs | 31 +++- src/connector/src/sink/encoder/mod.rs | 1 + src/connector/src/sink/encoder/text.rs | 108 ++++++++++++++ src/connector/src/sink/formatter/mod.rs | 139 +++++++++++++++--- src/connector/src/sink/redis.rs | 23 ++- .../src/handler/alter_source_with_sr.rs | 1 + src/frontend/src/handler/create_sink.rs | 16 +- src/frontend/src/handler/create_source.rs | 1 + .../src/optimizer/plan_node/stream_sink.rs | 27 ++-- src/sqlparser/src/ast/legacy_source.rs | 16 +- src/sqlparser/src/ast/statement.rs | 48 ++++-- src/sqlparser/src/parser.rs | 9 +- src/sqlparser/tests/testdata/create.yaml | 6 +- 19 files changed, 446 insertions(+), 61 deletions(-) create mode 100644 src/connector/src/sink/encoder/text.rs diff --git a/ci/scripts/e2e-kafka-sink-test.sh b/ci/scripts/e2e-kafka-sink-test.sh index 1dd7a27831d49..559da130b7b61 100755 --- a/ci/scripts/e2e-kafka-sink-test.sh +++ b/ci/scripts/e2e-kafka-sink-test.sh @@ -8,6 +8,7 @@ set -euo pipefail ./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-schema --create > /dev/null 2>&1 ./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --create > /dev/null 2>&1 ./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-without-snapshot --create > /dev/null 2>&1 +./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-text-key-id --create > /dev/null 2>&1 sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/create_sink.slt' sleep 2 diff --git a/e2e_test/sink/kafka/create_sink.slt b/e2e_test/sink/kafka/create_sink.slt index 1cdb4771ffd47..c3f738fbce2ae 100644 --- a/e2e_test/sink/kafka/create_sink.slt +++ b/e2e_test/sink/kafka/create_sink.slt @@ -183,6 +183,61 @@ create sink invalid_pk_column from t_kafka with ( primary_key = 'id,invalid' ); +### Test sink with key encode ### + +statement error sink key encode unsupported: JSON, only TEXT supported +create sink sink_text_error from t_kafka with ( + connector = 'kafka', + properties.bootstrap.server = 'message_queue:29092', + topic = 'test-rw-sink-text-key-id', + primary_key = 'id') +format plain encode json ( + force_append_only='true' +) key encode json ; + +statement error +# The key encode is TEXT, but the primary key has 2 columns. The key encode TEXT requires the primary key to be a single column.s +create sink sink_text_error from t_kafka with ( + connector = 'kafka', + properties.bootstrap.server = 'message_queue:29092', + topic = 'test-rw-sink-text-key-id', + primary_key = 'id, v_varchar') +format plain encode json ( + force_append_only='true' +) key encode text ; + +statement error +# The key encode is TEXT, but the primary key column v_bytea has type bytea. The key encode TEXT requires the primary key column to be of type varchar, bool, small int, int, or big int. 
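+# (Editor's note: the connector-side check in
+# src/connector/src/sink/formatter/mod.rs below also accepts serial and
+# rw_int256 key columns, so the full accepted list is varchar, bool,
+# smallint, int, bigint, serial and rw_int256.)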
+create sink sink_text_error from t_kafka with ( + connector = 'kafka', + properties.bootstrap.server = 'message_queue:29092', + topic = 'test-rw-sink-text-key-id', + primary_key = 'v_bytea') +format plain encode json ( + force_append_only='true' +) key encode text ; + +statement ok +create sink sink_text_id from t_kafka with ( + connector = 'kafka', + properties.bootstrap.server = 'message_queue:29092', + topic = 'test-rw-sink-text-key-id', + primary_key = 'id') +format plain encode json ( + force_append_only='true' +) key encode text ; + +statement ok +create table t_sink_text_id (id int) +include key as rw_key +with ( + connector = 'kafka', + properties.bootstrap.server = 'message_queue:29092', + topic = 'test-rw-sink-text-key-id', +) format plain encode json; + +#====== + statement ok insert into t_kafka values (1, '8DfUFencLe', 31031, 1872, 1872, 26261.416, 23956.39329760601, '2023-04-14 06:27:14.104742', '\x00', '0 second', '0001-01-01', '00:00:01.123456', '0001-01-01 00:00:00.123456'::timestamptz, '{}'), @@ -193,7 +248,6 @@ insert into t_kafka values (6, 'V4y71v4Gip', 4014, 10844, 28842, 5885.368, 11210.458724794062, '2023-04-13 10:42:02.137742', '\xCAFEBABE', '4 hour', '0001-01-01', '00:00:01.123456', '0001-01-01 00:00:00.123456'::timestamptz, '{}'), (7, 'YIVLnWxHyf', 10324, 12652, 15914, 3946.7434, 10967.182297153104, '2023-04-14 04:41:03.083742', '\xBABEC0DE', '3 day', '0001-01-01', '01:00:01.123456', '0001-01-01 00:00:00.123456'::timestamptz, '{}'); - statement error create sink si_kafka_without_snapshot as select * from t_kafka with ( connector = 'kafka', @@ -216,8 +270,24 @@ create sink si_kafka_without_snapshot from t_kafka with ( snapshot = 'false', ); +sleep 1s + +query T +select rw_key from t_sink_text_id order by rw_key +---- +\x31 +\x32 +\x33 +\x34 +\x35 +\x36 +\x37 + statement ok insert into t_kafka values (8, 'lv7Eq3g8hx', 194, 19036, 28641, 13652.073, 993.408963466774, '2023-04-13 13:52:09.356742', '\xDEADBABE', '04:00:00.1234', '1970-01-01', '00:00:01.123456', '0001-01-01 00:00:00.123456'::timestamptz, '{}'), (9, 'nwRq4zejSQ', 10028, 20090, 24837, 20699.559, 11615.276406159757, '2023-04-13 12:40:42.487742', '\xDEADBABE', '05:01:00.123456', '1970-01-01', '00:00:01.123456', '0001-01-01 00:00:00.123456'::timestamptz, '{}'), - (10, '0oVqRIHqkb', 26951, 20674, 20674, 19387.238, 9042.404483827515, '2023-04-13 16:40:58.888742', '\x00', '05:01:00.1234567', '1970-01-01', '00:00:01.123456', '1970-01-01 00:00:00.123456'::timestamptz, '{}'); \ No newline at end of file + (10, '0oVqRIHqkb', 26951, 20674, 20674, 19387.238, 9042.404483827515, '2023-04-13 16:40:58.888742', '\x00', '05:01:00.1234567', '1970-01-01', '00:00:01.123456', '1970-01-01 00:00:00.123456'::timestamptz, '{}'); + +statement ok +drop table t_sink_text_id; \ No newline at end of file diff --git a/e2e_test/sink/kafka/drop_sink.slt b/e2e_test/sink/kafka/drop_sink.slt index db599e80ec6d1..7bd370bcb1264 100644 --- a/e2e_test/sink/kafka/drop_sink.slt +++ b/e2e_test/sink/kafka/drop_sink.slt @@ -13,5 +13,8 @@ drop sink si_kafka_upsert_schema; statement ok drop sink si_kafka_without_snapshot; +statement ok +drop sink sink_text_id; + statement ok drop table t_kafka; diff --git a/proto/catalog.proto b/proto/catalog.proto index 47b5d719d15cb..42b672a1b6c51 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -134,6 +134,7 @@ message SinkFormatDesc { plan_common.FormatType format = 1; plan_common.EncodeType encode = 2; map options = 3; + optional plan_common.EncodeType key_encode = 4; } // the catalog of the sink. 
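// (key_encode note: when present, this field overrides how the key columns
// are encoded; ENCODE_TYPE_TEXT is currently the only value accepted here,
// enforced in SinkFormatDesc::try_from on the connector side below.)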
There are two kind of schema here. The full schema is all columns diff --git a/proto/plan_common.proto b/proto/plan_common.proto index 79a1b1622704e..8c73374a4d7d2 100644 --- a/proto/plan_common.proto +++ b/proto/plan_common.proto @@ -147,6 +147,7 @@ enum EncodeType { ENCODE_TYPE_BYTES = 6; ENCODE_TYPE_TEMPLATE = 7; ENCODE_TYPE_NONE = 8; + ENCODE_TYPE_TEXT = 9; } enum RowFormatType { diff --git a/src/bench/sink_bench/main.rs b/src/bench/sink_bench/main.rs index f8de4cd6d2099..033cb47ba5a24 100644 --- a/src/bench/sink_bench/main.rs +++ b/src/bench/sink_bench/main.rs @@ -404,6 +404,7 @@ fn mock_from_legacy_type( format, encode: SinkEncode::Json, options: Default::default(), + key_encode: None, })) } else { SinkFormatDesc::from_legacy_type(connector, r#type) diff --git a/src/connector/src/sink/catalog/mod.rs b/src/connector/src/sink/catalog/mod.rs index c0c0da9bc2047..7eaa977e16044 100644 --- a/src/connector/src/sink/catalog/mod.rs +++ b/src/connector/src/sink/catalog/mod.rs @@ -119,6 +119,8 @@ pub struct SinkFormatDesc { pub format: SinkFormat, pub encode: SinkEncode, pub options: BTreeMap, + + pub key_encode: Option, } /// TODO: consolidate with [`crate::source::SourceFormat`] and [`crate::parser::ProtocolProperties`]. @@ -136,6 +138,7 @@ pub enum SinkEncode { Protobuf, Avro, Template, + Text, } impl SinkFormatDesc { @@ -166,6 +169,7 @@ impl SinkFormatDesc { format, encode, options: Default::default(), + key_encode: None, })) } @@ -177,12 +181,16 @@ impl SinkFormatDesc { SinkFormat::Upsert => F::Upsert, SinkFormat::Debezium => F::Debezium, }; - let encode = match self.encode { + let mapping_encode = |sink_encode: &SinkEncode| match sink_encode { SinkEncode::Json => E::Json, SinkEncode::Protobuf => E::Protobuf, SinkEncode::Avro => E::Avro, SinkEncode::Template => E::Template, + SinkEncode::Text => E::Text, }; + + let encode = mapping_encode(&self.encode); + let key_encode = self.key_encode.as_ref().map(|e| mapping_encode(e).into()); let options = self .options .iter() @@ -193,6 +201,7 @@ impl SinkFormatDesc { format: format.into(), encode: encode.into(), options, + key_encode, } } } @@ -224,19 +233,37 @@ impl TryFrom for SinkFormatDesc { E::Protobuf => SinkEncode::Protobuf, E::Template => SinkEncode::Template, E::Avro => SinkEncode::Avro, - e @ (E::Unspecified | E::Native | E::Csv | E::Bytes | E::None) => { + e @ (E::Unspecified | E::Native | E::Csv | E::Bytes | E::None | E::Text) => { return Err(SinkError::Config(anyhow!( "sink encode unsupported: {}", e.as_str_name() ))) } }; + let key_encode = match &value.key_encode() { + E::Text => Some(SinkEncode::Text), + E::Unspecified => None, + encode @ (E::Avro + | E::Bytes + | E::Csv + | E::Json + | E::Protobuf + | E::Template + | E::Native + | E::None) => { + return Err(SinkError::Config(anyhow!( + "unsupported {} as sink key encode", + encode.as_str_name() + ))) + } + }; let options = value.options.into_iter().collect(); Ok(Self { format, encode, options, + key_encode, }) } } diff --git a/src/connector/src/sink/encoder/mod.rs b/src/connector/src/sink/encoder/mod.rs index dd590d0302ecf..a0515bb7bc165 100644 --- a/src/connector/src/sink/encoder/mod.rs +++ b/src/connector/src/sink/encoder/mod.rs @@ -24,6 +24,7 @@ mod avro; mod json; mod proto; pub mod template; +pub mod text; pub use avro::{AvroEncoder, AvroHeader}; pub use json::JsonEncoder; diff --git a/src/connector/src/sink/encoder/text.rs b/src/connector/src/sink/encoder/text.rs new file mode 100644 index 0000000000000..734ac8bd6a425 --- /dev/null +++ b/src/connector/src/sink/encoder/text.rs 
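To make the new catalog field concrete, here is a minimal round-trip sketch over the conversions above. The module paths and public fields are taken from this diff; the test scaffold around them is an assumption, not code from the PR.

use std::collections::BTreeMap;

use risingwave_connector::sink::catalog::{SinkEncode, SinkFormat, SinkFormatDesc};

#[test]
fn key_encode_survives_proto_roundtrip() {
    let desc = SinkFormatDesc {
        format: SinkFormat::Upsert,
        encode: SinkEncode::Json,
        options: BTreeMap::new(),
        key_encode: Some(SinkEncode::Text),
    };
    // to_proto() maps SinkEncode::Text to ENCODE_TYPE_TEXT in the new
    // optional field; try_from() accepts TEXT only in the key position and
    // rejects it in the value position.
    let pb = desc.to_proto();
    let round = SinkFormatDesc::try_from(pb).expect("TEXT is a valid key encode");
    assert!(matches!(round.key_encode, Some(SinkEncode::Text)));
}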
@@ -0,0 +1,108 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::catalog::Schema; +use risingwave_common::types::{DataType, ToText}; + +use super::RowEncoder; + +pub struct TextEncoder { + pub schema: Schema, + // the column must contain only one element + pub col_index: usize, +} + +impl TextEncoder { + pub fn new(schema: Schema, col_index: usize) -> Self { + Self { schema, col_index } + } +} + +impl RowEncoder for TextEncoder { + type Output = String; + + fn schema(&self) -> &risingwave_common::catalog::Schema { + &self.schema + } + + fn col_indices(&self) -> Option<&[usize]> { + Some(std::slice::from_ref(&self.col_index)) + } + + fn encode_cols( + &self, + row: impl risingwave_common::row::Row, + col_indices: impl Iterator, + ) -> crate::sink::Result { + // It is guaranteed by the caller that col_indices contains only one element + let mut result = String::new(); + for col_index in col_indices { + let datum = row.datum_at(col_index); + let data_type = &self.schema.fields[col_index].data_type; + if data_type == &DataType::Boolean { + result = if let Some(scalar_impl) = datum { + scalar_impl.into_bool().to_string() + } else { + "NULL".to_string() + } + } else { + result = datum.to_text_with_type(data_type); + } + } + + Ok(result) + } +} + +#[cfg(test)] +mod test { + use risingwave_common::catalog::Field; + use risingwave_common::row::OwnedRow; + use risingwave_common::types::ScalarImpl; + + use super::*; + + #[test] + fn test_text_encoder_ser_bool() { + let schema = Schema::new(vec![Field::with_name(DataType::Boolean, "col1")]); + let encoder = TextEncoder::new(schema, 0); + + let row = OwnedRow::new(vec![Some(ScalarImpl::Bool(true))]); + assert_eq!( + encoder + .encode_cols(&row, std::iter::once(0)) + .unwrap() + .as_str(), + "true" + ); + + let row = OwnedRow::new(vec![Some(ScalarImpl::Bool(false))]); + assert_eq!( + encoder + .encode_cols(&row, std::iter::once(0)) + .unwrap() + .as_str(), + "false" + ); + + let row = OwnedRow::new(vec![None]); + assert_eq!( + encoder + .encode_cols(&row, std::iter::once(0)) + .unwrap() + .as_str(), + "NULL" + ); + } +} diff --git a/src/connector/src/sink/formatter/mod.rs b/src/connector/src/sink/formatter/mod.rs index 8d6f1eb8ab463..9fd1a5cb6f2be 100644 --- a/src/connector/src/sink/formatter/mod.rs +++ b/src/connector/src/sink/formatter/mod.rs @@ -24,10 +24,12 @@ mod upsert; pub use append_only::AppendOnlyFormatter; pub use debezium_json::{DebeziumAdapterOpts, DebeziumJsonFormatter}; use risingwave_common::catalog::Schema; +use risingwave_common::types::DataType; pub use upsert::UpsertFormatter; use super::catalog::{SinkEncode, SinkFormat, SinkFormatDesc}; use super::encoder::template::TemplateEncoder; +use super::encoder::text::TextEncoder; use super::encoder::{ DateHandlingMode, KafkaConnectParams, TimeHandlingMode, TimestamptzHandlingMode, }; @@ -68,13 +70,22 @@ macro_rules! 
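Before the formatter changes (the `tri!` helper and the enum below), a short usage sketch of the encoder just added. It mirrors the unit test above but with an INT key, which is why the e2e query earlier reads back `\x31` for id 1; paths are assumed from this diff.

use risingwave_common::catalog::{Field, Schema};
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_connector::sink::encoder::text::TextEncoder;
use risingwave_connector::sink::encoder::RowEncoder;

#[test]
fn int_pk_renders_as_text_key() {
    let schema = Schema::new(vec![Field::with_name(DataType::Int32, "id")]);
    let encoder = TextEncoder::new(schema, 0);
    let row = OwnedRow::new(vec![Some(ScalarImpl::Int32(1))]);
    // ToText renders 1 as "1", i.e. the single byte 0x31 on the wire.
    assert_eq!(encoder.encode_cols(&row, std::iter::once(0)).unwrap(), "1");
}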
tri { } pub enum SinkFormatterImpl { + // append-only AppendOnlyJson(AppendOnlyFormatter), + AppendOnlyTextJson(AppendOnlyFormatter), AppendOnlyProto(AppendOnlyFormatter), + AppendOnlyTextProto(AppendOnlyFormatter), + AppendOnlyTemplate(AppendOnlyFormatter), + AppendOnlyTextTemplate(AppendOnlyFormatter), + // upsert UpsertJson(UpsertFormatter), + UpsertTextJson(UpsertFormatter), UpsertAvro(UpsertFormatter), - DebeziumJson(DebeziumJsonFormatter), - AppendOnlyTemplate(AppendOnlyFormatter), + UpsertTextAvro(UpsertFormatter), UpsertTemplate(UpsertFormatter), + UpsertTextTemplate(UpsertFormatter), + // debezium + DebeziumJson(DebeziumJsonFormatter), } #[derive(Debug, Clone)] @@ -147,6 +158,51 @@ impl EncoderBuild for ProtoEncoder { } } +impl EncoderBuild for TextEncoder { + async fn build(params: EncoderParams<'_>, pk_indices: Option>) -> Result { + let Some(pk_indices) = pk_indices else { + return Err(SinkError::Config(anyhow!( + "TextEncoder requires primary key columns to be specified" + ))); + }; + if pk_indices.len() != 1 { + return Err(SinkError::Config(anyhow!( + "The key encode is TEXT, but the primary key has {} columns. The key encode TEXT requires the primary key to be a single column", + pk_indices.len() + ))); + } + + let schema_ref = params.schema.fields().get(pk_indices[0]).ok_or_else(|| { + SinkError::Config(anyhow!( + "The primary key column index {} is out of bounds in schema {:?}", + pk_indices[0], + params.schema + )) + })?; + match &schema_ref.data_type() { + DataType::Varchar + | DataType::Boolean + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::Int256 + | DataType::Serial => {} + _ => { + // why we don't allow float as text for key encode: https://github.com/risingwavelabs/risingwave/pull/16377#discussion_r1591864960 + return Err(SinkError::Config( + anyhow!( + "The key encode is TEXT, but the primary key column {} has type {}. The key encode TEXT requires the primary key column to be of type varchar, bool, small int, int, big int, serial or rw_int256.", + schema_ref.name, + schema_ref.data_type + ), + )); + } + } + + Ok(Self::new(params.schema, pk_indices[0])) + } +} + impl EncoderBuild for AvroEncoder { async fn build(b: EncoderParams<'_>, pk_indices: Option>) -> Result { let loader = @@ -269,24 +325,53 @@ impl SinkFormatterImpl { T::build(p).await } - Ok(match (&format_desc.format, &format_desc.encode) { - (F::AppendOnly, E::Json) => Impl::AppendOnlyJson(build(p).await?), - (F::AppendOnly, E::Protobuf) => Impl::AppendOnlyProto(build(p).await?), - (F::AppendOnly, E::Template) => Impl::AppendOnlyTemplate(build(p).await?), - (F::Upsert, E::Json) => Impl::UpsertJson(build(p).await?), - (F::Upsert, E::Avro) => Impl::UpsertAvro(build(p).await?), - (F::Upsert, E::Template) => Impl::UpsertTemplate(build(p).await?), - (F::Debezium, E::Json) => Impl::DebeziumJson(build(p).await?), - (F::AppendOnly, E::Avro) - | (F::Upsert, E::Protobuf) - | (F::Debezium, E::Avro | E::Protobuf | E::Template) => { - return Err(SinkError::Config(anyhow!( - "sink format/encode unsupported: {:?} {:?}", - format_desc.format, - format_desc.encode, - ))); - } - }) + Ok( + match ( + &format_desc.format, + &format_desc.encode, + &format_desc.key_encode, + ) { + (F::AppendOnly, E::Json, Some(E::Text)) => { + Impl::AppendOnlyTextJson(build(p).await?) + } + (F::AppendOnly, E::Json, None) => Impl::AppendOnlyJson(build(p).await?), + (F::AppendOnly, E::Protobuf, Some(E::Text)) => { + Impl::AppendOnlyTextProto(build(p).await?) 
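// Formatter selection is now keyed on the (format, encode, key_encode)
// triple: Some(E::Text) picks the *Text* variant with a string key, None
// keeps the pre-existing pairing, and every other combination -- including
// ENCODE TEXT used in the value position -- falls through to a config error.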
+ } + (F::AppendOnly, E::Protobuf, None) => Impl::AppendOnlyProto(build(p).await?), + (F::AppendOnly, E::Template, Some(E::Text)) => { + Impl::AppendOnlyTextTemplate(build(p).await?) + } + (F::AppendOnly, E::Template, None) => Impl::AppendOnlyTemplate(build(p).await?), + (F::Upsert, E::Json, Some(E::Text)) => Impl::UpsertTextJson(build(p).await?), + (F::Upsert, E::Json, None) => Impl::UpsertJson(build(p).await?), + (F::Upsert, E::Avro, Some(E::Text)) => Impl::UpsertTextAvro(build(p).await?), + (F::Upsert, E::Avro, None) => Impl::UpsertAvro(build(p).await?), + (F::Upsert, E::Template, Some(E::Text)) => { + Impl::UpsertTextTemplate(build(p).await?) + } + (F::Upsert, E::Template, None) => Impl::UpsertTemplate(build(p).await?), + (F::Debezium, E::Json, None) => Impl::DebeziumJson(build(p).await?), + (F::AppendOnly | F::Upsert, E::Text, _) => { + return Err(SinkError::Config(anyhow!( + "ENCODE TEXT is only valid as key encode." + ))); + } + (F::AppendOnly, E::Avro, _) + | (F::Upsert, E::Protobuf, _) + | (F::Debezium, E::Json, Some(_)) + | (F::Debezium, E::Avro | E::Protobuf | E::Template | E::Text, _) + | (F::AppendOnly | F::Upsert, _, Some(E::Template) | Some(E::Json) | Some(E::Avro) | Some(E::Protobuf)) // reject other encode as key encode + => { + return Err(SinkError::Config(anyhow!( + "sink format/encode/key_encode unsupported: {:?} {:?} {:?}", + format_desc.format, + format_desc.encode, + format_desc.key_encode + ))); + } + }, + ) } } @@ -295,11 +380,18 @@ macro_rules! dispatch_sink_formatter_impl { ($impl:expr, $name:ident, $body:expr) => { match $impl { SinkFormatterImpl::AppendOnlyJson($name) => $body, + SinkFormatterImpl::AppendOnlyTextJson($name) => $body, SinkFormatterImpl::AppendOnlyProto($name) => $body, + SinkFormatterImpl::AppendOnlyTextProto($name) => $body, + SinkFormatterImpl::UpsertJson($name) => $body, + SinkFormatterImpl::UpsertTextJson($name) => $body, SinkFormatterImpl::UpsertAvro($name) => $body, + SinkFormatterImpl::UpsertTextAvro($name) => $body, SinkFormatterImpl::DebeziumJson($name) => $body, + SinkFormatterImpl::AppendOnlyTextTemplate($name) => $body, SinkFormatterImpl::AppendOnlyTemplate($name) => $body, + SinkFormatterImpl::UpsertTextTemplate($name) => $body, SinkFormatterImpl::UpsertTemplate($name) => $body, } }; @@ -310,11 +402,18 @@ macro_rules! 
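// The str-key variant below serves sinks whose keys must be strings (e.g.
// Redis). Plain UpsertAvro stays unreachable there, while UpsertTextAvro is
// dispatched: a string key only exists once KEY ENCODE TEXT is in effect.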
dispatch_sink_formatter_str_key_impl { ($impl:expr, $name:ident, $body:expr) => { match $impl { SinkFormatterImpl::AppendOnlyJson($name) => $body, + SinkFormatterImpl::AppendOnlyTextJson($name) => $body, SinkFormatterImpl::AppendOnlyProto($name) => $body, + SinkFormatterImpl::AppendOnlyTextProto($name) => $body, + SinkFormatterImpl::UpsertJson($name) => $body, + SinkFormatterImpl::UpsertTextJson($name) => $body, SinkFormatterImpl::UpsertAvro(_) => unreachable!(), + SinkFormatterImpl::UpsertTextAvro($name) => $body, SinkFormatterImpl::DebeziumJson($name) => $body, + SinkFormatterImpl::AppendOnlyTextTemplate($name) => $body, SinkFormatterImpl::AppendOnlyTemplate($name) => $body, + SinkFormatterImpl::UpsertTextTemplate($name) => $body, SinkFormatterImpl::UpsertTemplate($name) => $body, } }; diff --git a/src/connector/src/sink/redis.rs b/src/connector/src/sink/redis.rs index 10ecff85f2ccc..9da1d8b4d3f03 100644 --- a/src/connector/src/sink/redis.rs +++ b/src/connector/src/sink/redis.rs @@ -48,6 +48,7 @@ pub struct RedisCommon { #[serde(rename = "redis.url")] pub url: String, } + pub enum RedisPipe { Cluster(ClusterPipeline), Single(Pipeline), @@ -140,6 +141,7 @@ impl RedisCommon { } } } + #[serde_as] #[derive(Clone, Debug, Deserialize, WithOptions)] pub struct RedisConfig { @@ -260,6 +262,7 @@ struct RedisSinkPayloadWriter { // the command pipeline for write-commit pipe: RedisPipe, } + impl RedisSinkPayloadWriter { pub async fn new(config: RedisConfig) -> Result { let (conn, pipe) = config.common.build_conn_and_pipe().await?; @@ -405,6 +408,7 @@ mod test { format: SinkFormat::AppendOnly, encode: SinkEncode::Json, options: BTreeMap::default(), + key_encode: None, }; let mut redis_sink_writer = RedisSinkWriter::mock(schema, vec![0], &format_desc) @@ -425,11 +429,19 @@ mod test { .write_chunk(chunk_a, manager.start_write_chunk(0, 0)) .await .expect("failed to write batch"); - let expected_a = - vec![ - (0, "*3\r\n$3\r\nSET\r\n$8\r\n{\"id\":1}\r\n$23\r\n{\"id\":1,\"name\":\"Alice\"}\r\n"), - (1, "*3\r\n$3\r\nSET\r\n$8\r\n{\"id\":2}\r\n$21\r\n{\"id\":2,\"name\":\"Bob\"}\r\n"), - (2, "*3\r\n$3\r\nSET\r\n$8\r\n{\"id\":3}\r\n$23\r\n{\"id\":3,\"name\":\"Clare\"}\r\n"), + let expected_a = vec![ + ( + 0, + "*3\r\n$3\r\nSET\r\n$8\r\n{\"id\":1}\r\n$23\r\n{\"id\":1,\"name\":\"Alice\"}\r\n", + ), + ( + 1, + "*3\r\n$3\r\nSET\r\n$8\r\n{\"id\":2}\r\n$21\r\n{\"id\":2,\"name\":\"Bob\"}\r\n", + ), + ( + 2, + "*3\r\n$3\r\nSET\r\n$8\r\n{\"id\":3}\r\n$23\r\n{\"id\":3,\"name\":\"Clare\"}\r\n", + ), ]; if let RedisPipe::Single(pipe) = &redis_sink_writer.payload_writer.pipe { @@ -473,6 +485,7 @@ mod test { format: SinkFormat::AppendOnly, encode: SinkEncode::Template, options: btree_map, + key_encode: None, }; let mut redis_sink_writer = RedisSinkWriter::mock(schema, vec![0], &format_desc) diff --git a/src/frontend/src/handler/alter_source_with_sr.rs b/src/frontend/src/handler/alter_source_with_sr.rs index b15e9ba3e9867..9f3c089998fbc 100644 --- a/src/frontend/src/handler/alter_source_with_sr.rs +++ b/src/frontend/src/handler/alter_source_with_sr.rs @@ -64,6 +64,7 @@ fn encode_type_to_encode(from: EncodeType) -> Option { EncodeType::Bytes => Encode::Bytes, EncodeType::Template => Encode::Template, EncodeType::None => Encode::None, + EncodeType::Text => Encode::Text, }) } diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index 2703850dba0bc..f2d8cfb733bf8 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -718,10 +718,23 @@ fn 
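// Frontend counterpart of the catalog-side validation: the binder below
// lowers the parsed KEY ENCODE clause into SinkFormatDesc::key_encode and
// rejects anything but TEXT with a BindError.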
bind_sink_format_desc(value: ConnectorSchema) -> Result { E::Protobuf => SinkEncode::Protobuf, E::Avro => SinkEncode::Avro, E::Template => SinkEncode::Template, - e @ (E::Native | E::Csv | E::Bytes | E::None) => { + e @ (E::Native | E::Csv | E::Bytes | E::None | E::Text) => { return Err(ErrorCode::BindError(format!("sink encode unsupported: {e}")).into()); } }; + + let mut key_encode = None; + if let Some(encode) = value.key_encode { + if encode == E::Text { + key_encode = Some(SinkEncode::Text); + } else { + return Err(ErrorCode::BindError(format!( + "sink key encode unsupported: {encode}, only TEXT supported" + )) + .into()); + } + } + let mut options = WithOptions::try_from(value.row_options.as_slice())?.into_inner(); options @@ -732,6 +745,7 @@ fn bind_sink_format_desc(value: ConnectorSchema) -> Result { format, encode, options, + key_encode, }) } diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 69ac33a827946..9bcdd3d48f0bc 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -1498,6 +1498,7 @@ fn row_encode_to_prost(row_encode: &Encode) -> EncodeType { Encode::Bytes => EncodeType::Bytes, Encode::Template => EncodeType::Template, Encode::None => EncodeType::None, + Encode::Text => EncodeType::Text, } } diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index e0bd1eb225e07..0e3c1683009d3 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -148,18 +148,19 @@ impl IcebergPartitionInfo { )) } } + #[inline] fn find_column_idx_by_name(columns: &[ColumnCatalog], col_name: &str) -> Result { columns - .iter() - .position(|col| col.column_desc.name == col_name) - .ok_or_else(|| { - ErrorCode::SinkError(Box::new(Error::new( - ErrorKind::InvalidInput, - format!("Sink primary key column not found: {}. Please use ',' as the delimiter for different primary key columns.", col_name) - ))) + .iter() + .position(|col| col.column_desc.name == col_name) + .ok_or_else(|| { + ErrorCode::SinkError(Box::new(Error::new( + ErrorKind::InvalidInput, + format!("Sink primary key column not found: {}. Please use ',' as the delimiter for different primary key columns.", col_name), + ))) .into() - }) + }) } /// [`StreamSink`] represents a table/connector sink at the very end of the graph. @@ -315,6 +316,7 @@ impl StreamSink { let (pk, _) = derive_pk(input.clone(), user_order_by, &columns); let mut downstream_pk = Self::parse_downstream_pk(&columns, properties.get(DOWNSTREAM_PK_KEY))?; + let mut extra_partition_col_idx = None; let required_dist = match input.distribution() { Distribution::Single => RequiredDist::single(), @@ -465,16 +467,16 @@ impl StreamSink { (false, true, false) => { Err(ErrorCode::SinkError(Box::new(Error::new( ErrorKind::InvalidInput, - format!("The sink cannot be append-only. Please add \"force_append_only='true'\" in {} options to force the sink to be append-only. Notice that this will cause the sink executor to drop any UPDATE or DELETE message.", if syntax_legacy {"WITH"} else {"FORMAT ENCODE"}), + format!("The sink cannot be append-only. Please add \"force_append_only='true'\" in {} options to force the sink to be append-only. 
Notice that this will cause the sink executor to drop any UPDATE or DELETE message.", if syntax_legacy { "WITH" } else { "FORMAT ENCODE" }), ))) - .into()) + .into()) } (_, false, true) => { Err(ErrorCode::SinkError(Box::new(Error::new( ErrorKind::InvalidInput, - format!("Cannot force the sink to be append-only without \"{}\".", if syntax_legacy {"type='append-only'"} else {"FORMAT PLAIN"}), + format!("Cannot force the sink to be append-only without \"{}\".", if syntax_legacy { "type='append-only'" } else { "FORMAT PLAIN" }), ))) - .into()) + .into()) } } } @@ -612,6 +614,7 @@ mod test { }, ] } + #[test] fn test_iceberg_convert_to_expression() { let partition_type = StructType::new(vec![ diff --git a/src/sqlparser/src/ast/legacy_source.rs b/src/sqlparser/src/ast/legacy_source.rs index ad005c49d0a13..e5f957231f5ad 100644 --- a/src/sqlparser/src/ast/legacy_source.rs +++ b/src/sqlparser/src/ast/legacy_source.rs @@ -66,6 +66,11 @@ impl From for CompatibleSourceSchema { pub fn parse_source_schema(p: &mut Parser) -> Result { if let Some(schema_v2) = p.parse_schema()? { + if schema_v2.key_encode.is_some() { + return Err(ParserError::ParserError( + "key encode clause is not supported in source schema".to_string(), + )); + } Ok(CompatibleSourceSchema::V2(schema_v2)) } else if p.peek_nth_any_of_keywords(0, &[Keyword::ROW]) && p.peek_nth_any_of_keywords(1, &[Keyword::FORMAT]) @@ -108,7 +113,7 @@ pub fn parse_source_schema(p: &mut Parser) -> Result Result Result { impl_parse_to!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], p); diff --git a/src/sqlparser/src/ast/statement.rs b/src/sqlparser/src/ast/statement.rs index a821cd77e70e3..3bc99fe6e9482 100644 --- a/src/sqlparser/src/ast/statement.rs +++ b/src/sqlparser/src/ast/statement.rs @@ -94,13 +94,20 @@ pub struct CreateSourceStatement { #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub enum Format { Native, - None, // Keyword::NONE - Debezium, // Keyword::DEBEZIUM - DebeziumMongo, // Keyword::DEBEZIUM_MONGO - Maxwell, // Keyword::MAXWELL - Canal, // Keyword::CANAL - Upsert, // Keyword::UPSERT - Plain, // Keyword::PLAIN + // Keyword::NONE + None, + // Keyword::DEBEZIUM + Debezium, + // Keyword::DEBEZIUM_MONGO + DebeziumMongo, + // Keyword::MAXWELL + Maxwell, + // Keyword::CANAL + Canal, + // Keyword::UPSERT + Upsert, + // Keyword::PLAIN + Plain, } // TODO: unify with `from_keyword` @@ -133,12 +140,12 @@ impl Format { "PLAIN" => Format::Plain, "UPSERT" => Format::Upsert, "NATIVE" => Format::Native, // used internally for schema change - "NONE" => Format::None, // used by iceberg + "NONE" => Format::None, // used by iceberg _ => { return Err(ParserError::ParserError( "expected CANAL | PROTOBUF | DEBEZIUM | MAXWELL | PLAIN | NATIVE | NONE after FORMAT" .to_string(), - )) + )); } }) } @@ -153,6 +160,7 @@ pub enum Encode { Json, // Keyword::JSON Bytes, // Keyword::BYTES None, // Keyword::None + Text, // Keyword::TEXT Native, Template, } @@ -172,6 +180,7 @@ impl fmt::Display for Encode { Encode::Native => "NATIVE", Encode::Template => "TEMPLATE", Encode::None => "NONE", + Encode::Text => "TEXT", } ) } @@ -181,6 +190,7 @@ impl Encode { pub fn from_keyword(s: &str) -> Result { Ok(match s { "AVRO" => Encode::Avro, + "TEXT" => Encode::Text, "BYTES" => Encode::Bytes, "CSV" => Encode::Csv, "PROTOBUF" => Encode::Protobuf, @@ -202,6 +212,8 @@ pub struct ConnectorSchema { pub format: Format, pub row_encode: Encode, pub row_options: Vec, + + pub key_encode: Option, } impl Parser { @@ -291,10 +303,19 @@ impl Parser { let row_encode = 
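// Grammar note: after ENCODE <ident> (options...) the parser now accepts an
// optional trailing KEY ENCODE <ident> (parsed below). Sources reject the
// clause both in parse_source_schema and in the ALTER SOURCE ... FORMAT
// path, so it is effectively sink-only.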
Encode::from_keyword(&s)?; let row_options = self.parse_options()?; + let key_encode = if self.parse_keywords(&[Keyword::KEY, Keyword::ENCODE]) { + Some(Encode::from_keyword( + self.parse_identifier()?.value.to_ascii_uppercase().as_str(), + )?) + } else { + None + }; + Ok(Some(ConnectorSchema { format, row_encode, row_options, + key_encode, })) } } @@ -305,6 +326,7 @@ impl ConnectorSchema { format: Format::Plain, row_encode: Encode::Json, row_options: Vec::new(), + key_encode: None, } } @@ -314,6 +336,7 @@ impl ConnectorSchema { format: Format::Debezium, row_encode: Encode::Json, row_options: Vec::new(), + key_encode: None, } } @@ -322,6 +345,7 @@ impl ConnectorSchema { format: Format::DebeziumMongo, row_encode: Encode::Json, row_options: Vec::new(), + key_encode: None, } } @@ -331,6 +355,7 @@ impl ConnectorSchema { format: Format::Native, row_encode: Encode::Native, row_options: Vec::new(), + key_encode: None, } } @@ -341,6 +366,7 @@ impl ConnectorSchema { format: Format::None, row_encode: Encode::None, row_options: Vec::new(), + key_encode: None, } } @@ -623,6 +649,7 @@ impl ParseTo for CreateSubscriptionStatement { }) } } + impl fmt::Display for CreateSubscriptionStatement { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let mut v: Vec = vec![]; @@ -654,6 +681,7 @@ impl fmt::Display for DeclareCursor { v.iter().join(" ").fmt(f) } } + // sql_grammar!(DeclareCursorStatement { // cursor_name: Ident, // [Keyword::SUBSCRIPTION] @@ -692,6 +720,7 @@ impl ParseTo for DeclareCursorStatement { }) } } + impl fmt::Display for DeclareCursorStatement { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let mut v: Vec = vec![]; @@ -763,6 +792,7 @@ impl ParseTo for CloseCursorStatement { Ok(Self { cursor_name }) } } + impl fmt::Display for CloseCursorStatement { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let mut v: Vec = vec![]; diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index ea28a79c01c6e..05217fd53a723 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -483,7 +483,7 @@ impl Parser { ObjectName(id_parts), self.parse_except()?, )) - } + }; } unexpected => { return self.expected( @@ -1928,7 +1928,7 @@ impl Parser { _ => { return token .cloned() - .unwrap_or(TokenWithLocation::wrap(Token::EOF)) + .unwrap_or(TokenWithLocation::wrap(Token::EOF)); } } } @@ -3575,6 +3575,11 @@ impl Parser { } } else if self.peek_nth_any_of_keywords(0, &[Keyword::FORMAT]) { let connector_schema = self.parse_schema()?.unwrap(); + if connector_schema.key_encode.is_some() { + return Err(ParserError::ParserError( + "key encode clause is not supported in source schema".to_string(), + )); + } AlterSourceOperation::FormatEncode { connector_schema } } else if self.parse_keywords(&[Keyword::REFRESH, Keyword::SCHEMA]) { AlterSourceOperation::RefreshSchema diff --git a/src/sqlparser/tests/testdata/create.yaml b/src/sqlparser/tests/testdata/create.yaml index 1fa2833267f81..a718a8517bed1 100644 --- a/src/sqlparser/tests/testdata/create.yaml +++ b/src/sqlparser/tests/testdata/create.yaml @@ -35,13 +35,13 @@ Near "pad CHARACTER VARYING) FROM sbtest" - input: CREATE SOURCE IF NOT EXISTS src WITH (kafka.topic = 'abc', kafka.servers = 'localhost:1001') FORMAT PLAIN ENCODE PROTOBUF (message = 'Foo', schema.location = 'file://') formatted_sql: CREATE SOURCE IF NOT EXISTS src WITH (kafka.topic = 'abc', kafka.servers = 'localhost:1001') FORMAT PLAIN ENCODE PROTOBUF (message = 'Foo', schema.location = 'file://') - formatted_ast: 'CreateSource { stmt: 
CreateSourceStatement { if_not_exists: true, columns: [], wildcard_idx: None, constraints: [], source_name: ObjectName([Ident { value: "src", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "topic", quote_style: None }]), value: SingleQuotedString("abc") }, SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "servers", quote_style: None }]), value: SingleQuotedString("localhost:1001") }]), source_schema: V2(ConnectorSchema { format: Plain, row_encode: Protobuf, row_options: [SqlOption { name: ObjectName([Ident { value: "message", quote_style: None }]), value: SingleQuotedString("Foo") }, SqlOption { name: ObjectName([Ident { value: "schema", quote_style: None }, Ident { value: "location", quote_style: None }]), value: SingleQuotedString("file://") }] }), source_watermarks: [], include_column_options: [] } }' + formatted_ast: 'CreateSource { stmt: CreateSourceStatement { if_not_exists: true, columns: [], wildcard_idx: None, constraints: [], source_name: ObjectName([Ident { value: "src", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "topic", quote_style: None }]), value: SingleQuotedString("abc") }, SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "servers", quote_style: None }]), value: SingleQuotedString("localhost:1001") }]), source_schema: V2(ConnectorSchema { format: Plain, row_encode: Protobuf, row_options: [SqlOption { name: ObjectName([Ident { value: "message", quote_style: None }]), value: SingleQuotedString("Foo") }, SqlOption { name: ObjectName([Ident { value: "schema", quote_style: None }, Ident { value: "location", quote_style: None }]), value: SingleQuotedString("file://") }], key_encode: None }), source_watermarks: [], include_column_options: [] } }' - input: CREATE SOURCE IF NOT EXISTS src WITH (kafka.topic = 'abc', kafka.servers = 'localhost:1001') FORMAT PLAIN ENCODE PROTOBUF (message = 'Foo', schema.registry = 'http://') formatted_sql: CREATE SOURCE IF NOT EXISTS src WITH (kafka.topic = 'abc', kafka.servers = 'localhost:1001') FORMAT PLAIN ENCODE PROTOBUF (message = 'Foo', schema.registry = 'http://') - formatted_ast: 'CreateSource { stmt: CreateSourceStatement { if_not_exists: true, columns: [], wildcard_idx: None, constraints: [], source_name: ObjectName([Ident { value: "src", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "topic", quote_style: None }]), value: SingleQuotedString("abc") }, SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "servers", quote_style: None }]), value: SingleQuotedString("localhost:1001") }]), source_schema: V2(ConnectorSchema { format: Plain, row_encode: Protobuf, row_options: [SqlOption { name: ObjectName([Ident { value: "message", quote_style: None }]), value: SingleQuotedString("Foo") }, SqlOption { name: ObjectName([Ident { value: "schema", quote_style: None }, Ident { value: "registry", quote_style: None }]), value: SingleQuotedString("http://") }] }), source_watermarks: [], include_column_options: [] } }' + formatted_ast: 'CreateSource { stmt: CreateSourceStatement { if_not_exists: true, columns: [], wildcard_idx: None, constraints: [], source_name: ObjectName([Ident { value: "src", quote_style: None }]), with_properties: 
WithProperties([SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "topic", quote_style: None }]), value: SingleQuotedString("abc") }, SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "servers", quote_style: None }]), value: SingleQuotedString("localhost:1001") }]), source_schema: V2(ConnectorSchema { format: Plain, row_encode: Protobuf, row_options: [SqlOption { name: ObjectName([Ident { value: "message", quote_style: None }]), value: SingleQuotedString("Foo") }, SqlOption { name: ObjectName([Ident { value: "schema", quote_style: None }, Ident { value: "registry", quote_style: None }]), value: SingleQuotedString("http://") }], key_encode: None }), source_watermarks: [], include_column_options: [] } }' - input: CREATE SOURCE bid (auction INTEGER, bidder INTEGER, price INTEGER, WATERMARK FOR auction AS auction - 1, "date_time" TIMESTAMP) with (connector = 'nexmark', nexmark.table.type = 'Bid', nexmark.split.num = '12', nexmark.min.event.gap.in.ns = '0') formatted_sql: CREATE SOURCE bid (auction INT, bidder INT, price INT, "date_time" TIMESTAMP, WATERMARK FOR auction AS auction - 1) WITH (connector = 'nexmark', nexmark.table.type = 'Bid', nexmark.split.num = '12', nexmark.min.event.gap.in.ns = '0') FORMAT NATIVE ENCODE NATIVE - formatted_ast: 'CreateSource { stmt: CreateSourceStatement { if_not_exists: false, columns: [ColumnDef { name: Ident { value: "auction", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "bidder", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "price", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "date_time", quote_style: Some(''"'') }, data_type: Some(Timestamp(false)), collation: None, options: [] }], wildcard_idx: None, constraints: [], source_name: ObjectName([Ident { value: "bid", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "connector", quote_style: None }]), value: SingleQuotedString("nexmark") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "table", quote_style: None }, Ident { value: "type", quote_style: None }]), value: SingleQuotedString("Bid") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "split", quote_style: None }, Ident { value: "num", quote_style: None }]), value: SingleQuotedString("12") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "min", quote_style: None }, Ident { value: "event", quote_style: None }, Ident { value: "gap", quote_style: None }, Ident { value: "in", quote_style: None }, Ident { value: "ns", quote_style: None }]), value: SingleQuotedString("0") }]), source_schema: V2(ConnectorSchema { format: Native, row_encode: Native, row_options: [] }), source_watermarks: [SourceWatermark { column: Ident { value: "auction", quote_style: None }, expr: BinaryOp { left: Identifier(Ident { value: "auction", quote_style: None }), op: Minus, right: Value(Number("1")) } }], include_column_options: [] } }' + formatted_ast: 'CreateSource { stmt: CreateSourceStatement { if_not_exists: false, columns: [ColumnDef { name: Ident { value: "auction", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "bidder", quote_style: None }, data_type: Some(Int), 
collation: None, options: [] }, ColumnDef { name: Ident { value: "price", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "date_time", quote_style: Some(''"'') }, data_type: Some(Timestamp(false)), collation: None, options: [] }], wildcard_idx: None, constraints: [], source_name: ObjectName([Ident { value: "bid", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "connector", quote_style: None }]), value: SingleQuotedString("nexmark") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "table", quote_style: None }, Ident { value: "type", quote_style: None }]), value: SingleQuotedString("Bid") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "split", quote_style: None }, Ident { value: "num", quote_style: None }]), value: SingleQuotedString("12") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "min", quote_style: None }, Ident { value: "event", quote_style: None }, Ident { value: "gap", quote_style: None }, Ident { value: "in", quote_style: None }, Ident { value: "ns", quote_style: None }]), value: SingleQuotedString("0") }]), source_schema: V2(ConnectorSchema { format: Native, row_encode: Native, row_options: [], key_encode: None }), source_watermarks: [SourceWatermark { column: Ident { value: "auction", quote_style: None }, expr: BinaryOp { left: Identifier(Ident { value: "auction", quote_style: None }), op: Minus, right: Value(Number("1")) } }], include_column_options: [] } }'
- input: CREATE TABLE T (v1 INT, v2 STRUCT<v1 INT, v2 INT>)
  formatted_sql: CREATE TABLE T (v1 INT, v2 STRUCT<v1 INT, v2 INT>)
- input: CREATE TABLE T (v1 INT, v2 STRUCT<v1 INT, v2 STRUCT<v1 INT, v2 INT>>)
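Closing with a hypothetical parser-level round-trip in the spirit of the yaml cases above. `Statement::CreateSink`, `sink_schema` and `key_encode` follow the AST shapes touched by this patch; treat the exact field names as assumptions rather than a verbatim test from the repo.

use risingwave_sqlparser::ast::{Encode, Statement};
use risingwave_sqlparser::parser::Parser;

#[test]
fn parse_key_encode_text() {
    let sql = "CREATE SINK s FROM t WITH (connector = 'kafka') \
               FORMAT PLAIN ENCODE JSON (force_append_only = 'true') KEY ENCODE TEXT";
    let stmt = Parser::parse_sql(sql).unwrap().remove(0);
    let Statement::CreateSink { stmt } = stmt else {
        panic!("expected CREATE SINK");
    };
    // The trailing clause should land in ConnectorSchema::key_encode.
    let schema = stmt.sink_schema.expect("FORMAT/ENCODE clause was provided");
    assert_eq!(schema.row_encode, Encode::Json);
    assert_eq!(schema.key_encode, Some(Encode::Text));
}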