diff --git a/proto/catalog.proto b/proto/catalog.proto index 9b08528ae123e..6d54903cc2592 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -241,6 +241,7 @@ message ConnectionParams { CONNECTION_TYPE_UNSPECIFIED = 0; CONNECTION_TYPE_KAFKA = 1; CONNECTION_TYPE_ICEBERG = 2; + CONNECTION_TYPE_SCHEMA_REGISTRY = 3; } ConnectionType connection_type = 1; diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto index 414db77bcf5ca..0d4b7b806cda6 100644 --- a/proto/ddl_service.proto +++ b/proto/ddl_service.proto @@ -425,9 +425,9 @@ message CreateConnectionRequest { uint32 schema_id = 3; oneof payload { PrivateLink private_link = 4 [deprecated = true]; - catalog.ConnectionParams connection_params = 7; + catalog.ConnectionParams connection_params = 6; } - uint32 owner_id = 6; + uint32 owner_id = 5; } message CreateConnectionResponse { diff --git a/src/frontend/src/handler/create_connection.rs b/src/frontend/src/handler/create_connection.rs index f90d73f0a77e3..cbfe6979a9ef4 100644 --- a/src/frontend/src/handler/create_connection.rs +++ b/src/frontend/src/handler/create_connection.rs @@ -48,6 +48,12 @@ fn resolve_create_connection_payload( with_properties: WithOptions, session: &SessionImpl, ) -> Result { + if !with_properties.connection_ref().is_empty() { + return Err(RwError::from(ErrorCode::InvalidParameterValue( + "Connection reference is not allowed in options in CREATE CONNECTION".to_string(), + ))); + } + let (mut props, secret_refs) = resolve_secret_ref_in_with_options(with_properties, session)?.into_parts(); let connection_type = get_connection_property_required(&mut props, CONNECTION_TYPE_PROP)?; diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 9c37799422a59..49ef26176e87c 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -12,14 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, HashMap}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::rc::Rc; use std::sync::LazyLock; use anyhow::{anyhow, Context}; use either::Either; use itertools::Itertools; -use maplit::{convert_args, hashmap}; +use maplit::{convert_args, hashmap, hashset}; use pgwire::pg_response::{PgResponse, StatementType}; use rand::Rng; use risingwave_common::array::arrow::{arrow_schema_iceberg, IcebergArrowConvert}; @@ -59,6 +59,7 @@ use risingwave_connector::source::{ POSIX_FS_CONNECTOR, PULSAR_CONNECTOR, S3_CONNECTOR, }; use risingwave_connector::WithPropertiesExt; +use risingwave_pb::catalog::connection_params::PbConnectionType; use risingwave_pb::catalog::{PbSchemaRegistryNameStrategy, StreamSourceInfo, WatermarkDesc}; use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType; use risingwave_pb::plan_common::{EncodeType, FormatType}; @@ -86,7 +87,8 @@ use crate::optimizer::plan_node::generic::SourceNodeKind; use crate::optimizer::plan_node::{LogicalSource, ToStream, ToStreamContext}; use crate::session::SessionImpl; use crate::utils::{ - resolve_privatelink_in_with_option, resolve_secret_ref_in_with_options, OverwriteOptions, + resolve_connection_ref_and_secret_ref, resolve_privatelink_in_with_option, + resolve_secret_ref_in_with_options, OverwriteOptions, }; use crate::{bind_data_type, build_graph, OptimizerContext, WithOptions, WithOptionsSecResolved}; @@ -308,16 +310,36 @@ pub(crate) async fn bind_columns_from_source( const NAME_STRATEGY_KEY: &str = "schema.registry.name.strategy"; let options_with_secret = match with_properties { - Either::Left(options) => resolve_secret_ref_in_with_options(options.clone(), session)?, + Either::Left(options) => { + let (sec_resolve_props, connection_type) = + resolve_connection_ref_and_secret_ref(options.clone(), session)?; + if !ALLOWED_CONNECTION_CONNECTOR.contains(&connection_type) { + return Err(RwError::from(ProtocolError(format!( + "connection type {:?} is not allowed, allowed types: {:?}", + connection_type, ALLOWED_CONNECTION_CONNECTOR + )))); + } + + sec_resolve_props + } Either::Right(options_with_secret) => options_with_secret.clone(), }; let is_kafka: bool = options_with_secret.is_kafka_connector(); - let (format_encode_options, format_encode_secret_refs) = resolve_secret_ref_in_with_options( + + // todo: need to resolve connection ref for schema registry + let (sec_resolve_props, connection_type) = resolve_connection_ref_and_secret_ref( WithOptions::try_from(format_encode.row_options())?, session, - )? - .into_parts(); + )?; + if !ALLOWED_CONNECTION_SCHEMA_REGISTRY.contains(&connection_type) { + return Err(RwError::from(ProtocolError(format!( + "connection type {:?} is not allowed, allowed types: {:?}", + connection_type, ALLOWED_CONNECTION_SCHEMA_REGISTRY + )))); + } + + let (format_encode_options, format_encode_secret_refs) = sec_resolve_props.into_parts(); // Need real secret to access the schema registry let mut format_encode_options_to_consume = LocalSecretManager::global().fill_secrets( format_encode_options.clone(), @@ -542,11 +564,15 @@ fn bind_columns_from_source_for_cdc( session: &SessionImpl, format_encode: &FormatEncodeOptions, ) -> Result<(Option>, StreamSourceInfo)> { - let (format_encode_options, format_encode_secret_refs) = resolve_secret_ref_in_with_options( - WithOptions::try_from(format_encode.row_options())?, - session, - )? - .into_parts(); + let with_options = WithOptions::try_from(format_encode.row_options())?; + if !with_options.connection_ref().is_empty() { + return Err(RwError::from(NotSupported( + "CDC connector does not support connection ref yet".to_string(), + "Explicitly specify the connection in WITH clause".to_string(), + ))); + } + let (format_encode_options, format_encode_secret_refs) = + resolve_secret_ref_in_with_options(with_options, session)?.into_parts(); // Need real secret to access the schema registry let mut format_encode_options_to_consume = LocalSecretManager::global().fill_secrets( @@ -1049,6 +1075,20 @@ pub(super) fn bind_source_watermark( Ok(watermark_descs) } +static ALLOWED_CONNECTION_CONNECTOR: LazyLock> = LazyLock::new(|| { + hashset! { + PbConnectionType::Kafka, + PbConnectionType::Iceberg, + } +}); + +static ALLOWED_CONNECTION_SCHEMA_REGISTRY: LazyLock> = + LazyLock::new(|| { + hashset! { + PbConnectionType::SchemaRegistry, + } + }); + // TODO: Better design if we want to support ENCODE KEY where we will have 4 dimensional array static CONNECTORS_COMPATIBLE_FORMATS: LazyLock>>> = LazyLock::new(|| { @@ -1561,7 +1601,21 @@ pub async fn bind_create_source_or_table_with_connector( let mut with_properties = with_properties; resolve_privatelink_in_with_option(&mut with_properties)?; - let with_properties = resolve_secret_ref_in_with_options(with_properties, session)?; + let connector = with_properties.get_connector().unwrap(); + let (with_properties, connection_type) = + resolve_connection_ref_and_secret_ref(with_properties, session)?; + if !ALLOWED_CONNECTION_CONNECTOR.contains(&connection_type) { + return Err(RwError::from(ProtocolError(format!( + "connection type {:?} is not allowed, allowed types: {:?}", + connection_type, ALLOWED_CONNECTION_CONNECTOR + )))); + } + if !connector.eq(connection_type_to_connector(&connection_type)) { + return Err(RwError::from(ProtocolError(format!( + "connector {} and connection type {:?} are not compatible", + connector, connection_type + )))); + } let pk_names = bind_source_pk( &format_encode, @@ -1772,6 +1826,14 @@ fn row_encode_to_prost(row_encode: &Encode) -> EncodeType { } } +fn connection_type_to_connector(connection_type: &PbConnectionType) -> &str { + match connection_type { + PbConnectionType::Kafka => KAFKA_CONNECTOR, + PbConnectionType::Iceberg => ICEBERG_CONNECTOR, + _ => unreachable!(), + } +} + #[cfg(test)] pub mod tests { use std::collections::HashMap; diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 1e5dc489c1a0c..e002fbfb121cd 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -551,9 +551,9 @@ pub(crate) fn gen_create_table_plan( c.column_desc.column_id = col_id_gen.generate(c.name()) } - let (_, secret_refs) = context.with_options().clone().into_parts(); - if !secret_refs.is_empty() { - return Err(crate::error::ErrorCode::InvalidParameterValue("Secret reference is not allowed in options when creating table without external source".to_string()).into()); + let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts(); + if !secret_refs.is_empty() || !connection_refs.is_empty() { + return Err(crate::error::ErrorCode::InvalidParameterValue("Secret reference and Connection reference are not allowed in options when creating table without external source".to_string()).into()); } gen_create_table_plan_without_source( diff --git a/src/frontend/src/handler/create_table_as.rs b/src/frontend/src/handler/create_table_as.rs index 27c527969f9b2..f2ba78a125b93 100644 --- a/src/frontend/src/handler/create_table_as.rs +++ b/src/frontend/src/handler/create_table_as.rs @@ -89,10 +89,10 @@ pub async fn handle_create_as( let (graph, source, table) = { let context = OptimizerContext::from_handler_args(handler_args.clone()); - let (_, secret_refs) = context.with_options().clone().into_parts(); - if !secret_refs.is_empty() { + let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts(); + if !secret_refs.is_empty() || !connection_refs.is_empty() { return Err(crate::error::ErrorCode::InvalidParameterValue( - "Secret reference is not allowed in options for CREATE TABLE AS".to_string(), + "Secret reference and Connection reference are not allowed in options for CREATE TABLE AS".to_string(), ) .into()); } diff --git a/src/frontend/src/handler/create_view.rs b/src/frontend/src/handler/create_view.rs index 5ad0e8956b967..851c3a4fa89df 100644 --- a/src/frontend/src/handler/create_view.rs +++ b/src/frontend/src/handler/create_view.rs @@ -87,10 +87,11 @@ pub async fn handle_create_view( .collect() }; - let (properties, secret_refs) = properties.into_parts(); - if !secret_refs.is_empty() { + let (properties, secret_refs, connection_refs) = properties.into_parts(); + if !secret_refs.is_empty() || !connection_refs.is_empty() { return Err(crate::error::ErrorCode::InvalidParameterValue( - "Secret reference is not allowed in create view options".to_string(), + "Secret reference and Connection reference are not allowed in create view options" + .to_string(), ) .into()); } diff --git a/src/frontend/src/handler/show.rs b/src/frontend/src/handler/show.rs index 0e44867598e5e..ac64f133d21d2 100644 --- a/src/frontend/src/handler/show.rs +++ b/src/frontend/src/handler/show.rs @@ -27,13 +27,14 @@ use risingwave_common::util::addr::HostAddr; use risingwave_connector::source::kafka::PRIVATELINK_CONNECTION; use risingwave_expr::scalar::like::{i_like_default, like_default}; use risingwave_pb::catalog::connection; +use risingwave_pb::secret::SecretRef; use risingwave_sqlparser::ast::{ display_comma_separated, Ident, ObjectName, ShowCreateType, ShowObject, ShowStatementFilter, }; use super::{fields_to_descriptors, RwPgResponse, RwPgResponseBuilderExt}; use crate::binder::{Binder, Relation}; -use crate::catalog::{CatalogError, IndexCatalog}; +use crate::catalog::{CatalogError, IndexCatalog, SecretId}; use crate::error::Result; use crate::handler::HandlerArgs; use crate::session::cursor_manager::SubscriptionCursor; @@ -444,7 +445,14 @@ pub async fn handle_show_object( connection::Info::ConnectionParams(params) => { // todo: check secrets are not exposed // todo: show dep relations - serde_json::to_string(¶ms.get_properties()).unwrap() + let print_secret_ref = |secret_ref: &SecretRef| -> String { + let secret_name = schema.get_secret_by_id(&SecretId::from(secret_ref.secret_id)).map(|s| s.name.as_str()).unwrap(); + format!("SECRET {} AS {}", secret_name, secret_ref.get_ref_as().unwrap().as_str_name()) + }; + let deref_secrets = params.get_secret_refs().iter().map(|(k, v)| (k.clone(), print_secret_ref(v))); + let mut props = params.get_properties().clone(); + props.extend(deref_secrets); + serde_json::to_string(&props).unwrap() } }; ShowConnectionRow { diff --git a/src/frontend/src/utils/with_options.rs b/src/frontend/src/utils/with_options.rs index 9d61021dab4fe..870680482b2d6 100644 --- a/src/frontend/src/utils/with_options.rs +++ b/src/frontend/src/utils/with_options.rs @@ -15,20 +15,22 @@ use std::collections::BTreeMap; use std::num::NonZeroU32; +use risingwave_common::catalog::ConnectionId; use risingwave_connector::source::kafka::private_link::{ insert_privatelink_broker_rewrite_map, PRIVATELINK_ENDPOINT_KEY, }; pub use risingwave_connector::WithOptionsSecResolved; use risingwave_connector::WithPropertiesExt; +use risingwave_pb::catalog::connection::Info as ConnectionInfo; +use risingwave_pb::catalog::connection_params::PbConnectionType; use risingwave_pb::secret::secret_ref::PbRefAsType; use risingwave_pb::secret::PbSecretRef; use risingwave_sqlparser::ast::{ - CreateConnectionStatement, CreateSinkStatement, CreateSourceStatement, - CreateSubscriptionStatement, SecretRef, SecretRefAsType, SqlOption, Statement, Value, + ConnectionRefValue, CreateConnectionStatement, CreateSinkStatement, CreateSourceStatement, + CreateSubscriptionStatement, SecretRefAsType, SecretRefValue, SqlOption, Statement, Value, }; use super::OverwriteOptions; -use crate::catalog::ConnectionId; use crate::error::{ErrorCode, Result as RwResult, RwError}; use crate::session::SessionImpl; use crate::Binder; @@ -38,11 +40,14 @@ mod options { pub const RETENTION_SECONDS: &str = "retention_seconds"; } +const CONNECTION_REF_KEY: &str = "profile"; + /// Options or properties extracted from the `WITH` clause of DDLs. #[derive(Default, Clone, Debug, PartialEq, Eq, Hash)] pub struct WithOptions { inner: BTreeMap, - secret_ref: BTreeMap, + secret_ref: BTreeMap, + connection_ref: BTreeMap, } impl std::ops::Deref for WithOptions { @@ -65,12 +70,21 @@ impl WithOptions { Self { inner, secret_ref: Default::default(), + connection_ref: Default::default(), } } /// Create a new [`WithOptions`] from a option [`BTreeMap`] and secret ref. - pub fn new(inner: BTreeMap, secret_ref: BTreeMap) -> Self { - Self { inner, secret_ref } + pub fn new( + inner: BTreeMap, + secret_ref: BTreeMap, + connection_ref: BTreeMap, + ) -> Self { + Self { + inner, + secret_ref, + connection_ref, + } } pub fn inner_mut(&mut self) -> &mut BTreeMap { @@ -78,8 +92,14 @@ impl WithOptions { } /// Take the value of the option map and secret refs. - pub fn into_parts(self) -> (BTreeMap, BTreeMap) { - (self.inner, self.secret_ref) + pub fn into_parts( + self, + ) -> ( + BTreeMap, + BTreeMap, + BTreeMap, + ) { + (self.inner, self.secret_ref, self.connection_ref) } /// Convert to connector props, remove the key-value pairs used in the top-level. @@ -95,6 +115,7 @@ impl WithOptions { Self { inner, secret_ref: self.secret_ref, + connection_ref: self.connection_ref, } } @@ -119,6 +140,7 @@ impl WithOptions { Self { inner, secret_ref: self.secret_ref.clone(), + connection_ref: self.connection_ref.clone(), } } @@ -131,23 +153,26 @@ impl WithOptions { false } - pub fn secret_ref(&self) -> &BTreeMap { + pub fn secret_ref(&self) -> &BTreeMap { &self.secret_ref } - pub fn encode_options_to_map(sql_options: &[SqlOption]) -> RwResult> { - let WithOptions { inner, secret_ref } = WithOptions::try_from(sql_options)?; - if secret_ref.is_empty() { - Ok(inner) - } else { - Err(RwError::from(ErrorCode::InvalidParameterValue( - "Secret reference is not allowed in encode options".to_string(), - ))) - } + pub fn secret_ref_mut(&mut self) -> &mut BTreeMap { + &mut self.secret_ref + } + + pub fn connection_ref(&self) -> &BTreeMap { + &self.connection_ref + } + + pub fn connection_ref_mut(&mut self) -> &mut BTreeMap { + &mut self.connection_ref } pub fn oauth_options_to_map(sql_options: &[SqlOption]) -> RwResult> { - let WithOptions { inner, secret_ref } = WithOptions::try_from(sql_options)?; + let WithOptions { + inner, secret_ref, .. + } = WithOptions::try_from(sql_options)?; if secret_ref.is_empty() { Ok(inner) } else { @@ -158,12 +183,88 @@ impl WithOptions { } } +pub(crate) fn resolve_connection_ref_and_secret_ref( + with_options: WithOptions, + session: &SessionImpl, +) -> RwResult<(WithOptionsSecResolved, PbConnectionType)> { + let db_name: &str = session.database(); + let (mut options, secret_refs, connection_refs) = with_options.clone().into_parts(); + + let mut connection_params = None; + for connection_ref in connection_refs.values() { + // at most one connection ref in the map + connection_params = { + // get connection params from catalog + let (schema_name, connection_name) = Binder::resolve_schema_qualified_name( + db_name, + connection_ref.connection_name.clone(), + )?; + let connection_catalog = + session.get_connection_by_name(schema_name, &connection_name)?; + if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info { + Some(params.clone()) + } else { + return Err(RwError::from(ErrorCode::InvalidParameterValue( + "Private Link Service has been deprecated. Please create a new connection instead." + .to_string(), + ))); + } + }; + } + + let mut inner_secret_refs = { + let mut resolved_secret_refs = BTreeMap::new(); + for (key, secret_ref) in secret_refs { + let (schema_name, secret_name) = + Binder::resolve_schema_qualified_name(db_name, secret_ref.secret_name.clone())?; + let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?; + let ref_as = match secret_ref.ref_as { + SecretRefAsType::Text => PbRefAsType::Text, + SecretRefAsType::File => PbRefAsType::File, + }; + let pb_secret_ref = PbSecretRef { + secret_id: secret_catalog.id.secret_id(), + ref_as: ref_as.into(), + }; + resolved_secret_refs.insert(key.clone(), pb_secret_ref); + } + resolved_secret_refs + }; + + let mut connection_type = PbConnectionType::Unspecified; + if let Some(connection_params) = connection_params { + connection_type = connection_params.connection_type(); + for (k, v) in connection_params.properties { + if options.insert(k.clone(), v).is_some() { + return Err(RwError::from(ErrorCode::InvalidParameterValue(format!( + "Duplicated key: {}", + k + )))); + } + } + + for (k, v) in connection_params.secret_refs { + if inner_secret_refs.insert(k.clone(), v).is_some() { + return Err(RwError::from(ErrorCode::InvalidParameterValue(format!( + "Duplicated key: {}", + k + )))); + } + } + } + + Ok(( + WithOptionsSecResolved::new(options, inner_secret_refs), + connection_type, + )) +} + /// Get the secret id from the name. pub(crate) fn resolve_secret_ref_in_with_options( with_options: WithOptions, session: &SessionImpl, ) -> RwResult { - let (options, secret_refs) = with_options.into_parts(); + let (options, secret_refs, _) = with_options.into_parts(); let mut resolved_secret_refs = BTreeMap::new(); let db_name: &str = session.database(); for (key, secret_ref) in secret_refs { @@ -207,17 +308,40 @@ impl TryFrom<&[SqlOption]> for WithOptions { fn try_from(options: &[SqlOption]) -> Result { let mut inner: BTreeMap = BTreeMap::new(); - let mut secret_ref: BTreeMap = BTreeMap::new(); + let mut secret_ref: BTreeMap = BTreeMap::new(); + let mut connection_ref: BTreeMap = BTreeMap::new(); for option in options { let key = option.name.real_value(); - if let Value::Ref(r) = &option.value { - if secret_ref.insert(key.clone(), r.clone()).is_some() || inner.contains_key(&key) { - return Err(RwError::from(ErrorCode::InvalidParameterValue(format!( - "Duplicated option: {}", - key - )))); + match &option.value { + Value::SecretRef(r) => { + if secret_ref.insert(key.clone(), r.clone()).is_some() + || inner.contains_key(&key) + { + return Err(RwError::from(ErrorCode::InvalidParameterValue(format!( + "Duplicated option: {}", + key + )))); + } + continue; } - continue; + Value::ConnectionRef(r) => { + if key != CONNECTION_REF_KEY { + return Err(RwError::from(ErrorCode::InvalidParameterValue(format!( + "expect 'profile' as the key for connection ref, but got: {}", + key + )))); + } + if connection_ref.insert(key.clone(), r.clone()).is_some() + || inner.contains_key(&key) + { + return Err(RwError::from(ErrorCode::InvalidParameterValue(format!( + "Duplicated option: {}", + key + )))); + } + continue; + } + _ => {} } let value: String = match option.value.clone() { Value::CstyleEscapedString(s) => s.value, @@ -239,7 +363,11 @@ impl TryFrom<&[SqlOption]> for WithOptions { } } - Ok(Self { inner, secret_ref }) + Ok(Self { + inner, + secret_ref, + connection_ref, + }) } } diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs index 563dc66be4780..c737913d43ad9 100644 --- a/src/sqlparser/src/ast/mod.rs +++ b/src/sqlparser/src/ast/mod.rs @@ -52,8 +52,8 @@ pub use self::query::{ }; pub use self::statement::*; pub use self::value::{ - CstyleEscapedString, DateTimeField, DollarQuotedString, JsonPredicateType, SecretRef, - SecretRefAsType, TrimWhereField, Value, + ConnectionRefValue, CstyleEscapedString, DateTimeField, DollarQuotedString, JsonPredicateType, + SecretRefAsType, SecretRefValue, TrimWhereField, Value, }; pub use crate::ast::ddl::{ AlterIndexOperation, AlterSinkOperation, AlterSourceOperation, AlterSubscriptionOperation, diff --git a/src/sqlparser/src/ast/value.rs b/src/sqlparser/src/ast/value.rs index 2bf8a6fdf3a02..051e0814a3d9f 100644 --- a/src/sqlparser/src/ast/value.rs +++ b/src/sqlparser/src/ast/value.rs @@ -60,7 +60,9 @@ pub enum Value { /// `NULL` value Null, /// name of the reference to secret - Ref(SecretRef), + SecretRef(SecretRefValue), + /// name of the reference to connection + ConnectionRef(ConnectionRefValue), } impl fmt::Display for Value { @@ -115,7 +117,8 @@ impl fmt::Display for Value { Ok(()) } Value::Null => write!(f, "NULL"), - Value::Ref(v) => write!(f, "secret {}", v), + Value::SecretRef(v) => write!(f, "secret {}", v), + Value::ConnectionRef(v) => write!(f, "connection {}", v), } } } @@ -240,12 +243,12 @@ impl fmt::Display for JsonPredicateType { } #[derive(Debug, Clone, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -pub struct SecretRef { +pub struct SecretRefValue { pub secret_name: ObjectName, pub ref_as: SecretRefAsType, } -impl fmt::Display for SecretRef { +impl fmt::Display for SecretRefValue { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self.ref_as { SecretRefAsType::Text => write!(f, "{}", self.secret_name), @@ -260,3 +263,15 @@ pub enum SecretRefAsType { Text, File, } + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct ConnectionRefValue { + pub connection_name: ObjectName, +} + +impl fmt::Display for ConnectionRefValue { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.connection_name) + } +} diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index d5582f31a64de..d16c6d12116a8 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -3641,11 +3641,15 @@ impl Parser<'_> { } else { SecretRefAsType::Text }; - Ok(Value::Ref(SecretRef { + Ok(Value::SecretRef(SecretRefValue { secret_name, ref_as, })) } + Keyword::CONNECTION => { + let connection_name = self.parse_object_name()?; + Ok(Value::ConnectionRef(ConnectionRefValue { connection_name })) + } _ => self.expected_at(checkpoint, "a concrete value"), }, Token::Number(ref n) => Ok(Value::Number(n.clone())),