From fffff6dd54f114b72d858b453aafe5367d389bbe Mon Sep 17 00:00:00 2001 From: stonepage <40830455+st1page@users.noreply.github.com> Date: Thu, 28 Dec 2023 17:58:18 +0800 Subject: [PATCH] feat: Revert "feat(sink): support format encode syntax for sink into table (#14134) (#14258) --- e2e_test/sink/sink_into_table.slt | 4 +-- src/connector/src/sink/catalog/mod.rs | 5 +--- src/connector/src/sink/formatter/mod.rs | 2 -- src/frontend/src/handler/create_sink.rs | 35 ++++++------------------- src/sqlparser/src/keywords.rs | 1 - 5 files changed, 11 insertions(+), 36 deletions(-) diff --git a/e2e_test/sink/sink_into_table.slt b/e2e_test/sink/sink_into_table.slt index d5ce724b8f5a7..a3df061fc8db4 100644 --- a/e2e_test/sink/sink_into_table.slt +++ b/e2e_test/sink/sink_into_table.slt @@ -55,7 +55,7 @@ statement error Only append-only sinks can sink to a table without primary keys. create sink s1 into t_row_id_as_primary_key as select v1, v2 from t_s1; statement ok -create sink s1 into t_row_id_as_primary_key as select v1, v2 from t_s1 FORMAT PLAIN ENCODE NATIVE(force_append_only='true'); +create sink s1 into t_row_id_as_primary_key as select v1, v2 from t_s1 with (type = 'append-only', force_append_only = 'true'); statement ok flush; @@ -154,7 +154,7 @@ statement error Only append-only sinks can sink to a table without primary keys. create sink s2 into t_append_only as select v1, v2 from t_s2; statement ok -create sink s2 into t_append_only as select v1, v2 from t_s2 FORMAT PLAIN ENCODE NATIVE(force_append_only='true'); +create sink s2 into t_append_only as select v1, v2 from t_s2 with (type = 'append-only', force_append_only = 'true'); statement ok flush; diff --git a/src/connector/src/sink/catalog/mod.rs b/src/connector/src/sink/catalog/mod.rs index 8ced168ce919d..55d450adc5ee2 100644 --- a/src/connector/src/sink/catalog/mod.rs +++ b/src/connector/src/sink/catalog/mod.rs @@ -133,7 +133,6 @@ pub enum SinkEncode { Protobuf, Avro, Template, - Native, } impl SinkFormatDesc { @@ -180,7 +179,6 @@ impl SinkFormatDesc { SinkEncode::Protobuf => E::Protobuf, SinkEncode::Avro => E::Avro, SinkEncode::Template => E::Template, - SinkEncode::Native => E::Native, }; let options = self .options @@ -218,8 +216,7 @@ impl TryFrom for SinkFormatDesc { E::Protobuf => SinkEncode::Protobuf, E::Template => SinkEncode::Template, E::Avro => SinkEncode::Avro, - E::Native => SinkEncode::Native, - e @ (E::Unspecified | E::Csv | E::Bytes) => { + e @ (E::Unspecified | E::Native | E::Csv | E::Bytes) => { return Err(SinkError::Config(anyhow!( "sink encode unsupported: {}", e.as_str_name() diff --git a/src/connector/src/sink/formatter/mod.rs b/src/connector/src/sink/formatter/mod.rs index 1abf451fb061e..4864b4027359b 100644 --- a/src/connector/src/sink/formatter/mod.rs +++ b/src/connector/src/sink/formatter/mod.rs @@ -150,7 +150,6 @@ impl SinkFormatterImpl { AppendOnlyFormatter::new(Some(key_encoder), val_encoder), )) } - SinkEncode::Native => err_unsupported(), } } SinkFormat::Debezium => { @@ -252,7 +251,6 @@ impl SinkFormatterImpl { Ok(SinkFormatterImpl::UpsertAvro(formatter)) } SinkEncode::Protobuf => err_unsupported(), - SinkEncode::Native => err_unsupported(), } } } diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index ec8fa2fc2a33c..4a8cc866a18f3 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -173,23 +173,13 @@ pub fn gen_sink_plan( } None => match with_options.get(SINK_TYPE_OPTION) { // Case B: old syntax `type = '...'` - Some(t) => { - if !allow_old_sink_type_syntax(&connector) { - return Err(ErrorCode::BindError(format!( - "connector {} does not support `type = '...'`, please use syntax `FORMAT ... ENCODE ...` instead.", - connector - )) - .into()); - } else { - SinkFormatDesc::from_legacy_type(&connector, t)?.map(|mut f| { - session.notice_to_user("Consider using the newer syntax `FORMAT ... ENCODE ...` instead of `type = '...'`."); - if let Some(v) = with_options.get(SINK_USER_FORCE_APPEND_ONLY_OPTION) { - f.options.insert(SINK_USER_FORCE_APPEND_ONLY_OPTION.into(), v.into()); - } - f - }) + Some(t) => SinkFormatDesc::from_legacy_type(&connector, t)?.map(|mut f| { + session.notice_to_user("Consider using the newer syntax `FORMAT ... ENCODE ...` instead of `type = '...'`."); + if let Some(v) = with_options.get(SINK_USER_FORCE_APPEND_ONLY_OPTION) { + f.options.insert(SINK_USER_FORCE_APPEND_ONLY_OPTION.into(), v.into()); } - } + f + }), // Case C: no format + encode required None => None, }, @@ -255,7 +245,7 @@ pub fn gen_sink_plan( || sink_catalog.sink_type == SinkType::ForceAppendOnly) { return Err(RwError::from(ErrorCode::BindError( - "Only append-only sinks can sink to a table without primary keys. Please try to add \"FORMAT PLAIN ENCODE NATIVE\"".to_string(), + "Only append-only sinks can sink to a table without primary keys.".to_string(), ))); } @@ -591,8 +581,7 @@ fn bind_sink_format_desc(value: ConnectorSchema) -> Result { E::Protobuf => SinkEncode::Protobuf, E::Avro => SinkEncode::Avro, E::Template => SinkEncode::Template, - E::Native => SinkEncode::Native, - e @ (E::Csv | E::Bytes) => { + e @ (E::Native | E::Csv | E::Bytes) => { return Err(ErrorCode::BindError(format!("sink encode unsupported: {e}")).into()); } }; @@ -637,17 +626,9 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock vec![Encode::Json,Encode::Template], Format::Upsert => vec![Encode::Json,Encode::Template], ), - "table" => hashmap!( - Format::Plain => vec![Encode::Native], - Format::Upsert => vec![Encode::Native], - ), )) }); -pub fn allow_old_sink_type_syntax(connector: &str) -> bool { - !matches!(connector, "table") -} - pub fn validate_compatibility(connector: &str, format_desc: &ConnectorSchema) -> Result<()> { let compatible_formats = CONNECTORS_COMPATIBLE_FORMATS .get(connector) diff --git a/src/sqlparser/src/keywords.rs b/src/sqlparser/src/keywords.rs index 0544a4fcc27f6..55b7f81949719 100644 --- a/src/sqlparser/src/keywords.rs +++ b/src/sqlparser/src/keywords.rs @@ -601,7 +601,6 @@ pub const RESERVED_FOR_TABLE_ALIAS: &[Keyword] = &[ Keyword::SET, Keyword::RETURNING, Keyword::EMIT, - Keyword::FORMAT, ]; /// Can't be used as a column alias, so that `SELECT alias`