Skip to content

Commit

Permalink
fix: reject duplicate entries in with clause (#13488)
Browse files Browse the repository at this point in the history
  • Loading branch information
tabVersion authored Nov 17, 2023
1 parent a34f46a commit b289d38
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 31 deletions.
4 changes: 2 additions & 2 deletions e2e_test/ddl/throttle.slt
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ create table t1 (v1 int);

# tracked in https://github.com/risingwavelabs/risingwave/issues/13474
# create with duplicate streaming_rate_limit
# statement error
# create materialized view mv1 with (streaming_rate_limit = 1000, streaming_rate_limit = 2000) as select * from t1;
statement error Duplicated option
create materialized view mv1 with (streaming_rate_limit = 1000, streaming_rate_limit = 2000) as select * from t1;

# create with unknown fields
statement error unexpected options in WITH clause
Expand Down
6 changes: 2 additions & 4 deletions e2e_test/schema_registry/pb.slt
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ create table sr_pb_test with (
connector = 'kafka',
topic = 'sr_pb_test',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest',
message = 'test.User')
scan.startup.mode = 'earliest')
FORMAT plain ENCODE protobuf(
schema.registry = 'http://message_queue:8081',
message = 'test.User'
Expand All @@ -20,8 +19,7 @@ create table sr_pb_test_bk with (
connector = 'kafka',
topic = 'sr_pb_test',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest',
message = 'test.User')
scan.startup.mode = 'earliest')
FORMAT plain ENCODE protobuf(
schema.registry = 'http://message_queue:8081,http://message_queue:8081',
message = 'test.User'
Expand Down
16 changes: 10 additions & 6 deletions e2e_test/source/basic/kafka.slt
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,11 @@ create table s11 with (
connector = 'kafka',
topic = 'proto_c_bin',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest',
proto.message = 'test.User'
) FORMAT PLAIN ENCODE PROTOBUF (message = 'test.User', schema.location = 'file:///risingwave/proto-complex-schema');
scan.startup.mode = 'earliest')
FORMAT PLAIN ENCODE PROTOBUF (
message = 'test.User',
schema.location = 'file:///risingwave/proto-complex-schema'
);

statement ok
CREATE TABLE s12(
Expand Down Expand Up @@ -268,9 +270,11 @@ create source s17 with (
connector = 'kafka',
topic = 'proto_c_bin',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest',
proto.message = 'test.User'
) FORMAT PLAIN ENCODE PROTOBUF (message = 'test.User', schema.location = 'file:///risingwave/proto-complex-schema');
scan.startup.mode = 'earliest')
FORMAT PLAIN ENCODE PROTOBUF (
message = 'test.User',
schema.location = 'file:///risingwave/proto-complex-schema'
);

statement ok
create source s18 with (
Expand Down
6 changes: 2 additions & 4 deletions e2e_test/source/basic/old_row_format_syntax/kafka.slt
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,7 @@ create table s11 with (
connector = 'kafka',
topic = 'proto_c_bin',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest',
proto.message = 'test.User'
scan.startup.mode = 'earliest'
) row format protobuf message 'test.User' row schema location 'file:///risingwave/proto-complex-schema'

statement ok
Expand Down Expand Up @@ -260,8 +259,7 @@ create source s17 with (
connector = 'kafka',
topic = 'proto_c_bin',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest',
proto.message = 'test.User'
scan.startup.mode = 'earliest'
) row format protobuf message 'test.User' row schema location 'file:///risingwave/proto-complex-schema'

statement ok
Expand Down
37 changes: 22 additions & 15 deletions src/frontend/src/utils/with_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::collections::{BTreeMap, HashMap};
use std::convert::TryFrom;
use std::num::NonZeroU32;

use itertools::Itertools;
use risingwave_common::error::{ErrorCode, Result as RwResult, RwError};
use risingwave_connector::source::kafka::{
insert_privatelink_broker_rewrite_map, PRIVATELINK_ENDPOINT_KEY,
Expand Down Expand Up @@ -172,20 +171,28 @@ impl TryFrom<&[SqlOption]> for WithOptions {
type Error = RwError;

fn try_from(options: &[SqlOption]) -> Result<Self, Self::Error> {
let inner = options
.iter()
.cloned()
.map(|x| match x.value {
Value::CstyleEscapedString(s) => Ok((x.name.real_value(), s.value)),
Value::SingleQuotedString(s) => Ok((x.name.real_value(), s)),
Value::Number(n) => Ok((x.name.real_value(), n)),
Value::Boolean(b) => Ok((x.name.real_value(), b.to_string())),
_ => Err(ErrorCode::InvalidParameterValue(
"`with options` or `with properties` only support single quoted string value and C style escaped string"
.to_owned(),
)),
})
.try_collect()?;
let mut inner: BTreeMap<String, String> = BTreeMap::new();
for option in options {
let key = option.name.real_value();
let value: String = match option.value.clone() {
Value::CstyleEscapedString(s) => s.value,
Value::SingleQuotedString(s) => s,
Value::Number(n) => n,
Value::Boolean(b) => b.to_string(),
_ => {
return Err(RwError::from(ErrorCode::InvalidParameterValue(
"`with options` or `with properties` only support single quoted string value and C style escaped string"
.to_owned(),
)))
}
};
if inner.insert(key.clone(), value).is_some() {
return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
"Duplicated option: {}",
key
))));
}
}

Ok(Self { inner })
}
Expand Down

0 comments on commit b289d38

Please sign in to comment.