Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: reject duplicate entries in with clause #13488

Merged
merged 4 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions e2e_test/ddl/throttle.slt
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ create table t1 (v1 int);

# tracked in https://github.com/risingwavelabs/risingwave/issues/13474
# create with duplicate streaming_rate_limit
# statement error
# create materialized view mv1 with (streaming_rate_limit = 1000, streaming_rate_limit = 2000) as select * from t1;
statement error Duplicated option
create materialized view mv1 with (streaming_rate_limit = 1000, streaming_rate_limit = 2000) as select * from t1;

# create with unknown fields
statement error unexpected options in WITH clause
Expand Down
3 changes: 1 addition & 2 deletions e2e_test/schema_registry/pb.slt
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ create table sr_pb_test with (
connector = 'kafka',
topic = 'sr_pb_test',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest',
message = 'test.User')
scan.startup.mode = 'earliest')
FORMAT plain ENCODE protobuf(
schema.registry = 'http://message_queue:8081',
message = 'test.User'
Expand Down
37 changes: 22 additions & 15 deletions src/frontend/src/utils/with_options.rs
tabVersion marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::collections::{BTreeMap, HashMap};
use std::convert::TryFrom;
use std::num::NonZeroU32;

use itertools::Itertools;
use risingwave_common::error::{ErrorCode, Result as RwResult, RwError};
use risingwave_connector::source::kafka::{
insert_privatelink_broker_rewrite_map, PRIVATELINK_ENDPOINT_KEY,
Expand Down Expand Up @@ -172,20 +171,28 @@ impl TryFrom<&[SqlOption]> for WithOptions {
type Error = RwError;

fn try_from(options: &[SqlOption]) -> Result<Self, Self::Error> {
let inner = options
.iter()
.cloned()
.map(|x| match x.value {
Value::CstyleEscapedString(s) => Ok((x.name.real_value(), s.value)),
Value::SingleQuotedString(s) => Ok((x.name.real_value(), s)),
Value::Number(n) => Ok((x.name.real_value(), n)),
Value::Boolean(b) => Ok((x.name.real_value(), b.to_string())),
_ => Err(ErrorCode::InvalidParameterValue(
"`with options` or `with properties` only support single quoted string value and C style escaped string"
.to_owned(),
)),
})
.try_collect()?;
let mut inner: BTreeMap<String, String> = BTreeMap::new();
for option in options {
let key = option.name.real_value();
let value: String = match option.value.clone() {
Value::CstyleEscapedString(s) => s.value,
Value::SingleQuotedString(s) => s,
Value::Number(n) => n,
Value::Boolean(b) => b.to_string(),
_ => {
return Err(RwError::from(ErrorCode::InvalidParameterValue(
"`with options` or `with properties` only support single quoted string value and C style escaped string"
.to_owned(),
)))
}
};
if inner.insert(key.clone(), value).is_some() {
return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
"Duplicated option: {}",
key
))));
}
}

Ok(Self { inner })
}
Expand Down
Loading