From 2cb4bd844ca853df24cf1e51f8a67bbaff8b6bd9 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Fri, 17 Nov 2023 14:24:57 +0800 Subject: [PATCH 1/4] fix --- e2e_test/ddl/throttle.slt | 4 ++-- src/frontend/src/utils/with_options.rs | 29 +++++++++++++++++--------- 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/e2e_test/ddl/throttle.slt b/e2e_test/ddl/throttle.slt index 6395697f0df79..9b6c2f053bf63 100644 --- a/e2e_test/ddl/throttle.slt +++ b/e2e_test/ddl/throttle.slt @@ -6,8 +6,8 @@ create table t1 (v1 int); # tracked in https://github.com/risingwavelabs/risingwave/issues/13474 # create with duplicate streaming_rate_limit -# statement error -# create materialized view mv1 with (streaming_rate_limit = 1000, streaming_rate_limit = 2000) as select * from t1; +statement error Duplicated option +create materialized view mv1 with (streaming_rate_limit = 1000, streaming_rate_limit = 2000) as select * from t1; # create with unknown fields statement error unexpected options in WITH clause diff --git a/src/frontend/src/utils/with_options.rs b/src/frontend/src/utils/with_options.rs index 4b0a70ef856dc..a3b04d0b4a2d7 100644 --- a/src/frontend/src/utils/with_options.rs +++ b/src/frontend/src/utils/with_options.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, HashMap}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::convert::TryFrom; use std::num::NonZeroU32; @@ -172,18 +172,27 @@ impl TryFrom<&[SqlOption]> for WithOptions { type Error = RwError; fn try_from(options: &[SqlOption]) -> Result { + let mut option_entry = HashSet::new(); let inner = options .iter() .cloned() - .map(|x| match x.value { - Value::CstyleEscapedString(s) => Ok((x.name.real_value(), s.value)), - Value::SingleQuotedString(s) => Ok((x.name.real_value(), s)), - Value::Number(n) => Ok((x.name.real_value(), n)), - Value::Boolean(b) => Ok((x.name.real_value(), b.to_string())), - _ => Err(ErrorCode::InvalidParameterValue( - "`with options` or `with properties` only support single quoted string value and C style escaped string" - .to_owned(), - )), + .map(|x| { + if !option_entry.insert(x.name.real_value()) { + return Err(ErrorCode::InvalidParameterValue(format!( + "Duplicated option: {}", + x.name.real_value() + ))); + } + match x.value { + Value::CstyleEscapedString(s) => Ok((x.name.real_value(), s.value)), + Value::SingleQuotedString(s) => Ok((x.name.real_value(), s)), + Value::Number(n) => Ok((x.name.real_value(), n)), + Value::Boolean(b) => Ok((x.name.real_value(), b.to_string())), + _ => Err(ErrorCode::InvalidParameterValue( + "`with options` or `with properties` only support single quoted string value and C style escaped string" + .to_owned(), + )), + } }) .try_collect()?; From 005a8d469da0d282f75e71e79394af3b4463ac74 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Fri, 17 Nov 2023 15:28:26 +0800 Subject: [PATCH 2/4] fix test --- e2e_test/schema_registry/pb.slt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/e2e_test/schema_registry/pb.slt b/e2e_test/schema_registry/pb.slt index 618ecd71e960c..e2029dee1203e 100644 --- a/e2e_test/schema_registry/pb.slt +++ b/e2e_test/schema_registry/pb.slt @@ -7,8 +7,7 @@ create table sr_pb_test with ( connector = 'kafka', topic = 'sr_pb_test', properties.bootstrap.server = 'message_queue:29092', - scan.startup.mode = 'earliest', - message = 'test.User') + scan.startup.mode = 'earliest') FORMAT plain ENCODE protobuf( schema.registry = 'http://message_queue:8081', message = 'test.User' From 22f993b9ec424d8e8291dc4b86c24e828f2faa8b Mon Sep 17 00:00:00 2001 From: tabVersion Date: Fri, 17 Nov 2023 15:44:49 +0800 Subject: [PATCH 3/4] fix --- src/frontend/src/utils/with_options.rs | 42 ++++++++++++-------------- 1 file changed, 20 insertions(+), 22 deletions(-) diff --git a/src/frontend/src/utils/with_options.rs b/src/frontend/src/utils/with_options.rs index a3b04d0b4a2d7..77d4185804215 100644 --- a/src/frontend/src/utils/with_options.rs +++ b/src/frontend/src/utils/with_options.rs @@ -12,11 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap}; use std::convert::TryFrom; use std::num::NonZeroU32; -use itertools::Itertools; use risingwave_common::error::{ErrorCode, Result as RwResult, RwError}; use risingwave_connector::source::kafka::{ insert_privatelink_broker_rewrite_map, PRIVATELINK_ENDPOINT_KEY, @@ -172,29 +171,28 @@ impl TryFrom<&[SqlOption]> for WithOptions { type Error = RwError; fn try_from(options: &[SqlOption]) -> Result { - let mut option_entry = HashSet::new(); - let inner = options - .iter() - .cloned() - .map(|x| { - if !option_entry.insert(x.name.real_value()) { - return Err(ErrorCode::InvalidParameterValue(format!( - "Duplicated option: {}", - x.name.real_value() - ))); - } - match x.value { - Value::CstyleEscapedString(s) => Ok((x.name.real_value(), s.value)), - Value::SingleQuotedString(s) => Ok((x.name.real_value(), s)), - Value::Number(n) => Ok((x.name.real_value(), n)), - Value::Boolean(b) => Ok((x.name.real_value(), b.to_string())), - _ => Err(ErrorCode::InvalidParameterValue( + let mut inner: BTreeMap = BTreeMap::new(); + for option in options { + let key = option.name.real_value(); + let value: String = match option.value.clone() { + Value::CstyleEscapedString(s) => s.value, + Value::SingleQuotedString(s) => s, + Value::Number(n) => n, + Value::Boolean(b) => b.to_string(), + _ => { + return Err(RwError::from(ErrorCode::InvalidParameterValue( "`with options` or `with properties` only support single quoted string value and C style escaped string" .to_owned(), - )), + ))) } - }) - .try_collect()?; + }; + if inner.insert(key.clone(), value).is_some() { + return Err(RwError::from(ErrorCode::InvalidParameterValue(format!( + "Duplicated option: {}", + key + )))); + } + } Ok(Self { inner }) } From 81f30b2e1545bfc3c53e4a9cdbb9e1caca1b5c7c Mon Sep 17 00:00:00 2001 From: tabVersion Date: Fri, 17 Nov 2023 22:41:19 +0800 Subject: [PATCH 4/4] fix test --- e2e_test/schema_registry/pb.slt | 3 +-- e2e_test/source/basic/kafka.slt | 16 ++++++++++------ .../source/basic/old_row_format_syntax/kafka.slt | 6 ++---- 3 files changed, 13 insertions(+), 12 deletions(-) diff --git a/e2e_test/schema_registry/pb.slt b/e2e_test/schema_registry/pb.slt index e2029dee1203e..fb40759d34ada 100644 --- a/e2e_test/schema_registry/pb.slt +++ b/e2e_test/schema_registry/pb.slt @@ -19,8 +19,7 @@ create table sr_pb_test_bk with ( connector = 'kafka', topic = 'sr_pb_test', properties.bootstrap.server = 'message_queue:29092', - scan.startup.mode = 'earliest', - message = 'test.User') + scan.startup.mode = 'earliest') FORMAT plain ENCODE protobuf( schema.registry = 'http://message_queue:8081,http://message_queue:8081', message = 'test.User' diff --git a/e2e_test/source/basic/kafka.slt b/e2e_test/source/basic/kafka.slt index 56097de50809a..b1ad66bda7f99 100644 --- a/e2e_test/source/basic/kafka.slt +++ b/e2e_test/source/basic/kafka.slt @@ -184,9 +184,11 @@ create table s11 with ( connector = 'kafka', topic = 'proto_c_bin', properties.bootstrap.server = 'message_queue:29092', - scan.startup.mode = 'earliest', - proto.message = 'test.User' -) FORMAT PLAIN ENCODE PROTOBUF (message = 'test.User', schema.location = 'file:///risingwave/proto-complex-schema'); + scan.startup.mode = 'earliest') +FORMAT PLAIN ENCODE PROTOBUF ( + message = 'test.User', + schema.location = 'file:///risingwave/proto-complex-schema' +); statement ok CREATE TABLE s12( @@ -268,9 +270,11 @@ create source s17 with ( connector = 'kafka', topic = 'proto_c_bin', properties.bootstrap.server = 'message_queue:29092', - scan.startup.mode = 'earliest', - proto.message = 'test.User' -) FORMAT PLAIN ENCODE PROTOBUF (message = 'test.User', schema.location = 'file:///risingwave/proto-complex-schema'); + scan.startup.mode = 'earliest') +FORMAT PLAIN ENCODE PROTOBUF ( + message = 'test.User', + schema.location = 'file:///risingwave/proto-complex-schema' +); statement ok create source s18 with ( diff --git a/e2e_test/source/basic/old_row_format_syntax/kafka.slt b/e2e_test/source/basic/old_row_format_syntax/kafka.slt index 3d2e4719d744d..05e0d55c28c48 100644 --- a/e2e_test/source/basic/old_row_format_syntax/kafka.slt +++ b/e2e_test/source/basic/old_row_format_syntax/kafka.slt @@ -176,8 +176,7 @@ create table s11 with ( connector = 'kafka', topic = 'proto_c_bin', properties.bootstrap.server = 'message_queue:29092', - scan.startup.mode = 'earliest', - proto.message = 'test.User' + scan.startup.mode = 'earliest' ) row format protobuf message 'test.User' row schema location 'file:///risingwave/proto-complex-schema' statement ok @@ -260,8 +259,7 @@ create source s17 with ( connector = 'kafka', topic = 'proto_c_bin', properties.bootstrap.server = 'message_queue:29092', - scan.startup.mode = 'earliest', - proto.message = 'test.User' + scan.startup.mode = 'earliest' ) row format protobuf message 'test.User' row schema location 'file:///risingwave/proto-complex-schema' statement ok