From b289d384c74afa9fac3c415960b5e013b619b666 Mon Sep 17 00:00:00 2001 From: Bohan Zhang Date: Sat, 18 Nov 2023 00:46:36 +0800 Subject: [PATCH] fix: reject duplicate entries in with clause (#13488) --- e2e_test/ddl/throttle.slt | 4 +- e2e_test/schema_registry/pb.slt | 6 +-- e2e_test/source/basic/kafka.slt | 16 +++++--- .../basic/old_row_format_syntax/kafka.slt | 6 +-- src/frontend/src/utils/with_options.rs | 37 +++++++++++-------- 5 files changed, 38 insertions(+), 31 deletions(-) diff --git a/e2e_test/ddl/throttle.slt b/e2e_test/ddl/throttle.slt index 6395697f0df7..9b6c2f053bf6 100644 --- a/e2e_test/ddl/throttle.slt +++ b/e2e_test/ddl/throttle.slt @@ -6,8 +6,8 @@ create table t1 (v1 int); # tracked in https://github.com/risingwavelabs/risingwave/issues/13474 # create with duplicate streaming_rate_limit -# statement error -# create materialized view mv1 with (streaming_rate_limit = 1000, streaming_rate_limit = 2000) as select * from t1; +statement error Duplicated option +create materialized view mv1 with (streaming_rate_limit = 1000, streaming_rate_limit = 2000) as select * from t1; # create with unknown fields statement error unexpected options in WITH clause diff --git a/e2e_test/schema_registry/pb.slt b/e2e_test/schema_registry/pb.slt index 618ecd71e960..fb40759d34ad 100644 --- a/e2e_test/schema_registry/pb.slt +++ b/e2e_test/schema_registry/pb.slt @@ -7,8 +7,7 @@ create table sr_pb_test with ( connector = 'kafka', topic = 'sr_pb_test', properties.bootstrap.server = 'message_queue:29092', - scan.startup.mode = 'earliest', - message = 'test.User') + scan.startup.mode = 'earliest') FORMAT plain ENCODE protobuf( schema.registry = 'http://message_queue:8081', message = 'test.User' @@ -20,8 +19,7 @@ create table sr_pb_test_bk with ( connector = 'kafka', topic = 'sr_pb_test', properties.bootstrap.server = 'message_queue:29092', - scan.startup.mode = 'earliest', - message = 'test.User') + scan.startup.mode = 'earliest') FORMAT plain ENCODE protobuf( schema.registry = 'http://message_queue:8081,http://message_queue:8081', message = 'test.User' diff --git a/e2e_test/source/basic/kafka.slt b/e2e_test/source/basic/kafka.slt index 56097de50809..b1ad66bda7f9 100644 --- a/e2e_test/source/basic/kafka.slt +++ b/e2e_test/source/basic/kafka.slt @@ -184,9 +184,11 @@ create table s11 with ( connector = 'kafka', topic = 'proto_c_bin', properties.bootstrap.server = 'message_queue:29092', - scan.startup.mode = 'earliest', - proto.message = 'test.User' -) FORMAT PLAIN ENCODE PROTOBUF (message = 'test.User', schema.location = 'file:///risingwave/proto-complex-schema'); + scan.startup.mode = 'earliest') +FORMAT PLAIN ENCODE PROTOBUF ( + message = 'test.User', + schema.location = 'file:///risingwave/proto-complex-schema' +); statement ok CREATE TABLE s12( @@ -268,9 +270,11 @@ create source s17 with ( connector = 'kafka', topic = 'proto_c_bin', properties.bootstrap.server = 'message_queue:29092', - scan.startup.mode = 'earliest', - proto.message = 'test.User' -) FORMAT PLAIN ENCODE PROTOBUF (message = 'test.User', schema.location = 'file:///risingwave/proto-complex-schema'); + scan.startup.mode = 'earliest') +FORMAT PLAIN ENCODE PROTOBUF ( + message = 'test.User', + schema.location = 'file:///risingwave/proto-complex-schema' +); statement ok create source s18 with ( diff --git a/e2e_test/source/basic/old_row_format_syntax/kafka.slt b/e2e_test/source/basic/old_row_format_syntax/kafka.slt index 3d2e4719d744..05e0d55c28c4 100644 --- a/e2e_test/source/basic/old_row_format_syntax/kafka.slt +++ b/e2e_test/source/basic/old_row_format_syntax/kafka.slt @@ -176,8 +176,7 @@ create table s11 with ( connector = 'kafka', topic = 'proto_c_bin', properties.bootstrap.server = 'message_queue:29092', - scan.startup.mode = 'earliest', - proto.message = 'test.User' + scan.startup.mode = 'earliest' ) row format protobuf message 'test.User' row schema location 'file:///risingwave/proto-complex-schema' statement ok @@ -260,8 +259,7 @@ create source s17 with ( connector = 'kafka', topic = 'proto_c_bin', properties.bootstrap.server = 'message_queue:29092', - scan.startup.mode = 'earliest', - proto.message = 'test.User' + scan.startup.mode = 'earliest' ) row format protobuf message 'test.User' row schema location 'file:///risingwave/proto-complex-schema' statement ok diff --git a/src/frontend/src/utils/with_options.rs b/src/frontend/src/utils/with_options.rs index 4b0a70ef856d..77d418580421 100644 --- a/src/frontend/src/utils/with_options.rs +++ b/src/frontend/src/utils/with_options.rs @@ -16,7 +16,6 @@ use std::collections::{BTreeMap, HashMap}; use std::convert::TryFrom; use std::num::NonZeroU32; -use itertools::Itertools; use risingwave_common::error::{ErrorCode, Result as RwResult, RwError}; use risingwave_connector::source::kafka::{ insert_privatelink_broker_rewrite_map, PRIVATELINK_ENDPOINT_KEY, @@ -172,20 +171,28 @@ impl TryFrom<&[SqlOption]> for WithOptions { type Error = RwError; fn try_from(options: &[SqlOption]) -> Result { - let inner = options - .iter() - .cloned() - .map(|x| match x.value { - Value::CstyleEscapedString(s) => Ok((x.name.real_value(), s.value)), - Value::SingleQuotedString(s) => Ok((x.name.real_value(), s)), - Value::Number(n) => Ok((x.name.real_value(), n)), - Value::Boolean(b) => Ok((x.name.real_value(), b.to_string())), - _ => Err(ErrorCode::InvalidParameterValue( - "`with options` or `with properties` only support single quoted string value and C style escaped string" - .to_owned(), - )), - }) - .try_collect()?; + let mut inner: BTreeMap = BTreeMap::new(); + for option in options { + let key = option.name.real_value(); + let value: String = match option.value.clone() { + Value::CstyleEscapedString(s) => s.value, + Value::SingleQuotedString(s) => s, + Value::Number(n) => n, + Value::Boolean(b) => b.to_string(), + _ => { + return Err(RwError::from(ErrorCode::InvalidParameterValue( + "`with options` or `with properties` only support single quoted string value and C style escaped string" + .to_owned(), + ))) + } + }; + if inner.insert(key.clone(), value).is_some() { + return Err(RwError::from(ErrorCode::InvalidParameterValue(format!( + "Duplicated option: {}", + key + )))); + } + } Ok(Self { inner }) }