diff --git a/src/connector/src/parser/debezium/debezium_parser.rs b/src/connector/src/parser/debezium/debezium_parser.rs index 817c2a788f2be..2049545696216 100644 --- a/src/connector/src/parser/debezium/debezium_parser.rs +++ b/src/connector/src/parser/debezium/debezium_parser.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeMap; +use std::collections::HashMap; use risingwave_common::bail; @@ -49,7 +49,7 @@ pub struct DebeziumProps { } impl DebeziumProps { - pub fn from(props: &BTreeMap) -> Self { + pub fn from(props: &HashMap) -> Self { let ignore_key = props .get(DEBEZIUM_IGNORE_KEY) .map(|v| v.eq_ignore_ascii_case("true")) diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 4e0549ef99a98..a76917ad5973f 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -1054,7 +1054,7 @@ impl MapHandling { pub const OPTION_KEY: &'static str = "map.handling.mode"; pub fn from_options( - options: &std::collections::BTreeMap, + options: &std::collections::HashMap, ) -> Result, InvalidOptionError> { let mode = match options.get(Self::OPTION_KEY).map(std::ops::Deref::deref) { Some("jsonb") => Self::Jsonb, diff --git a/src/connector/src/parser/unified/json.rs b/src/connector/src/parser/unified/json.rs index a40b1153d5215..5576bbc2e66e1 100644 --- a/src/connector/src/parser/unified/json.rs +++ b/src/connector/src/parser/unified/json.rs @@ -70,7 +70,7 @@ impl TimestamptzHandling { pub const OPTION_KEY: &'static str = "timestamptz.handling.mode"; pub fn from_options( - options: &std::collections::BTreeMap, + options: &std::collections::HashMap, ) -> Result, InvalidOptionError> { let mode = match options.get(Self::OPTION_KEY).map(std::ops::Deref::deref) { Some("utc_string") => Self::UtcString, diff --git a/src/connector/src/schema/loader.rs b/src/connector/src/schema/loader.rs index a50d8cced575b..3d4d9325e5918 100644 --- a/src/connector/src/schema/loader.rs +++ b/src/connector/src/schema/loader.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeMap; +use std::collections::HashMap; use risingwave_pb::catalog::PbSchemaRegistryNameStrategy; @@ -38,7 +38,7 @@ pub struct SchemaLoader { impl SchemaLoader { pub fn from_format_options( topic: &str, - format_options: &BTreeMap, + format_options: &HashMap, ) -> Result { let schema_location = format_options .get(SCHEMA_REGISTRY_KEY) diff --git a/src/connector/src/schema/protobuf.rs b/src/connector/src/schema/protobuf.rs index d140af83c853f..b052de359e588 100644 --- a/src/connector/src/schema/protobuf.rs +++ b/src/connector/src/schema/protobuf.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeMap; +use std::collections::HashMap; use itertools::Itertools as _; use prost_reflect::{DescriptorPool, FileDescriptor, MessageDescriptor}; @@ -28,7 +28,7 @@ use crate::parser::{EncodingProperties, ProtobufParserConfig, ProtobufProperties /// `aws_auth_props` is only required when reading `s3://` URL. pub async fn fetch_descriptor( - format_options: &BTreeMap, + format_options: &HashMap, topic: &str, aws_auth_props: Option<&AwsAuthProps>, ) -> Result<(MessageDescriptor, Option), SchemaFetchError> { @@ -82,7 +82,7 @@ pub async fn fetch_descriptor( pub async fn fetch_from_registry( message_name: &str, - format_options: &BTreeMap, + format_options: &HashMap, topic: &str, ) -> Result<(MessageDescriptor, i32), SchemaFetchError> { let loader = SchemaLoader::from_format_options(topic, format_options)?; diff --git a/src/connector/src/sink/catalog/desc.rs b/src/connector/src/sink/catalog/desc.rs index e5415c268d569..f64e3fbdd10c0 100644 --- a/src/connector/src/sink/catalog/desc.rs +++ b/src/connector/src/sink/catalog/desc.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, HashMap}; +use std::collections::HashMap; use itertools::Itertools; use risingwave_common::catalog::{ @@ -23,7 +23,7 @@ use risingwave_pb::stream_plan::PbSinkDesc; use super::{SinkCatalog, SinkFormatDesc, SinkId, SinkType}; -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct SinkDesc { /// Id of the sink. For debug now. pub id: SinkId, @@ -48,7 +48,7 @@ pub struct SinkDesc { pub distribution_key: Vec, /// The properties of the sink. - pub properties: BTreeMap, + pub properties: HashMap, // The append-only behavior of the physical sink connector. Frontend will determine `sink_type` // based on both its own derivation on the append-only attribute and other user-specified @@ -136,3 +136,9 @@ impl SinkDesc { } } } + +impl std::hash::Hash for SinkDesc { + fn hash(&self, state: &mut H) { + self.id.hash(state); + } +} diff --git a/src/connector/src/sink/catalog/mod.rs b/src/connector/src/sink/catalog/mod.rs index bf5dd89dd7894..0113e484d7781 100644 --- a/src/connector/src/sink/catalog/mod.rs +++ b/src/connector/src/sink/catalog/mod.rs @@ -14,7 +14,7 @@ pub mod desc; -use std::collections::{BTreeMap, HashMap}; +use std::collections::HashMap; use anyhow::anyhow; use itertools::Itertools; @@ -114,11 +114,11 @@ impl SinkType { /// May replace [`SinkType`]. /// /// TODO: consolidate with [`crate::source::SourceStruct`] and [`crate::parser::SpecificParserConfig`]. -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct SinkFormatDesc { pub format: SinkFormat, pub encode: SinkEncode, - pub options: BTreeMap, + pub options: HashMap, pub key_encode: Option, } diff --git a/src/connector/src/sink/encoder/mod.rs b/src/connector/src/sink/encoder/mod.rs index a0515bb7bc165..579d90a5b5b26 100644 --- a/src/connector/src/sink/encoder/mod.rs +++ b/src/connector/src/sink/encoder/mod.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, HashMap}; +use std::collections::HashMap; use std::sync::Arc; use risingwave_common::catalog::Schema; @@ -117,7 +117,7 @@ impl TimestamptzHandlingMode { pub const FRONTEND_DEFAULT: &'static str = "utc_string"; pub const OPTION_KEY: &'static str = "timestamptz.handling.mode"; - pub fn from_options(options: &BTreeMap) -> Result { + pub fn from_options(options: &HashMap) -> Result { match options.get(Self::OPTION_KEY).map(std::ops::Deref::deref) { Some(Self::FRONTEND_DEFAULT) => Ok(Self::UtcString), Some("utc_without_suffix") => Ok(Self::UtcWithoutSuffix), diff --git a/src/connector/src/sink/redis.rs b/src/connector/src/sink/redis.rs index 6baeccdd32ed8..4b3733be22d1f 100644 --- a/src/connector/src/sink/redis.rs +++ b/src/connector/src/sink/redis.rs @@ -375,7 +375,6 @@ impl AsyncTruncateSinkWriter for RedisSinkWriter { #[cfg(test)] mod test { use core::panic; - use std::collections::BTreeMap; use rdkafka::message::FromBytes; use risingwave_common::array::{Array, I32Array, Op, Utf8Array}; @@ -407,7 +406,7 @@ mod test { let format_desc = SinkFormatDesc { format: SinkFormat::AppendOnly, encode: SinkEncode::Json, - options: BTreeMap::default(), + options: HashMap::default(), key_encode: None, }; @@ -475,16 +474,16 @@ mod test { }, ]); - let mut btree_map = BTreeMap::default(); - btree_map.insert(KEY_FORMAT.to_string(), "key-{id}".to_string()); - btree_map.insert( + let mut hash_map = HashMap::default(); + hash_map.insert(KEY_FORMAT.to_string(), "key-{id}".to_string()); + hash_map.insert( VALUE_FORMAT.to_string(), "values:{id:{id},name:{name}}".to_string(), ); let format_desc = SinkFormatDesc { format: SinkFormat::AppendOnly, encode: SinkEncode::Template, - options: btree_map, + options: hash_map, key_encode: None, }; diff --git a/src/connector/src/source/kafka/private_link.rs b/src/connector/src/source/kafka/private_link.rs index 348749ba3f113..1d395bbba4943 100644 --- a/src/connector/src/source/kafka/private_link.rs +++ b/src/connector/src/source/kafka/private_link.rs @@ -91,7 +91,7 @@ impl BrokerAddrRewriter { } #[inline(always)] -fn kafka_props_broker_key(with_properties: &BTreeMap) -> &str { +fn kafka_props_broker_key(with_properties: &HashMap) -> &str { if with_properties.contains_key(KAFKA_PROPS_BROKER_KEY) { KAFKA_PROPS_BROKER_KEY } else { @@ -101,7 +101,7 @@ fn kafka_props_broker_key(with_properties: &BTreeMap) -> &str { #[inline(always)] fn get_property_required( - with_properties: &BTreeMap, + with_properties: &HashMap, property: &str, ) -> ConnectorResult { with_properties @@ -112,7 +112,7 @@ fn get_property_required( } pub fn insert_privatelink_broker_rewrite_map( - with_options: &mut BTreeMap, + with_options: &mut HashMap, svc: Option<&PrivateLinkService>, privatelink_endpoint: Option, ) -> ConnectorResult<()> { diff --git a/src/frontend/src/catalog/connection_catalog.rs b/src/frontend/src/catalog/connection_catalog.rs index 54e1210979fe8..ba060f17cb00f 100644 --- a/src/frontend/src/catalog/connection_catalog.rs +++ b/src/frontend/src/catalog/connection_catalog.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeMap; +use std::collections::HashMap; use std::sync::Arc; use anyhow::anyhow; @@ -67,7 +67,7 @@ impl OwnedByUserCatalog for ConnectionCatalog { pub(crate) fn resolve_private_link_connection( connection: &Arc, - properties: &mut BTreeMap, + properties: &mut HashMap, ) -> Result<()> { #[allow(irrefutable_let_patterns)] if let connection::Info::PrivateLinkService(svc) = &connection.info { diff --git a/src/frontend/src/catalog/source_catalog.rs b/src/frontend/src/catalog/source_catalog.rs index 17292b1324ed2..68f543f854cff 100644 --- a/src/frontend/src/catalog/source_catalog.rs +++ b/src/frontend/src/catalog/source_catalog.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeMap; +use std::collections::HashMap; use risingwave_common::catalog::{ColumnCatalog, SourceVersionId}; use risingwave_common::util::epoch::Epoch; @@ -26,7 +26,7 @@ use crate::user::UserId; /// This struct `SourceCatalog` is used in frontend. /// Compared with `PbSource`, it only maintains information used during optimization. -#[derive(Clone, Debug, PartialEq, Eq, Hash)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct SourceCatalog { pub id: SourceId, pub name: String, @@ -36,7 +36,7 @@ pub struct SourceCatalog { pub owner: UserId, pub info: StreamSourceInfo, pub row_id_index: Option, - pub with_properties: BTreeMap, + pub with_properties: HashMap, pub watermark_descs: Vec, pub associated_table_id: Option, pub definition: String, @@ -149,3 +149,9 @@ impl OwnedByUserCatalog for SourceCatalog { self.owner } } + +impl std::hash::Hash for SourceCatalog { + fn hash(&self, state: &mut H) { + self.id.hash(state); + } +} diff --git a/src/frontend/src/catalog/subscription_catalog.rs b/src/frontend/src/catalog/subscription_catalog.rs index 6efb31614a922..36a5a71a0e9be 100644 --- a/src/frontend/src/catalog/subscription_catalog.rs +++ b/src/frontend/src/catalog/subscription_catalog.rs @@ -13,7 +13,6 @@ // limitations under the License. use core::str::FromStr; -use std::collections::BTreeMap; use risingwave_common::catalog::{TableId, UserId, OBJECT_ID_PLACEHOLDER}; use risingwave_common::types::Interval; @@ -24,6 +23,7 @@ use thiserror_ext::AsReport; use super::OwnedByUserCatalog; use crate::error::{ErrorCode, Result}; +use crate::WithOptions; #[derive(Clone, Debug, PartialEq, Eq, Hash)] #[cfg_attr(test, derive(Default))] @@ -82,7 +82,7 @@ impl SubscriptionId { } impl SubscriptionCatalog { - pub fn set_retention_seconds(&mut self, properties: BTreeMap) -> Result<()> { + pub fn set_retention_seconds(&mut self, properties: &WithOptions) -> Result<()> { let retention_seconds_str = properties.get("retention").ok_or_else(|| { ErrorCode::InternalError("Subscription retention time not set.".to_string()) })?; diff --git a/src/frontend/src/handler/alter_source_with_sr.rs b/src/frontend/src/handler/alter_source_with_sr.rs index 9f3c089998fbc..7e509324bf1ce 100644 --- a/src/frontend/src/handler/alter_source_with_sr.rs +++ b/src/frontend/src/handler/alter_source_with_sr.rs @@ -148,11 +148,8 @@ pub async fn refresh_sr_and_get_columns_diff( connector_schema: &ConnectorSchema, session: &Arc, ) -> Result<(StreamSourceInfo, Vec, Vec)> { - let mut with_properties = original_source - .with_properties - .clone() - .into_iter() - .collect(); + let mut with_properties = original_source.with_properties.clone(); + validate_compatibility(connector_schema, &mut with_properties)?; if with_properties.is_cdc_connector() { diff --git a/src/frontend/src/handler/create_connection.rs b/src/frontend/src/handler/create_connection.rs index 22491f9cb0ee3..1a8af68ddb881 100644 --- a/src/frontend/src/handler/create_connection.rs +++ b/src/frontend/src/handler/create_connection.rs @@ -126,7 +126,7 @@ pub async fn handle_create_connection( let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?; let with_properties = handler_args.with_options.clone().into_connector_props(); - let create_connection_payload = resolve_create_connection_payload(&with_properties)?; + let create_connection_payload = resolve_create_connection_payload(with_properties.inner())?; let catalog_writer = session.catalog_writer()?; catalog_writer diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index b0c00a2c862db..ecd61c3aa49d0 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -792,6 +792,7 @@ fn bind_sink_format_desc(value: ConnectorSchema) -> Result { options .entry(TimestamptzHandlingMode::OPTION_KEY.to_owned()) .or_insert(TimestamptzHandlingMode::FRONTEND_DEFAULT.to_owned()); + let options = options.into_iter().collect(); Ok(SinkFormatDesc { format, diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 75e188c086947..9232b45ae228e 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, HashMap}; +use std::collections::HashMap; use std::rc::Rc; use std::sync::LazyLock; @@ -90,7 +90,7 @@ pub(crate) const UPSTREAM_SOURCE_KEY: &str = "connector"; async fn extract_json_table_schema( schema_config: &Option<(AstString, bool)>, with_properties: &HashMap, - format_encode_options: &mut BTreeMap, + format_encode_options: &mut HashMap, ) -> Result>> { match schema_config { None => Ok(None), @@ -140,7 +140,7 @@ fn json_schema_infer_use_schema_registry(schema_config: &Option<(AstString, bool async fn extract_avro_table_schema( info: &StreamSourceInfo, with_properties: &HashMap, - format_encode_options: &mut BTreeMap, + format_encode_options: &mut HashMap, is_debezium: bool, ) -> Result> { let parser_config = SpecificParserConfig::new(info, with_properties)?; @@ -186,7 +186,7 @@ async fn extract_debezium_avro_table_pk_columns( async fn extract_protobuf_table_schema( schema: &ProtobufSchema, with_properties: &HashMap, - format_encode_options: &mut BTreeMap, + format_encode_options: &mut HashMap, ) -> Result> { let info = StreamSourceInfo { proto_message_name: schema.message_name.0.clone(), @@ -224,14 +224,14 @@ fn non_generated_sql_columns(columns: &[ColumnDef]) -> Vec { } fn try_consume_string_from_options( - format_encode_options: &mut BTreeMap, + format_encode_options: &mut HashMap, key: &str, ) -> Option { format_encode_options.remove(key).map(AstString) } fn consume_string_from_options( - format_encode_options: &mut BTreeMap, + format_encode_options: &mut HashMap, key: &str, ) -> Result { try_consume_string_from_options(format_encode_options, key).ok_or(RwError::from(ProtocolError( @@ -239,12 +239,12 @@ fn consume_string_from_options( ))) } -fn consume_aws_config_from_options(format_encode_options: &mut BTreeMap) { +fn consume_aws_config_from_options(format_encode_options: &mut HashMap) { format_encode_options.retain(|key, _| !key.starts_with("aws.")) } pub fn get_json_schema_location( - format_encode_options: &mut BTreeMap, + format_encode_options: &mut HashMap, ) -> Result> { let schema_location = try_consume_string_from_options(format_encode_options, "schema.location"); let schema_registry = try_consume_string_from_options(format_encode_options, "schema.registry"); @@ -259,7 +259,7 @@ pub fn get_json_schema_location( } fn get_schema_location( - format_encode_options: &mut BTreeMap, + format_encode_options: &mut HashMap, ) -> Result<(AstString, bool)> { let schema_location = try_consume_string_from_options(format_encode_options, "schema.location"); let schema_registry = try_consume_string_from_options(format_encode_options, "schema.registry"); @@ -300,13 +300,13 @@ pub(crate) async fn bind_columns_from_source( let format_encode_options = WithOptions::try_from(source_schema.row_options())?.into_inner(); let mut format_encode_options_to_consume = format_encode_options.clone(); - fn get_key_message_name(options: &mut BTreeMap) -> Option { + fn get_key_message_name(options: &mut HashMap) -> Option { consume_string_from_options(options, KEY_MESSAGE_NAME_KEY) .map(|ele| Some(ele.0)) .unwrap_or(None) } fn get_sr_name_strategy_check( - options: &mut BTreeMap, + options: &mut HashMap, use_sr: bool, ) -> Result> { let name_strategy = get_name_strategy_or_default(try_consume_string_from_options( @@ -1292,7 +1292,7 @@ pub fn bind_connector_props( handler_args: &HandlerArgs, source_schema: &ConnectorSchema, is_create_source: bool, -) -> Result> { +) -> Result { let mut with_properties = handler_args.with_options.clone().into_connector_props(); validate_compatibility(source_schema, &mut with_properties)?; let create_cdc_source_job = with_properties.is_shareable_cdc_connector(); @@ -1319,7 +1319,7 @@ pub async fn bind_create_source( handler_args: HandlerArgs, full_name: ObjectName, source_schema: ConnectorSchema, - with_properties: HashMap, + with_properties: WithOptions, sql_columns_defs: &[ColumnDef], constraints: Vec, wildcard_idx: Option, @@ -1420,7 +1420,7 @@ pub async fn bind_create_source( check_source_schema(&with_properties, row_id_index, &columns).await?; // resolve privatelink connection for Kafka - let mut with_properties = WithOptions::new(with_properties); + let mut with_properties = with_properties; let connection_id = resolve_privatelink_in_with_option(&mut with_properties, &schema_name, session)?; let _secret_ref = resolve_secret_in_with_options(&mut with_properties, session)?; @@ -1485,7 +1485,7 @@ pub async fn handle_create_source( let (columns_from_resolve_source, mut source_info) = if create_cdc_source_job { bind_columns_from_source_for_cdc(&session, &source_schema)? } else { - bind_columns_from_source(&session, &source_schema, &with_properties).await? + bind_columns_from_source(&session, &source_schema, with_properties.inner()).await? }; if is_shared { // Note: this field should be called is_shared. Check field doc for more details. diff --git a/src/frontend/src/handler/create_subscription.rs b/src/frontend/src/handler/create_subscription.rs index 7da1a9d1683ed..8d4ed82cc82ee 100644 --- a/src/frontend/src/handler/create_subscription.rs +++ b/src/frontend/src/handler/create_subscription.rs @@ -64,7 +64,7 @@ pub fn create_subscription_catalog( initialized_at_cluster_version: None, }; - subscription_catalog.set_retention_seconds(context.with_options().clone().into_inner())?; + subscription_catalog.set_retention_seconds(context.with_options())?; Ok(subscription_catalog) } diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 38829a16be11e..99d6d9ed9ddd0 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -481,7 +481,7 @@ pub(crate) async fn gen_create_table_plan_with_source( let with_properties = bind_connector_props(&handler_args, &source_schema, false)?; let (columns_from_resolve_source, source_info) = - bind_columns_from_source(session, &source_schema, &with_properties).await?; + bind_columns_from_source(session, &source_schema, with_properties.inner()).await?; let (source_catalog, database_id, schema_id) = bind_create_source( handler_args.clone(), diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index 689bf8cba7e63..9c76bfdece25d 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -449,7 +449,7 @@ impl StreamSink { let (user_defined_append_only, user_force_append_only, syntax_legacy) = match format_desc { Some(f) => ( f.format == SinkFormat::AppendOnly, - Self::is_user_force_append_only(&WithOptions::from_inner(f.options.clone()))?, + Self::is_user_force_append_only(&WithOptions::new(f.options.clone()))?, false, ), None => ( diff --git a/src/frontend/src/utils/with_options.rs b/src/frontend/src/utils/with_options.rs index 3ee50276e5d10..d5d380677154b 100644 --- a/src/frontend/src/utils/with_options.rs +++ b/src/frontend/src/utils/with_options.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, HashMap}; +use std::collections::HashMap; use std::num::NonZeroU32; use risingwave_connector::source::kafka::private_link::{ @@ -21,7 +21,7 @@ use risingwave_connector::source::kafka::private_link::{ use risingwave_connector::WithPropertiesExt; use risingwave_sqlparser::ast::{ CreateConnectionStatement, CreateSinkStatement, CreateSourceStatement, - CreateSubscriptionStatement, SqlOption, Statement, Value, + CreateSubscriptionStatement, ObjectName, SqlOption, Statement, Value, }; use super::OverwriteOptions; @@ -36,13 +36,14 @@ mod options { } /// Options or properties extracted from the `WITH` clause of DDLs. -#[derive(Default, Clone, Debug, PartialEq, Eq, Hash)] +#[derive(Default, Clone, Debug, PartialEq, Eq)] pub struct WithOptions { - inner: BTreeMap, + inner: HashMap, + ref_secret: HashMap, } impl std::ops::Deref for WithOptions { - type Target = BTreeMap; + type Target = HashMap; fn deref(&self) -> &Self::Target { &self.inner @@ -60,33 +61,42 @@ impl WithOptions { pub fn new(inner: HashMap) -> Self { Self { inner: inner.into_iter().collect(), + ref_secret: Default::default(), } } - pub fn from_inner(inner: BTreeMap) -> Self { - Self { inner } + pub fn from_inner(inner: HashMap) -> Self { + Self { + inner, + ref_secret: Default::default(), + } } /// Get the reference of the inner map. - pub fn inner(&self) -> &BTreeMap { + pub fn inner(&self) -> &HashMap { &self.inner } - pub fn inner_mut(&mut self) -> &mut BTreeMap { + pub fn inner_mut(&mut self) -> &mut HashMap { &mut self.inner } /// Take the value of the inner map. - pub fn into_inner(self) -> BTreeMap { + pub fn into_inner(self) -> HashMap { self.inner } /// Convert to connector props, remove the key-value pairs used in the top-level. - pub fn into_connector_props(self) -> HashMap { - self.inner + pub fn into_connector_props(self) -> Self { + let inner = self + .inner .into_iter() .filter(|(key, _)| key != OverwriteOptions::STREAMING_RATE_LIMIT_KEY) - .collect() + .collect(); + Self { + inner, + ref_secret: self.ref_secret, + } } /// Parse the retention seconds from the options. @@ -107,7 +117,10 @@ impl WithOptions { }) .collect(); - Self { inner } + Self { + inner, + ref_secret: Default::default(), + } } pub fn value_eq_ignore_case(&self, key: &str, val: &str) -> bool { @@ -174,9 +187,19 @@ impl TryFrom<&[SqlOption]> for WithOptions { type Error = RwError; fn try_from(options: &[SqlOption]) -> Result { - let mut inner: BTreeMap = BTreeMap::new(); + let mut inner: HashMap = HashMap::new(); + let mut ref_secret: HashMap = HashMap::new(); for option in options { let key = option.name.real_value(); + if let Value::Ref(r) = &option.value { + if ref_secret.insert(key.clone(), r.clone()).is_some() || inner.contains_key(&key) { + return Err(RwError::from(ErrorCode::InvalidParameterValue(format!( + "Duplicated option: {}", + key + )))); + } + continue; + } let value: String = match option.value.clone() { Value::CstyleEscapedString(s) => s.value, Value::SingleQuotedString(s) => s, @@ -189,7 +212,7 @@ impl TryFrom<&[SqlOption]> for WithOptions { ))) } }; - if inner.insert(key.clone(), value).is_some() { + if inner.insert(key.clone(), value).is_some() || ref_secret.contains_key(&key) { return Err(RwError::from(ErrorCode::InvalidParameterValue(format!( "Duplicated option: {}", key @@ -197,7 +220,7 @@ impl TryFrom<&[SqlOption]> for WithOptions { } } - Ok(Self { inner }) + Ok(Self { inner, ref_secret }) } } diff --git a/src/meta/service/src/cloud_service.rs b/src/meta/service/src/cloud_service.rs index b77b751b281e7..2bd2bf256d803 100644 --- a/src/meta/service/src/cloud_service.rs +++ b/src/meta/service/src/cloud_service.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, HashMap}; use std::sync::LazyLock; use async_trait::async_trait; @@ -75,7 +74,7 @@ impl CloudService for CloudServiceImpl { "unexpected source type, only kafka source is supported", )); } - let mut source_cfg: BTreeMap = req.source_config.into_iter().collect(); + let mut source_cfg = req.source_config.clone(); // if connection_id provided, check whether endpoint service is available and resolve // broker rewrite map currently only support aws privatelink connection if let Some(connection_id_str) = source_cfg.get("connection.id") { @@ -147,7 +146,6 @@ impl CloudService for CloudServiceImpl { } } // try fetch kafka metadata, return error message on failure - let source_cfg: HashMap = source_cfg.into_iter().collect(); let props = ConnectorProperties::extract(source_cfg, false); if let Err(e) = props { return Ok(new_rwc_validate_fail_response( diff --git a/src/prost/build.rs b/src/prost/build.rs index 67284d844cc3e..5f36e26f4bcbc 100644 --- a/src/prost/build.rs +++ b/src/prost/build.rs @@ -65,7 +65,7 @@ fn main() -> Result<(), Box> { ".monitor_service.StackTraceResponse", ".plan_common.ExternalTableDesc", ".hummock.CompactTask", - ".catalog.StreamSourceInfo", + // ".catalog.StreamSourceInfo", ]; // Build protobuf structs. @@ -99,7 +99,7 @@ fn main() -> Result<(), Box> { // Eq + Hash are for plan nodes to do common sub-plan detection. // The requirement is from Source node -> SourceCatalog -> WatermarkDesc -> expr .type_attribute("catalog.WatermarkDesc", "#[derive(Eq, Hash)]") - .type_attribute("catalog.StreamSourceInfo", "#[derive(Eq, Hash)]") + .type_attribute("catalog.StreamSourceInfo", "#[derive(Eq)]") .type_attribute("expr.ExprNode", "#[derive(Eq, Hash)]") .type_attribute("data.DataType", "#[derive(Eq, Hash)]") .type_attribute("expr.ExprNode.rex_node", "#[derive(Eq, Hash)]") diff --git a/src/sqlparser/src/ast/value.rs b/src/sqlparser/src/ast/value.rs index 2ce52f3c18bf9..79f2a6ebd99ca 100644 --- a/src/sqlparser/src/ast/value.rs +++ b/src/sqlparser/src/ast/value.rs @@ -17,6 +17,8 @@ use core::fmt; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; +use super::ObjectName; + /// Primitive SQL values such as number and string #[derive(Debug, Clone, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] @@ -57,6 +59,8 @@ pub enum Value { }, /// `NULL` value Null, + /// name of the reference to secret + Ref(ObjectName), } impl fmt::Display for Value { @@ -111,6 +115,7 @@ impl fmt::Display for Value { Ok(()) } Value::Null => write!(f, "NULL"), + Value::Ref(v) => write!(f, "ref secret {}", v), } } } diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index c80c3c145dd96..8c383ad54589a 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -3558,6 +3558,10 @@ impl Parser<'_> { Some('\'') => Ok(Value::SingleQuotedString(w.value)), _ => self.expected_at(checkpoint, "A value")?, }, + Keyword::REF => { + self.expect_keyword(Keyword::SECRET)?; + Ok(Value::Ref(self.parse_object_name()?)) + } _ => self.expected_at(checkpoint, "a concrete value"), }, Token::Number(ref n) => Ok(Value::Number(n.clone())),