From 449f09be1b86221b6f3f16e21161b4374079c167 Mon Sep 17 00:00:00 2001
From: tabVersion
Date: Thu, 17 Oct 2024 17:36:51 +0800
Subject: [PATCH 01/10] remove private link related connection

Signed-off-by: tabVersion
---
 src/frontend/src/handler/create_connection.rs |  64 +---
 src/meta/model_v2/src/connection.rs           |   2 +
 src/meta/node/src/server.rs                   |  14 -
 src/meta/service/src/cloud_service.rs         | 196 ----------
 src/meta/service/src/ddl_service.rs           |  98 +----
 src/meta/service/src/lib.rs                   |   1 -
 src/meta/src/controller/catalog.rs            |  11 +-
 src/meta/src/error.rs                         |  15 +-
 src/meta/src/rpc/cloud_provider.rs            | 337 ------------------
 src/meta/src/rpc/ddl_controller.rs            |  22 --
 src/meta/src/rpc/ddl_controller_v2.rs         |  22 +-
 src/meta/src/rpc/mod.rs                       |   1 -
 12 files changed, 41 insertions(+), 742 deletions(-)
 delete mode 100644 src/meta/src/rpc/cloud_provider.rs

diff --git a/src/frontend/src/handler/create_connection.rs b/src/frontend/src/handler/create_connection.rs
index 987f0e9fdd89..577b643097a9 100644
--- a/src/frontend/src/handler/create_connection.rs
+++ b/src/frontend/src/handler/create_connection.rs
@@ -16,23 +16,16 @@ use std::collections::BTreeMap;
 
 use pgwire::pg_response::{PgResponse, StatementType};
 use risingwave_connector::source::kafka::PRIVATELINK_CONNECTION;
-use risingwave_pb::catalog::connection::private_link_service::PrivateLinkProvider;
 use risingwave_pb::ddl_service::create_connection_request;
 use risingwave_sqlparser::ast::CreateConnectionStatement;
 
 use super::RwPgResponse;
 use crate::binder::Binder;
 use crate::error::ErrorCode::ProtocolError;
-use crate::error::{Result, RwError};
+use crate::error::{ErrorCode, Result, RwError};
 use crate::handler::HandlerArgs;
 
 pub(crate) const CONNECTION_TYPE_PROP: &str = "type";
-pub(crate) const CONNECTION_PROVIDER_PROP: &str = "provider";
-pub(crate) const CONNECTION_SERVICE_NAME_PROP: &str = "service.name";
-pub(crate) const CONNECTION_TAGS_PROP: &str = "tags";
-
-pub(crate) const CLOUD_PROVIDER_MOCK: &str = "mock"; // fake privatelink provider for testing
-pub(crate) const CLOUD_PROVIDER_AWS: &str = "aws";
 
 #[inline(always)]
 fn get_connection_property_required(
@@ -48,58 +41,19 @@ fn get_connection_property_required(
             )))
         })
 }
-
-fn resolve_private_link_properties(
-    with_properties: &BTreeMap<String, String>,
-) -> Result<create_connection_request::PrivateLink> {
-    let provider =
-        match get_connection_property_required(with_properties, CONNECTION_PROVIDER_PROP)?.as_str()
-        {
-            CLOUD_PROVIDER_MOCK => PrivateLinkProvider::Mock,
-            CLOUD_PROVIDER_AWS => PrivateLinkProvider::Aws,
-            provider => {
-                return Err(RwError::from(ProtocolError(format!(
-                    "Unsupported privatelink provider {}",
-                    provider
-                ))));
-            }
-        };
-    match provider {
-        PrivateLinkProvider::Mock => Ok(create_connection_request::PrivateLink {
-            provider: provider.into(),
-            service_name: String::new(),
-            tags: None,
-        }),
-        PrivateLinkProvider::Aws => {
-            let service_name =
-                get_connection_property_required(with_properties, CONNECTION_SERVICE_NAME_PROP)?;
-            Ok(create_connection_request::PrivateLink {
-                provider: provider.into(),
-                service_name,
-                tags: with_properties.get(CONNECTION_TAGS_PROP).cloned(),
-            })
-        }
-        PrivateLinkProvider::Unspecified => Err(RwError::from(ProtocolError(
-            "Privatelink provider unspecified".to_string(),
-        ))),
-    }
-}
-
 fn resolve_create_connection_payload(
     with_properties: &BTreeMap<String, String>,
 ) -> Result<create_connection_request::Payload> {
     let connection_type = get_connection_property_required(with_properties, CONNECTION_TYPE_PROP)?;
-    let create_connection_payload = match connection_type.as_str() {
-        PRIVATELINK_CONNECTION => 
create_connection_request::Payload::PrivateLink( - resolve_private_link_properties(with_properties)?, - ), - _ => { - return Err(RwError::from(ProtocolError(format!( - "Connection type \"{connection_type}\" is not supported" - )))); - } + return match connection_type.as_str() { + PRIVATELINK_CONNECTION => Err(RwError::from(ErrorCode::Deprecated( + "CREATE CONNECTION to Private Link".to_string(), + "RisingWave Cloud Portal".to_string(), + ))), + _ => Err(RwError::from(ProtocolError(format!( + "Connection type \"{connection_type}\" is not supported" + )))), }; - Ok(create_connection_payload) } pub async fn handle_create_connection( diff --git a/src/meta/model_v2/src/connection.rs b/src/meta/model_v2/src/connection.rs index a6cfa4aefb58..dce0daa462fc 100644 --- a/src/meta/model_v2/src/connection.rs +++ b/src/meta/model_v2/src/connection.rs @@ -26,6 +26,8 @@ pub struct Model { #[sea_orm(primary_key, auto_increment = false)] pub connection_id: ConnectionId, pub name: String, + + // todo: Private link service has been deprecated, consider using a new field for the connection info pub info: PrivateLinkService, } diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index b556c4ca3452..b70584c437a6 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -35,7 +35,6 @@ use risingwave_meta::rpc::ElectionClientRef; use risingwave_meta::stream::ScaleController; use risingwave_meta::MetaStoreBackend; use risingwave_meta_service::backup_service::BackupServiceImpl; -use risingwave_meta_service::cloud_service::CloudServiceImpl; use risingwave_meta_service::cluster_limit_service::ClusterLimitServiceImpl; use risingwave_meta_service::cluster_service::ClusterServiceImpl; use risingwave_meta_service::ddl_service::DdlServiceImpl; @@ -55,7 +54,6 @@ use risingwave_meta_service::telemetry_service::TelemetryInfoServiceImpl; use risingwave_meta_service::user_service::UserServiceImpl; use risingwave_meta_service::AddressInfo; use risingwave_pb::backup_service::backup_service_server::BackupServiceServer; -use risingwave_pb::cloud_service::cloud_service_server::CloudServiceServer; use risingwave_pb::connector_service::sink_coordination_service_server::SinkCoordinationServiceServer; use risingwave_pb::ddl_service::ddl_service_server::DdlServiceServer; use risingwave_pb::health::health_server::HealthServer; @@ -86,7 +84,6 @@ use crate::controller::SqlMetaStore; use crate::hummock::HummockManager; use crate::manager::sink_coordination::SinkCoordinatorManager; use crate::manager::{IdleManager, MetaOpts, MetaSrvEnv}; -use crate::rpc::cloud_provider::AwsEc2Client; use crate::rpc::election::sql::{MySqlDriver, PostgresDriver, SqlBackendElectionClient}; use crate::rpc::metrics::{ start_fragment_info_monitor, start_worker_info_monitor, GLOBAL_META_METRICS, @@ -531,17 +528,8 @@ pub async fn start_service_as_election_leader( compactor_manager.clone(), )); - let mut aws_cli = None; - if let Some(my_vpc_id) = &env.opts.vpc_id - && let Some(security_group_id) = &env.opts.security_group_id - { - let cli = AwsEc2Client::new(my_vpc_id, security_group_id).await; - aws_cli = Some(cli); - } - let ddl_srv = DdlServiceImpl::new( env.clone(), - aws_cli.clone(), metadata_manager.clone(), stream_manager.clone(), source_manager.clone(), @@ -586,7 +574,6 @@ pub async fn start_service_as_election_leader( let session_params_srv = SessionParamsServiceImpl::new(env.session_params_manager_impl_ref()); let serving_srv = ServingServiceImpl::new(serving_vnode_mapping.clone(), metadata_manager.clone()); - let 
cloud_srv = CloudServiceImpl::new(metadata_manager.clone(), aws_cli); let event_log_srv = EventLogServiceImpl::new(env.event_log_manager_ref()); let cluster_limit_srv = ClusterLimitServiceImpl::new(env.clone(), metadata_manager.clone()); @@ -712,7 +699,6 @@ pub async fn start_service_as_election_leader( .add_service(SessionParamServiceServer::new(session_params_srv)) .add_service(TelemetryInfoServiceServer::new(telemetry_srv)) .add_service(ServingServiceServer::new(serving_srv)) - .add_service(CloudServiceServer::new(cloud_srv)) .add_service(SinkCoordinationServiceServer::new(sink_coordination_srv)) .add_service(EventLogServiceServer::new(event_log_srv)) .add_service(ClusterLimitServiceServer::new(cluster_limit_srv)); diff --git a/src/meta/service/src/cloud_service.rs b/src/meta/service/src/cloud_service.rs index c609d202b0d6..8b137891791f 100644 --- a/src/meta/service/src/cloud_service.rs +++ b/src/meta/service/src/cloud_service.rs @@ -1,197 +1 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -use std::collections::BTreeMap; -use std::sync::LazyLock; - -use async_trait::async_trait; -use regex::Regex; -use risingwave_connector::error::ConnectorResult; -use risingwave_connector::source::kafka::private_link::insert_privatelink_broker_rewrite_map; -use risingwave_connector::source::{ - ConnectorProperties, SourceEnumeratorContext, SourceProperties, SplitEnumerator, -}; -use risingwave_connector::{dispatch_source_prop, WithOptionsSecResolved}; -use risingwave_meta::manager::MetadataManager; -use risingwave_meta_model_v2::ConnectionId; -use risingwave_pb::catalog::connection::Info::PrivateLinkService; -use risingwave_pb::cloud_service::cloud_service_server::CloudService; -use risingwave_pb::cloud_service::rw_cloud_validate_source_response::{Error, ErrorType}; -use risingwave_pb::cloud_service::{ - RwCloudValidateSourceRequest, RwCloudValidateSourceResponse, SourceType, -}; -use thiserror_ext::AsReport; -use tonic::{Request, Response, Status}; - -use crate::rpc::cloud_provider::AwsEc2Client; - -pub struct CloudServiceImpl { - metadata_manager: MetadataManager, - aws_client: Option, -} - -impl CloudServiceImpl { - pub fn new(metadata_manager: MetadataManager, aws_client: Option) -> Self { - Self { - metadata_manager, - aws_client, - } - } -} - -#[inline(always)] -fn new_rwc_validate_fail_response( - error_type: ErrorType, - error_message: String, -) -> Response { - Response::new(RwCloudValidateSourceResponse { - ok: false, - error: Some(Error { - error_type: error_type.into(), - error_message, - }), - }) -} - -#[async_trait] -impl CloudService for CloudServiceImpl { - async fn rw_cloud_validate_source( - &self, - request: Request, - ) -> Result, Status> { - let req = request.into_inner(); - if req.source_type() != SourceType::Kafka { - return Err(Status::invalid_argument( - "unexpected source type, only kafka source is supported", - )); - } - let mut source_cfg: BTreeMap = req.source_config.into_iter().collect(); - // if connection_id provided, check 
whether endpoint service is available and resolve - // broker rewrite map currently only support aws privatelink connection - if let Some(connection_id_str) = source_cfg.get("connection.id") { - let connection_id = connection_id_str.parse::().map_err(|e| { - Status::invalid_argument(format!( - "connection.id is not an integer: {}", - e.as_report() - )) - })?; - - let connection = self - .metadata_manager - .catalog_controller - .get_connection_by_id(connection_id) - .await; - - if let Err(e) = connection { - return Ok(new_rwc_validate_fail_response( - ErrorType::PrivatelinkConnectionNotFound, - e.to_report_string(), - )); - } - if let Some(PrivateLinkService(service)) = connection.unwrap().info { - if self.aws_client.is_none() { - return Ok(new_rwc_validate_fail_response( - ErrorType::AwsClientNotConfigured, - "AWS client is not configured".to_string(), - )); - } - let cli = self.aws_client.as_ref().unwrap(); - let privatelink_status = cli - .is_vpc_endpoint_ready(service.endpoint_id.as_str()) - .await; - match privatelink_status { - Err(e) => { - return Ok(new_rwc_validate_fail_response( - ErrorType::PrivatelinkUnavailable, - e.to_report_string(), - )); - } - Ok(false) => { - return Ok(new_rwc_validate_fail_response( - ErrorType::PrivatelinkUnavailable, - format!("Private link endpoint {} is not ready", service.endpoint_id,), - )); - } - _ => (), - }; - if let Err(e) = - insert_privatelink_broker_rewrite_map(&mut source_cfg, Some(&service), None) - { - return Ok(new_rwc_validate_fail_response( - ErrorType::PrivatelinkResolveErr, - e.to_report_string(), - )); - } - } else { - return Ok(new_rwc_validate_fail_response( - ErrorType::PrivatelinkResolveErr, - format!("connection {} has no info available", connection_id), - )); - } - } - - // XXX: We can't use secret in cloud validate source. 
- let source_cfg = WithOptionsSecResolved::without_secrets(source_cfg); - - // try fetch kafka metadata, return error message on failure - let props = ConnectorProperties::extract(source_cfg, false); - if let Err(e) = props { - return Ok(new_rwc_validate_fail_response( - ErrorType::KafkaInvalidProperties, - e.to_report_string(), - )); - }; - - async fn new_enumerator( - props: P, - ) -> ConnectorResult { - P::SplitEnumerator::new(props, SourceEnumeratorContext::dummy().into()).await - } - - dispatch_source_prop!(props.unwrap(), props, { - let enumerator = new_enumerator(*props).await; - if let Err(e) = enumerator { - return Ok(new_rwc_validate_fail_response( - ErrorType::KafkaInvalidProperties, - e.to_report_string(), - )); - } - if let Err(e) = enumerator.unwrap().list_splits().await { - let error_message = e.to_report_string(); - if error_message.contains("BrokerTransportFailure") { - return Ok(new_rwc_validate_fail_response( - ErrorType::KafkaBrokerUnreachable, - e.to_report_string(), - )); - } - static TOPIC_NOT_FOUND: LazyLock = - LazyLock::new(|| Regex::new(r"topic .* not found").unwrap()); - if TOPIC_NOT_FOUND.is_match(error_message.as_str()) { - return Ok(new_rwc_validate_fail_response( - ErrorType::KafkaTopicNotFound, - e.to_report_string(), - )); - } - return Ok(new_rwc_validate_fail_response( - ErrorType::KafkaOther, - e.to_report_string(), - )); - } - }); - Ok(Response::new(RwCloudValidateSourceResponse { - ok: true, - error: None, - })) - } -} diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs index 1578813e2ead..7a5d0f315c7b 100644 --- a/src/meta/service/src/ddl_service.rs +++ b/src/meta/service/src/ddl_service.rs @@ -22,15 +22,12 @@ use risingwave_common::catalog::ColumnCatalog; use risingwave_common::types::DataType; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_connector::sink::catalog::SinkId; +use risingwave_meta::error::MetaErrorInner; use risingwave_meta::manager::{EventLogManagerRef, MetadataManager}; use risingwave_meta::rpc::ddl_controller::fill_table_stream_graph_info; use risingwave_meta::rpc::metrics::MetaMetrics; -use risingwave_pb::catalog::connection::private_link_service::{ - PbPrivateLinkProvider, PrivateLinkProvider, -}; -use risingwave_pb::catalog::connection::PbPrivateLinkService; use risingwave_pb::catalog::table::OptionalAssociatedSourceId; -use risingwave_pb::catalog::{connection, Comment, Connection, CreateType, Secret, Table}; +use risingwave_pb::catalog::{connection, Comment, CreateType, Secret, Table}; use risingwave_pb::common::worker_node::State; use risingwave_pb::common::WorkerType; use risingwave_pb::ddl_service::ddl_service_server::DdlService; @@ -44,7 +41,6 @@ use tonic::{Request, Response, Status}; use crate::barrier::BarrierManagerRef; use crate::manager::sink_coordination::SinkCoordinatorManager; use crate::manager::{MetaSrvEnv, StreamingJob}; -use crate::rpc::cloud_provider::AwsEc2Client; use crate::rpc::ddl_controller::{ DdlCommand, DdlController, DropMode, ReplaceTableInfo, StreamingJobId, }; @@ -58,7 +54,6 @@ pub struct DdlServiceImpl { metadata_manager: MetadataManager, sink_manager: SinkCoordinatorManager, ddl_controller: DdlController, - aws_client: Arc>, meta_metrics: Arc, } @@ -66,7 +61,6 @@ impl DdlServiceImpl { #[allow(clippy::too_many_arguments)] pub async fn new( env: MetaSrvEnv, - aws_client: Option, metadata_manager: MetadataManager, stream_manager: GlobalStreamManagerRef, source_manager: SourceManagerRef, @@ -74,22 +68,19 @@ impl DdlServiceImpl { 
sink_manager: SinkCoordinatorManager, meta_metrics: Arc, ) -> Self { - let aws_cli_ref = Arc::new(aws_client); let ddl_controller = DdlController::new( env.clone(), metadata_manager.clone(), stream_manager, source_manager, barrier_manager, - aws_cli_ref.clone(), ) .await; Self { env, metadata_manager, - ddl_controller, - aws_client: aws_cli_ref, sink_manager, + ddl_controller, meta_metrics, } } @@ -747,64 +738,11 @@ impl DdlService for DdlServiceImpl { return Err(Status::invalid_argument("request is empty")); } - match req.payload.unwrap() { - create_connection_request::Payload::PrivateLink(link) => { - // currently we only support AWS - let private_link_svc = match link.get_provider()? { - PbPrivateLinkProvider::Mock => PbPrivateLinkService { - provider: link.provider, - service_name: String::new(), - endpoint_id: String::new(), - endpoint_dns_name: String::new(), - dns_entries: HashMap::new(), - }, - PbPrivateLinkProvider::Aws => { - if let Some(aws_cli) = self.aws_client.as_ref() { - let tags_env = self - .env - .opts - .privatelink_endpoint_default_tags - .as_ref() - .map(|tags| { - tags.iter() - .map(|(key, val)| (key.as_str(), val.as_str())) - .collect() - }); - aws_cli - .create_aws_private_link( - &link.service_name, - link.tags.as_deref(), - tags_env, - ) - .await? - } else { - return Err(Status::from(MetaError::unavailable( - "AWS client is not configured", - ))); - } - } - PbPrivateLinkProvider::Unspecified => { - return Err(Status::invalid_argument("Privatelink provider unspecified")); - } - }; - let connection = Connection { - id: 0, - schema_id: req.schema_id, - database_id: req.database_id, - name: req.name, - owner: req.owner_id, - info: Some(connection::Info::PrivateLinkService(private_link_svc)), - }; - - // save private link info to catalog - let version = self - .ddl_controller - .run_command(DdlCommand::CreateConnection(connection)) - .await?; - - Ok(Response::new(CreateConnectionResponse { version })) - } - } + return match req.payload.unwrap() { + create_connection_request::Payload::PrivateLink(_) => Err(Status::unavailable( + "Private Link is deprecated, please use Cloud Portal", + )), + }; } async fn list_connections( @@ -1095,21 +1033,11 @@ impl DdlServiceImpl { .catalog_controller .get_connection_by_id(connection_id as _) .await?; - if let Some(connection::Info::PrivateLinkService(svc)) = &connection.info { - // skip all checks for mock connection - if svc.get_provider()? == PrivateLinkProvider::Mock { - return Ok(()); - } - - // check whether private link is ready - if let Some(aws_cli) = self.aws_client.as_ref() { - if !aws_cli.is_vpc_endpoint_ready(&svc.endpoint_id).await? 
{
-                    return Err(MetaError::from(anyhow!(
-                        "Private link endpoint {} is not ready",
-                        svc.endpoint_id
-                    )));
-                }
-            }
+        if let Some(connection::Info::PrivateLinkService(_)) = &connection.info {
+            return Err(MetaError::from(MetaErrorInner::Deprecated(
+                "CREATE CONNECTION to Private Link".to_string(),
+                "RisingWave Cloud Portal".to_string(),
+            )));
         }
         Ok(())
     }
diff --git a/src/meta/service/src/lib.rs b/src/meta/service/src/lib.rs
index 2e327dc47a59..68a5e4c4a762 100644
--- a/src/meta/service/src/lib.rs
+++ b/src/meta/service/src/lib.rs
@@ -19,7 +19,6 @@
 use risingwave_meta::*;
 
 pub mod backup_service;
-pub mod cloud_service;
 pub mod cluster_limit_service;
 pub mod cluster_service;
 pub mod ddl_service;
diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs
index e90bfb3b1270..eb54da28fc1e 100644
--- a/src/meta/src/controller/catalog.rs
+++ b/src/meta/src/controller/catalog.rs
@@ -32,9 +32,9 @@ use risingwave_meta_model_v2::{
     actor, connection, database, fragment, function, index, object, object_dependency, schema,
     secret, sink, source, streaming_job, subscription, table, user_privilege, view, ActorId,
     ActorUpstreamActors, ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId,
-    FunctionId, I32Array, IndexId, JobStatus, ObjectId, PrivateLinkService, Property, SchemaId,
-    SecretId, SinkId, SourceId, StreamNode, StreamSourceInfo, StreamingParallelism, SubscriptionId,
-    TableId, UserId, ViewId,
+    FunctionId, I32Array, IndexId, JobStatus, ObjectId, Property, SchemaId, SecretId, SinkId,
+    SourceId, StreamNode, StreamSourceInfo, StreamingParallelism, SubscriptionId, TableId, UserId,
+    ViewId,
 };
 use risingwave_pb::catalog::subscription::SubscriptionState;
 use risingwave_pb::catalog::table::PbTableType;
@@ -109,7 +109,8 @@ pub struct ReleaseContext {
     /// Dropped source list, need to unregister from source manager.
     pub(crate) source_ids: Vec<SourceId>,
     /// Dropped connection list, need to delete from vpc endpoints.
-    pub(crate) connections: Vec<PrivateLinkService>,
+    #[allow(dead_code)]
+    pub(crate) connections: Vec<ConnectionId>,
 
     /// Dropped fragments that are fetching data from the target source.
     pub(crate) source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
@@ -367,7 +368,7 @@ impl CatalogController {
             .all(&txn)
             .await?
             .into_iter()
-            .map(|conn| conn.info)
+            .map(|conn| conn.connection_id)
             .collect_vec();
 
         // Find affect users with privileges on the database and the objects in the database.
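The hunk below in `src/meta/src/error.rs` swaps the AWS SDK error conversion for the new `MetaErrorInner::Deprecated` variant that `create_connection.rs` and `validate_connection` above now return. As a quick illustration of how that variant renders, here is a minimal, self-contained sketch assuming only the `thiserror` crate (the real `MetaError` wrapper around `MetaErrorInner` is omitted):

```rust
use thiserror::Error;

#[derive(Error, Debug)]
enum MetaErrorInner {
    // Mirrors the variant added in the hunk below: field 0 names the
    // deprecated feature, field 1 names its replacement.
    #[error("{0} has been deprecated, please use {1} instead.")]
    Deprecated(String, String),
}

fn main() {
    let err = MetaErrorInner::Deprecated(
        "CREATE CONNECTION to Private Link".to_string(),
        "RisingWave Cloud Portal".to_string(),
    );
    // Prints: CREATE CONNECTION to Private Link has been deprecated,
    // please use RisingWave Cloud Portal instead.
    println!("{err}");
}
```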
diff --git a/src/meta/src/error.rs b/src/meta/src/error.rs index 78051e28e7cb..b049ef9e99ef 100644 --- a/src/meta/src/error.rs +++ b/src/meta/src/error.rs @@ -116,9 +116,6 @@ pub enum MetaErrorInner { SinkError, ), - #[error("AWS SDK error: {0}")] - Aws(#[source] BoxedError), - #[error(transparent)] Internal( #[from] @@ -132,6 +129,9 @@ pub enum MetaErrorInner { #[error("Integrity check failed")] IntegrityCheckFailed, + + #[error("{0} has been deprecated, please use {1} instead.")] + Deprecated(String, String), } impl MetaError { @@ -156,15 +156,6 @@ impl MetaError { } } -impl From> for MetaError -where - E: std::error::Error + Sync + Send + 'static, -{ - fn from(e: aws_sdk_ec2::error::SdkError) -> Self { - MetaErrorInner::Aws(e.into()).into() - } -} - impl From for tonic::Status { fn from(err: MetaError) -> Self { use tonic::Code; diff --git a/src/meta/src/rpc/cloud_provider.rs b/src/meta/src/rpc/cloud_provider.rs deleted file mode 100644 index fce20d5eea09..000000000000 --- a/src/meta/src/rpc/cloud_provider.rs +++ /dev/null @@ -1,337 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::HashMap; - -use anyhow::anyhow; -use aws_config::retry::RetryConfig; -use aws_sdk_ec2::error::ProvideErrorMetadata; -use aws_sdk_ec2::types::{Filter, ResourceType, State, Tag, TagSpecification, VpcEndpointType}; -use itertools::Itertools; -use risingwave_pb::catalog::connection::private_link_service::PrivateLinkProvider; -use risingwave_pb::catalog::connection::PrivateLinkService; - -use crate::{MetaError, MetaResult}; - -#[derive(Clone)] -pub struct AwsEc2Client { - client: aws_sdk_ec2::Client, - /// `vpc_id`: The VPC of the running RisingWave instance - vpc_id: String, - security_group_id: String, -} - -impl AwsEc2Client { - pub async fn new(vpc_id: &str, security_group_id: &str) -> Self { - let sdk_config = aws_config::from_env() - .retry_config(RetryConfig::standard().with_max_attempts(4)) - .load() - .await; - let client = aws_sdk_ec2::Client::new(&sdk_config); - - Self { - client, - vpc_id: vpc_id.to_string(), - security_group_id: security_group_id.to_string(), - } - } - - pub async fn delete_vpc_endpoint(&self, vpc_endpoint_id: &str) -> MetaResult<()> { - let output = self - .client - .delete_vpc_endpoints() - .vpc_endpoint_ids(vpc_endpoint_id) - .send() - .await - .map_err(|e| { - anyhow!( - "Failed to delete VPC endpoint. 
endpoint_id {vpc_endpoint_id}, error: {:?}, aws_request_id: {:?}", - e.message(), - e.meta().extra("aws_request_id") - ) - })?; - - if !output.unsuccessful().is_empty() { - return Err(MetaError::from(anyhow!( - "Failed to delete VPC endpoint {}, error: {:?}", - vpc_endpoint_id, - output.unsuccessful() - ))); - } - Ok(()) - } - - /// `service_name`: The name of the endpoint service we want to access - /// `tags_user_str`: The tags specified in with clause of `create connection` - /// `tags_env`: The default tags specified in env var `RW_PRIVATELINK_ENDPOINT_DEFAULT_TAGS` - pub async fn create_aws_private_link( - &self, - service_name: &str, - tags_user_str: Option<&str>, - tags_env: Option>, - ) -> MetaResult { - // fetch the AZs of the endpoint service - let service_azs = self.get_endpoint_service_az_names(service_name).await?; - let subnet_and_azs = self.describe_subnets(&self.vpc_id, &service_azs).await?; - - let subnet_ids: Vec = subnet_and_azs.iter().map(|(id, _, _)| id.clone()).collect(); - let az_to_azid_map: HashMap = subnet_and_azs - .into_iter() - .map(|(_, az, az_id)| (az, az_id)) - .collect(); - - let tags_vec = match tags_user_str { - Some(tags_user_str) => { - let mut tags_user = tags_user_str - .split(',') - .map(|s| { - s.split_once('=').ok_or_else(|| { - MetaError::invalid_parameter("Failed to parse `tags` parameter") - }) - }) - .collect::>>()?; - match tags_env { - Some(tags_env) => { - tags_user.extend(tags_env); - Some(tags_user) - } - None => Some(tags_user), - } - } - None => tags_env, - }; - - let (endpoint_id, endpoint_dns_names) = self - .create_vpc_endpoint( - &self.vpc_id, - service_name, - &self.security_group_id, - &subnet_ids, - tags_vec, - ) - .await?; - - // The number of returned DNS names may not equal to the input AZs, - // because some AZs may not have a subnet in the RW VPC - let mut azid_to_dns_map = HashMap::new(); - if endpoint_dns_names.first().is_none() { - return Err(MetaError::from(anyhow!( - "No DNS name returned for the endpoint" - ))); - } - - // The first dns name doesn't has AZ info - let endpoint_dns_name = endpoint_dns_names.first().unwrap().clone(); - for dns_name in &endpoint_dns_names { - for az in az_to_azid_map.keys() { - if dns_name.contains(az) { - azid_to_dns_map - .insert(az_to_azid_map.get(az).unwrap().clone(), dns_name.clone()); - break; - } - } - } - - Ok(PrivateLinkService { - provider: PrivateLinkProvider::Aws.into(), - service_name: service_name.to_string(), - endpoint_id, - dns_entries: azid_to_dns_map, - endpoint_dns_name, - }) - } - - pub async fn is_vpc_endpoint_ready(&self, vpc_endpoint_id: &str) -> MetaResult { - let mut is_ready = false; - let filter = Filter::builder() - .name("vpc-endpoint-id") - .values(vpc_endpoint_id) - .build(); - let output = self - .client - .describe_vpc_endpoints() - .set_filters(Some(vec![filter])) - .send() - .await - .map_err(|e| { - anyhow!( - "Failed to check availability of VPC endpoint. 
endpoint_id: {vpc_endpoint_id}, error: {:?}, aws_request_id: {:?}", - e.message(), - e.meta().extra("aws_request_id") - ) - })?; - - match output.vpc_endpoints { - Some(endpoints) => { - let endpoint = endpoints - .into_iter() - .exactly_one() - .map_err(|_| anyhow!("More than one VPC endpoint found with the same ID"))?; - if let Some(state) = endpoint.state { - match state { - State::Available => { - is_ready = true; - } - // forward-compatible with protocol change - other => { - is_ready = other.as_str().eq_ignore_ascii_case("available"); - } - } - } - } - None => { - return Err(MetaError::from(anyhow!( - "No VPC endpoint found with the ID {}", - vpc_endpoint_id - ))); - } - } - Ok(is_ready) - } - - async fn get_endpoint_service_az_names(&self, service_name: &str) -> MetaResult> { - let mut service_azs = Vec::new(); - let output = self - .client - .describe_vpc_endpoint_services() - .set_service_names(Some(vec![service_name.to_string()])) - .send() - .await - .map_err(|e| { - anyhow!( - "Failed to describe VPC endpoint service, error: {:?}, aws_request_id: {:?}", - e.message(), - e.meta().extra("aws_request_id") - ) - })?; - - match output.service_details { - Some(details) => { - let detail = details.into_iter().exactly_one().map_err(|_| { - anyhow!("More than one VPC endpoint service found with the same name") - })?; - if let Some(azs) = detail.availability_zones { - service_azs.extend(azs.into_iter()); - } - } - None => { - return Err(MetaError::from(anyhow!( - "No VPC endpoint service found with the name {}", - service_name - ))); - } - } - Ok(service_azs) - } - - async fn describe_subnets( - &self, - vpc_id: &str, - az_names: &[String], - ) -> MetaResult> { - let vpc_filter = Filter::builder().name("vpc-id").values(vpc_id).build(); - let az_filter = Filter::builder() - .name("availability-zone") - .set_values(Some(Vec::from(az_names))) - .build(); - let output = self - .client - .describe_subnets() - .set_filters(Some(vec![vpc_filter, az_filter])) - .send() - .await - .map_err(|e| { - anyhow!("Failed to describe subnets for vpc_id {vpc_id}. error: {:?}, aws_request_id: {:?}", - e.message(), - e.meta().extra("aws_request_id")) - })?; - - let subnets = output - .subnets - .unwrap_or_default() - .into_iter() - .unique_by(|s| s.availability_zone().unwrap_or_default().to_string()) - .map(|s| { - ( - s.subnet_id.unwrap_or_default(), - s.availability_zone.unwrap_or_default(), - s.availability_zone_id.unwrap_or_default(), - ) - }) - .collect(); - Ok(subnets) - } - - async fn create_vpc_endpoint( - &self, - vpc_id: &str, - service_name: &str, - security_group_id: &str, - subnet_ids: &[String], - tags_vec: Option>, - ) -> MetaResult<(String, Vec)> { - let tag_spec = match tags_vec { - Some(tags_vec) => { - let tags = tags_vec - .into_iter() - .map(|(tag_key, tag_val)| { - Tag::builder() - .set_key(Some(tag_key.to_string())) - .set_value(Some(tag_val.to_string())) - .build() - }) - .collect(); - Some(vec![TagSpecification::builder() - .set_resource_type(Some(ResourceType::VpcEndpoint)) - .set_tags(Some(tags)) - .build()]) - } - None => None, - }; - - let output = self - .client - .create_vpc_endpoint() - .vpc_endpoint_type(VpcEndpointType::Interface) - .vpc_id(vpc_id) - .security_group_ids(security_group_id) - .service_name(service_name) - .set_subnet_ids(Some(subnet_ids.to_owned())) - .set_tag_specifications(tag_spec) - .send() - .await - .map_err(|e| { - anyhow!( - "Failed to create vpc endpoint: vpc_id {vpc_id}, \ - service_name {service_name}. 
error: {:?}, aws_request_id: {:?}", - e.message(), - e.meta().extra("aws_request_id") - ) - })?; - - let endpoint = output.vpc_endpoint().unwrap(); - let mut dns_names = Vec::new(); - - endpoint.dns_entries().iter().for_each(|e| { - if let Some(dns_name) = e.dns_name() { - dns_names.push(dns_name.to_string()); - } - }); - - Ok(( - endpoint.vpc_endpoint_id().unwrap_or_default().to_string(), - dns_names, - )) - } -} diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 0cc4e82969c2..7fb0d46d4eec 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -43,8 +43,6 @@ use risingwave_meta_model_v2::{ ConnectionId, DatabaseId, FunctionId, IndexId, ObjectId, SchemaId, SecretId, SinkId, SourceId, SubscriptionId, TableId, UserId, ViewId, }; -use risingwave_pb::catalog::connection::private_link_service::PbPrivateLinkProvider; -use risingwave_pb::catalog::connection::PrivateLinkService; use risingwave_pb::catalog::source::OptionalAssociatedTableId; use risingwave_pb::catalog::table::OptionalAssociatedSourceId; use risingwave_pb::catalog::{ @@ -66,7 +64,6 @@ use risingwave_pb::stream_plan::{ use thiserror_ext::AsReport; use tokio::sync::Semaphore; use tokio::time::sleep; -use tracing::log::warn; use tracing::Instrument; use crate::barrier::BarrierManagerRef; @@ -76,7 +73,6 @@ use crate::manager::{ DdlType, LocalNotification, MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, }; use crate::model::{FragmentId, StreamContext, TableFragments, TableParallelism}; -use crate::rpc::cloud_provider::AwsEc2Client; use crate::stream::{ create_source_worker_handle, ActorGraphBuildResult, ActorGraphBuilder, CompleteStreamFragmentGraph, CreateStreamingJobContext, CreateStreamingJobOption, @@ -185,7 +181,6 @@ pub struct DdlController { pub(crate) source_manager: SourceManagerRef, barrier_manager: BarrierManagerRef, - aws_client: Arc>, // The semaphore is used to limit the number of concurrent streaming job creation. pub(crate) creating_streaming_job_permits: Arc, } @@ -255,7 +250,6 @@ impl DdlController { stream_manager: GlobalStreamManagerRef, source_manager: SourceManagerRef, barrier_manager: BarrierManagerRef, - aws_client: Arc>, ) -> Self { let creating_streaming_job_permits = Arc::new(CreatingStreamingJobPermit::new(&env).await); Self { @@ -264,7 +258,6 @@ impl DdlController { stream_manager, source_manager, barrier_manager, - aws_client, creating_streaming_job_permits, } } @@ -548,21 +541,6 @@ impl DdlController { .await } - pub(crate) async fn delete_vpc_endpoint(&self, svc: &PrivateLinkService) -> MetaResult<()> { - // delete AWS vpc endpoint - if svc.get_provider()? 
== PbPrivateLinkProvider::Aws { - if let Some(aws_cli) = self.aws_client.as_ref() { - aws_cli.delete_vpc_endpoint(&svc.endpoint_id).await?; - } else { - warn!( - "AWS client is not initialized, skip deleting vpc endpoint {}", - svc.endpoint_id - ); - } - } - Ok(()) - } - async fn create_subscription( &self, mut subscription: Subscription, diff --git a/src/meta/src/rpc/ddl_controller_v2.rs b/src/meta/src/rpc/ddl_controller_v2.rs index 5ab5c0d3ad00..0d6b268cf8ce 100644 --- a/src/meta/src/rpc/ddl_controller_v2.rs +++ b/src/meta/src/rpc/ddl_controller_v2.rs @@ -25,13 +25,14 @@ use risingwave_pb::stream_plan::StreamFragmentGraph as StreamFragmentGraphProto; use thiserror_ext::AsReport; use crate::controller::catalog::ReleaseContext; +use crate::error::MetaErrorInner; use crate::manager::{NotificationVersion, StreamingJob, IGNORED_NOTIFICATION_VERSION}; use crate::model::StreamContext; use crate::rpc::ddl_controller::{ fill_table_stream_graph_info, DdlController, DropMode, ReplaceTableInfo, }; use crate::stream::{validate_sink, StreamFragmentGraph}; -use crate::MetaResult; +use crate::{MetaError, MetaResult}; impl DdlController { /// For [`CreateType::Foreground`], the function will only return after backfilling finishes @@ -273,8 +274,11 @@ impl DdlController { .catalog_controller .drop_connection(object_id) .await?; - if let Some(connection::Info::PrivateLinkService(svc)) = &conn.info { - self.delete_vpc_endpoint(svc).await?; + if let Some(connection::Info::PrivateLinkService(_)) = &conn.info { + return Err(MetaError::from(MetaErrorInner::Deprecated( + "CREATE CONNECTION to Private Link".to_string(), + "RisingWave Cloud Portal".to_string(), + ))); } return Ok(version); } @@ -386,22 +390,12 @@ impl DdlController { streaming_job_ids, state_table_ids, source_ids, - connections, source_fragments, removed_actors, removed_fragments, + .. } = release_ctx; - // delete vpc endpoints. - for conn in connections { - let _ = self - .delete_vpc_endpoint(&conn.to_protobuf()) - .await - .inspect_err(|err| { - tracing::warn!(err = ?err.as_report(), "failed to delete vpc endpoint"); - }); - } - // unregister sources. self.source_manager .unregister_sources(source_ids.into_iter().map(|id| id as _).collect()) diff --git a/src/meta/src/rpc/mod.rs b/src/meta/src/rpc/mod.rs index 09fbf7e12f48..821b9befe1ef 100644 --- a/src/meta/src/rpc/mod.rs +++ b/src/meta/src/rpc/mod.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-pub mod cloud_provider; pub mod ddl_controller; mod ddl_controller_v2; pub mod election; From 03b47bbf439a796c1acb5d84cb4c7b808f4ccc1b Mon Sep 17 00:00:00 2001 From: tabVersion Date: Thu, 17 Oct 2024 17:47:35 +0800 Subject: [PATCH 02/10] fix Signed-off-by: tabVersion --- src/meta/service/src/cloud_service.rs | 1 - 1 file changed, 1 deletion(-) delete mode 100644 src/meta/service/src/cloud_service.rs diff --git a/src/meta/service/src/cloud_service.rs b/src/meta/service/src/cloud_service.rs deleted file mode 100644 index 8b137891791f..000000000000 --- a/src/meta/service/src/cloud_service.rs +++ /dev/null @@ -1 +0,0 @@ - From de10c928a69a5b2e5aa21d22bedb6795dff89bc9 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Fri, 18 Oct 2024 17:13:22 +0800 Subject: [PATCH 03/10] fix Signed-off-by: tabVersion --- src/meta/src/rpc/ddl_controller.rs | 21 ++++----------------- 1 file changed, 4 insertions(+), 17 deletions(-) diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index d2638a862075..4d67af3b5ddf 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -46,8 +46,8 @@ use risingwave_meta_model::{ use risingwave_pb::catalog::source::OptionalAssociatedTableId; use risingwave_pb::catalog::table::OptionalAssociatedSourceId; use risingwave_pb::catalog::{ - connection, Comment, Connection, CreateType, Database, Function, PbSink, PbSource, PbTable, - Schema, Secret, Sink, Source, Subscription, Table, View, + Comment, Connection, CreateType, Database, Function, PbSink, PbSource, PbTable, Schema, Secret, + Sink, Source, Subscription, Table, View, }; use risingwave_pb::ddl_service::alter_owner_request::Object; use risingwave_pb::ddl_service::{ @@ -1139,14 +1139,11 @@ impl DdlController { .await; } ObjectType::Connection => { - let (version, conn) = self + let (version, _conn) = self .metadata_manager .catalog_controller .drop_connection(object_id) .await?; - if let Some(connection::Info::PrivateLinkService(svc)) = &conn.info { - self.delete_vpc_endpoint(svc).await?; - } return Ok(version); } _ => { @@ -1257,22 +1254,12 @@ impl DdlController { streaming_job_ids, state_table_ids, source_ids, - connections, source_fragments, removed_actors, removed_fragments, + .. } = release_ctx; - // delete vpc endpoints. - for conn in connections { - let _ = self - .delete_vpc_endpoint(&conn.to_protobuf()) - .await - .inspect_err(|err| { - tracing::warn!(err = ?err.as_report(), "failed to delete vpc endpoint"); - }); - } - // unregister sources. 
self.source_manager .unregister_sources(source_ids.into_iter().map(|id| id as _).collect()) From 45aba097d2a0e190792bb38bcafee083ec9fc0d0 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Mon, 21 Oct 2024 17:45:25 +0800 Subject: [PATCH 04/10] remove logic in kafka private link Signed-off-by: tabVersion --- .../src/source/kafka/private_link.rs | 1 - src/frontend/src/handler/create_sink.rs | 13 +++------- src/frontend/src/handler/create_source.rs | 5 ++-- src/frontend/src/utils/with_options.rs | 26 ++----------------- 4 files changed, 7 insertions(+), 38 deletions(-) diff --git a/src/connector/src/source/kafka/private_link.rs b/src/connector/src/source/kafka/private_link.rs index 6aeebde87b51..3f6f1b8e32da 100644 --- a/src/connector/src/source/kafka/private_link.rs +++ b/src/connector/src/source/kafka/private_link.rs @@ -30,7 +30,6 @@ use crate::error::ConnectorResult; use crate::source::kafka::{KAFKA_PROPS_BROKER_KEY, KAFKA_PROPS_BROKER_KEY_ALIAS}; pub const PRIVATELINK_ENDPOINT_KEY: &str = "privatelink.endpoint"; -pub const CONNECTION_NAME_KEY: &str = "connection.name"; #[derive(Debug)] pub(super) enum PrivateLinkContextRole { diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index fb35c5efc2e9..03ff662d3096 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -22,9 +22,7 @@ use maplit::{convert_args, hashmap}; use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::array::arrow::arrow_schema_iceberg::DataType as ArrowDataType; use risingwave_common::array::arrow::IcebergArrowConvert; -use risingwave_common::catalog::{ - ColumnCatalog, ConnectionId, DatabaseId, Schema, SchemaId, TableId, UserId, -}; +use risingwave_common::catalog::{ColumnCatalog, DatabaseId, Schema, SchemaId, TableId, UserId}; use risingwave_common::secret::LocalSecretManager; use risingwave_common::types::DataType; use risingwave_common::{bail, catalog}; @@ -92,12 +90,7 @@ pub async fn gen_sink_plan( let mut with_options = handler_args.with_options.clone(); - let connection_id = { - let conn_id = - resolve_privatelink_in_with_option(&mut with_options, &sink_schema_name, session)?; - conn_id.map(ConnectionId) - }; - + resolve_privatelink_in_with_option(&mut with_options)?; let mut resolved_with_options = resolve_secret_ref_in_with_options(with_options, session)?; let partition_info = get_partition_compute_info(&resolved_with_options).await?; @@ -266,7 +259,7 @@ pub async fn gen_sink_plan( SchemaId::new(sink_schema_id), DatabaseId::new(sink_database_id), UserId::new(session.user_id()), - connection_id, + None, // deprecated: private link connection id dependent_relations.into_iter().collect_vec(), ); diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index f168f7d0de3f..3ea41dcf75b2 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -1555,8 +1555,7 @@ pub async fn bind_create_source_or_table_with_connector( // resolve privatelink connection for Kafka let mut with_properties = with_properties; - let connection_id = - resolve_privatelink_in_with_option(&mut with_properties, &schema_name, session)?; + resolve_privatelink_in_with_option(&mut with_properties)?; let with_properties = resolve_secret_ref_in_with_options(with_properties, session)?; @@ -1632,7 +1631,7 @@ pub async fn bind_create_source_or_table_with_connector( watermark_descs, associated_table_id, definition, - connection_id, + connection_id: None, // 
deprecated: private link connection id created_at_epoch: None, initialized_at_epoch: None, version: INITIAL_SOURCE_VERSION_ID, diff --git a/src/frontend/src/utils/with_options.rs b/src/frontend/src/utils/with_options.rs index e306103c02e3..2b6599dcc9ab 100644 --- a/src/frontend/src/utils/with_options.rs +++ b/src/frontend/src/utils/with_options.rs @@ -16,7 +16,7 @@ use std::collections::BTreeMap; use std::num::NonZeroU32; use risingwave_connector::source::kafka::private_link::{ - insert_privatelink_broker_rewrite_map, CONNECTION_NAME_KEY, PRIVATELINK_ENDPOINT_KEY, + insert_privatelink_broker_rewrite_map, PRIVATELINK_ENDPOINT_KEY, }; pub use risingwave_connector::WithOptionsSecResolved; use risingwave_connector::WithPropertiesExt; @@ -186,8 +186,6 @@ pub(crate) fn resolve_secret_ref_in_with_options( pub(crate) fn resolve_privatelink_in_with_option( with_options: &mut WithOptions, - schema_name: &Option, - session: &SessionImpl, ) -> RwResult> { let is_kafka = with_options.is_kafka_connector(); let privatelink_endpoint = with_options.remove(PRIVATELINK_ENDPOINT_KEY); @@ -201,28 +199,8 @@ pub(crate) fn resolve_privatelink_in_with_option( } insert_privatelink_broker_rewrite_map(with_options.inner_mut(), None, Some(endpoint)) .map_err(RwError::from)?; - return Ok(None); } - - let connection_name = with_options - .remove(CONNECTION_NAME_KEY) - .map(|s| s.to_lowercase()); - let connection_id = match connection_name { - Some(connection_name) => { - let connection = session - .get_connection_by_name(schema_name.clone(), &connection_name) - .map_err(|_| ErrorCode::ItemNotFound(connection_name))?; - if !is_kafka { - return Err(RwError::from(ErrorCode::ProtocolError( - "Connection is only supported in kafka connector".to_string(), - ))); - } - resolve_private_link_connection(&connection, with_options.inner_mut())?; - Some(connection.id) - } - None => None, - }; - Ok(connection_id) + Ok(None) } impl TryFrom<&[SqlOption]> for WithOptions { From 192c838f5fec80c2b0cab495b9af42e7d44cdabe Mon Sep 17 00:00:00 2001 From: tabVersion Date: Tue, 22 Oct 2024 15:21:29 +0800 Subject: [PATCH 05/10] revert cloud service Signed-off-by: tabVersion --- .../src/catalog/connection_catalog.rs | 29 ----- src/frontend/src/utils/with_options.rs | 1 - src/meta/node/src/server.rs | 4 + src/meta/service/src/cloud_service.rs | 121 ++++++++++++++++++ src/meta/service/src/lib.rs | 1 + 5 files changed, 126 insertions(+), 30 deletions(-) diff --git a/src/frontend/src/catalog/connection_catalog.rs b/src/frontend/src/catalog/connection_catalog.rs index 54e1210979fe..03b2ff4203c5 100644 --- a/src/frontend/src/catalog/connection_catalog.rs +++ b/src/frontend/src/catalog/connection_catalog.rs @@ -12,18 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::collections::BTreeMap; -use std::sync::Arc; - -use anyhow::anyhow; -use risingwave_connector::source::kafka::private_link::insert_privatelink_broker_rewrite_map; -use risingwave_connector::WithPropertiesExt; -use risingwave_pb::catalog::connection::private_link_service::PrivateLinkProvider; use risingwave_pb::catalog::connection::Info; use risingwave_pb::catalog::{connection, PbConnection}; use crate::catalog::{ConnectionId, OwnedByUserCatalog}; -use crate::error::{Result, RwError}; use crate::user::UserId; #[derive(Clone, Debug, PartialEq)] @@ -64,24 +56,3 @@ impl OwnedByUserCatalog for ConnectionCatalog { self.owner } } - -pub(crate) fn resolve_private_link_connection( - connection: &Arc, - properties: &mut BTreeMap, -) -> Result<()> { - #[allow(irrefutable_let_patterns)] - if let connection::Info::PrivateLinkService(svc) = &connection.info { - if !properties.is_kafka_connector() { - return Err(RwError::from(anyhow!( - "Private link is only supported for Kafka connector" - ))); - } - // skip all checks for mock connection - if svc.get_provider()? == PrivateLinkProvider::Mock { - return Ok(()); - } - insert_privatelink_broker_rewrite_map(properties, Some(svc), None) - .map_err(RwError::from)?; - } - Ok(()) -} diff --git a/src/frontend/src/utils/with_options.rs b/src/frontend/src/utils/with_options.rs index 2b6599dcc9ab..9d61021dab4f 100644 --- a/src/frontend/src/utils/with_options.rs +++ b/src/frontend/src/utils/with_options.rs @@ -28,7 +28,6 @@ use risingwave_sqlparser::ast::{ }; use super::OverwriteOptions; -use crate::catalog::connection_catalog::resolve_private_link_connection; use crate::catalog::ConnectionId; use crate::error::{ErrorCode, Result as RwResult, RwError}; use crate::session::SessionImpl; diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index b70584c437a6..bd79226b2f5c 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -35,6 +35,7 @@ use risingwave_meta::rpc::ElectionClientRef; use risingwave_meta::stream::ScaleController; use risingwave_meta::MetaStoreBackend; use risingwave_meta_service::backup_service::BackupServiceImpl; +use risingwave_meta_service::cloud_service::CloudServiceImpl; use risingwave_meta_service::cluster_limit_service::ClusterLimitServiceImpl; use risingwave_meta_service::cluster_service::ClusterServiceImpl; use risingwave_meta_service::ddl_service::DdlServiceImpl; @@ -54,6 +55,7 @@ use risingwave_meta_service::telemetry_service::TelemetryInfoServiceImpl; use risingwave_meta_service::user_service::UserServiceImpl; use risingwave_meta_service::AddressInfo; use risingwave_pb::backup_service::backup_service_server::BackupServiceServer; +use risingwave_pb::cloud_service::cloud_service_server::CloudServiceServer; use risingwave_pb::connector_service::sink_coordination_service_server::SinkCoordinationServiceServer; use risingwave_pb::ddl_service::ddl_service_server::DdlServiceServer; use risingwave_pb::health::health_server::HealthServer; @@ -574,6 +576,7 @@ pub async fn start_service_as_election_leader( let session_params_srv = SessionParamsServiceImpl::new(env.session_params_manager_impl_ref()); let serving_srv = ServingServiceImpl::new(serving_vnode_mapping.clone(), metadata_manager.clone()); + let cloud_srv = CloudServiceImpl::new(); let event_log_srv = EventLogServiceImpl::new(env.event_log_manager_ref()); let cluster_limit_srv = ClusterLimitServiceImpl::new(env.clone(), metadata_manager.clone()); @@ -692,6 +695,7 @@ pub async fn start_service_as_election_leader( 
.add_service(MetaMemberServiceServer::new(meta_member_srv))
         .add_service(DdlServiceServer::new(ddl_srv).max_decoding_message_size(usize::MAX))
         .add_service(UserServiceServer::new(user_srv))
+        .add_service(CloudServiceServer::new(cloud_srv))
         .add_service(ScaleServiceServer::new(scale_srv).max_decoding_message_size(usize::MAX))
         .add_service(HealthServer::new(health_srv))
         .add_service(BackupServiceServer::new(backup_srv))
diff --git a/src/meta/service/src/cloud_service.rs b/src/meta/service/src/cloud_service.rs
index e69de29bb2d1..553a8189116c 100644
--- a/src/meta/service/src/cloud_service.rs
+++ b/src/meta/service/src/cloud_service.rs
@@ -0,0 +1,121 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::BTreeMap;
+use std::sync::LazyLock;
+
+use async_trait::async_trait;
+use regex::Regex;
+use risingwave_connector::error::ConnectorResult;
+use risingwave_connector::source::{
+    ConnectorProperties, SourceEnumeratorContext, SourceProperties, SplitEnumerator,
+};
+use risingwave_connector::{dispatch_source_prop, WithOptionsSecResolved};
+use risingwave_pb::cloud_service::cloud_service_server::CloudService;
+use risingwave_pb::cloud_service::rw_cloud_validate_source_response::{Error, ErrorType};
+use risingwave_pb::cloud_service::{
+    RwCloudValidateSourceRequest, RwCloudValidateSourceResponse, SourceType,
+};
+use thiserror_ext::AsReport;
+use tonic::{Request, Response, Status};
+pub struct CloudServiceImpl {}
+
+impl CloudServiceImpl {
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+#[inline(always)]
+fn new_rwc_validate_fail_response(
+    error_type: ErrorType,
+    error_message: String,
+) -> Response<RwCloudValidateSourceResponse> {
+    Response::new(RwCloudValidateSourceResponse {
+        ok: false,
+        error: Some(Error {
+            error_type: error_type.into(),
+            error_message,
+        }),
+    })
+}
+
+#[async_trait]
+impl CloudService for CloudServiceImpl {
+    async fn rw_cloud_validate_source(
+        &self,
+        request: Request<RwCloudValidateSourceRequest>,
+    ) -> Result<Response<RwCloudValidateSourceResponse>, Status> {
+        let req = request.into_inner();
+        if req.source_type() != SourceType::Kafka {
+            return Err(Status::invalid_argument(
+                "unexpected source type, only kafka source is supported",
+            ));
+        }
+        let source_cfg: BTreeMap<String, String> = req.source_config.into_iter().collect();
+
+        // XXX: We can't use secret in cloud validate source.
+        let source_cfg = WithOptionsSecResolved::without_secrets(source_cfg);
+
+        // try fetch kafka metadata, return error message on failure
+        let props = ConnectorProperties::extract(source_cfg, false);
+        if let Err(e) = props {
+            return Ok(new_rwc_validate_fail_response(
+                ErrorType::KafkaInvalidProperties,
+                e.to_report_string(),
+            ));
+        };
+
+        async fn new_enumerator<P: SourceProperties>(
+            props: P,
+        ) -> ConnectorResult<P::SplitEnumerator> {
+            P::SplitEnumerator::new(props, SourceEnumeratorContext::dummy().into()).await
+        }
+
+        dispatch_source_prop!(props.unwrap(), props, {
+            let enumerator = new_enumerator(*props).await;
+            if let Err(e) = enumerator {
+                return Ok(new_rwc_validate_fail_response(
+                    ErrorType::KafkaInvalidProperties,
+                    e.to_report_string(),
+                ));
+            }
+            if let Err(e) = enumerator.unwrap().list_splits().await {
+                let error_message = e.to_report_string();
+                if error_message.contains("BrokerTransportFailure") {
+                    return Ok(new_rwc_validate_fail_response(
+                        ErrorType::KafkaBrokerUnreachable,
+                        e.to_report_string(),
+                    ));
+                }
+                static TOPIC_NOT_FOUND: LazyLock<Regex> =
+                    LazyLock::new(|| Regex::new(r"topic .* not found").unwrap());
+                if TOPIC_NOT_FOUND.is_match(error_message.as_str()) {
+                    return Ok(new_rwc_validate_fail_response(
+                        ErrorType::KafkaTopicNotFound,
+                        e.to_report_string(),
+                    ));
+                }
+                return Ok(new_rwc_validate_fail_response(
+                    ErrorType::KafkaOther,
+                    e.to_report_string(),
+                ));
+            }
+        });
+        Ok(Response::new(RwCloudValidateSourceResponse {
+            ok: true,
+            error: None,
+        }))
+    }
+}
diff --git a/src/meta/service/src/lib.rs b/src/meta/service/src/lib.rs
index 68a5e4c4a762..2e327dc47a59 100644
--- a/src/meta/service/src/lib.rs
+++ b/src/meta/service/src/lib.rs
@@ -19,6 +19,7 @@
 use risingwave_meta::*;
 
 pub mod backup_service;
+pub mod cloud_service;
 pub mod cluster_limit_service;
 pub mod cluster_service;
 pub mod ddl_service;

From cd0c91998a601e7192691a110a807c47ccb06204 Mon Sep 17 00:00:00 2001
From: tabVersion
Date: Tue, 22 Oct 2024 15:26:04 +0800
Subject: [PATCH 06/10] remove ddl controller v2

Signed-off-by: tabVersion
---
 src/meta/src/rpc/ddl_controller_v2.rs | 0
 1 file changed, 0 insertions(+), 0 deletions(-)
 delete mode 100644 src/meta/src/rpc/ddl_controller_v2.rs

diff --git a/src/meta/src/rpc/ddl_controller_v2.rs b/src/meta/src/rpc/ddl_controller_v2.rs
deleted file mode 100644
index e69de29bb2d1..000000000000

From 3fcdb3f54d67f1737ab3f6f7947b372a37b62a8f Mon Sep 17 00:00:00 2001
From: tabVersion
Date: Tue, 22 Oct 2024 15:30:06 +0800
Subject: [PATCH 07/10] update proto

Signed-off-by: tabVersion
---
 proto/catalog.proto | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/proto/catalog.proto b/proto/catalog.proto
index 8eeb75843244..e4d03db35f72 100644
--- a/proto/catalog.proto
+++ b/proto/catalog.proto
@@ -243,7 +243,7 @@ message Connection {
   uint32 database_id = 3;
   string name = 4;
   oneof info {
-    PrivateLinkService private_link_service = 5;
+    PrivateLinkService private_link_service = 5 [deprecated = true];
   }
   uint32 owner = 6;
 }

From 8f994e98c7861ec564ffca395735aa391bf8b359 Mon Sep 17 00:00:00 2001
From: tabVersion
Date: Tue, 22 Oct 2024 15:51:55 +0800
Subject: [PATCH 08/10] resolve comments

Signed-off-by: tabVersion
---
 src/frontend/src/handler/create_connection.rs |  2 +-
 src/meta/service/src/ddl_service.rs           | 21 ++++-----------------
 2 files changed, 5 insertions(+), 18 deletions(-)

diff --git a/src/frontend/src/handler/create_connection.rs b/src/frontend/src/handler/create_connection.rs
index 577b643097a9..634d5ab829db 100644
--- a/src/frontend/src/handler/create_connection.rs
+++ 
b/src/frontend/src/handler/create_connection.rs @@ -48,7 +48,7 @@ fn resolve_create_connection_payload( return match connection_type.as_str() { PRIVATELINK_CONNECTION => Err(RwError::from(ErrorCode::Deprecated( "CREATE CONNECTION to Private Link".to_string(), - "RisingWave Cloud Portal".to_string(), + "RisingWave Cloud Portal (Please refer to the doc https://docs.risingwave.com/cloud/create-a-connection/)".to_string(), ))), _ => Err(RwError::from(ProtocolError(format!( "Connection type \"{connection_type}\" is not supported" diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs index 7a5d0f315c7b..863c4ca399ff 100644 --- a/src/meta/service/src/ddl_service.rs +++ b/src/meta/service/src/ddl_service.rs @@ -222,11 +222,6 @@ impl DdlService for DdlServiceImpl { let req = request.into_inner(); let source = req.get_source()?.clone(); - // validate connection before starting the DDL procedure - if let Some(connection_id) = source.connection_id { - self.validate_connection(connection_id).await?; - } - match req.fragment_graph { None => { let version = self @@ -288,11 +283,6 @@ impl DdlService for DdlServiceImpl { let fragment_graph = req.get_fragment_graph()?.clone(); let affected_table_change = req.get_affected_table_change().cloned().ok(); - // validate connection before starting the DDL procedure - if let Some(connection_id) = sink.connection_id { - self.validate_connection(connection_id).await?; - } - let stream_job = match &affected_table_change { None => StreamingJob::Sink(sink, None), Some(change) => { @@ -739,9 +729,9 @@ impl DdlService for DdlServiceImpl { } return match req.payload.unwrap() { - create_connection_request::Payload::PrivateLink(_) => Err(Status::unavailable( - "Private Link is deprecated, please use Cloud Portal", - )), + create_connection_request::Payload::PrivateLink(_) => { + panic!("Private Link Connection has been deprecated") + } }; } @@ -1034,10 +1024,7 @@ impl DdlServiceImpl { .get_connection_by_id(connection_id as _) .await?; if let Some(connection::Info::PrivateLinkService(_)) = &connection.info { - return Err(MetaError::from(MetaErrorInner::Deprecated( - "CREATE CONNECTION to Private Link".to_string(), - "RisingWave Cloud Portal".to_string(), - ))); + panic!("Private Link Connection has been deprecated") } Ok(()) } From 618444df3d755770da5ef3e60b8de6c0296c1511 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Wed, 23 Oct 2024 16:11:37 +0800 Subject: [PATCH 09/10] fix Signed-off-by: tabVersion --- src/meta/service/src/ddl_service.rs | 21 +++------------------ 1 file changed, 3 insertions(+), 18 deletions(-) diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs index 863c4ca399ff..71b45b1887ef 100644 --- a/src/meta/service/src/ddl_service.rs +++ b/src/meta/service/src/ddl_service.rs @@ -22,12 +22,11 @@ use risingwave_common::catalog::ColumnCatalog; use risingwave_common::types::DataType; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_connector::sink::catalog::SinkId; -use risingwave_meta::error::MetaErrorInner; use risingwave_meta::manager::{EventLogManagerRef, MetadataManager}; use risingwave_meta::rpc::ddl_controller::fill_table_stream_graph_info; use risingwave_meta::rpc::metrics::MetaMetrics; use risingwave_pb::catalog::table::OptionalAssociatedSourceId; -use risingwave_pb::catalog::{connection, Comment, CreateType, Secret, Table}; +use risingwave_pb::catalog::{Comment, CreateType, Secret, Table}; use risingwave_pb::common::worker_node::State; use 
 use risingwave_pb::ddl_service::ddl_service_server::DdlService;
@@ -45,7 +44,7 @@ use crate::rpc::ddl_controller::{
     DdlCommand, DdlController, DropMode, ReplaceTableInfo, StreamingJobId,
 };
 use crate::stream::{GlobalStreamManagerRef, SourceManagerRef};
-use crate::{MetaError, MetaResult};
+use crate::MetaError;
 
 #[derive(Clone)]
 pub struct DdlServiceImpl {
@@ -728,7 +727,7 @@ impl DdlService for DdlServiceImpl {
             return Err(Status::invalid_argument("request is empty"));
         }
 
-        return match req.payload.unwrap() {
+        match req.payload.unwrap() {
             create_connection_request::Payload::PrivateLink(_) => {
                 panic!("Private Link Connection has been deprecated")
             }
@@ -1016,20 +1015,6 @@ impl DdlService for DdlServiceImpl {
     }
 }
 
-impl DdlServiceImpl {
-    async fn validate_connection(&self, connection_id: u32) -> MetaResult<()> {
-        let connection = self
-            .metadata_manager
-            .catalog_controller
-            .get_connection_by_id(connection_id as _)
-            .await?;
-        if let Some(connection::Info::PrivateLinkService(_)) = &connection.info {
-            panic!("Private Link Connection has been deprecated")
-        }
-        Ok(())
-    }
-}
-
 fn add_auto_schema_change_fail_event_log(
     meta_metrics: &Arc<MetaMetrics>,
     table_id: u32,

From 1fc823c431ab0ce185bca93c5f71410f1bcbf7bc Mon Sep 17 00:00:00 2001
From: tabVersion
Date: Wed, 23 Oct 2024 16:30:24 +0800
Subject: [PATCH 10/10] remove related e2e

Signed-off-by: tabVersion
---
 e2e_test/ddl/alter_set_schema.slt       | 17 ------
 e2e_test/ddl/connection.slt             | 23 --------
 e2e_test/sink/kafka/create_sink.slt     | 42 ---------------
 e2e_test/source_legacy/basic/ddl.slt    | 52 -------------------
 .../basic/old_row_format_syntax/ddl.slt | 40 --------------
 src/connector/src/sink/mod.rs           |  2 -
 6 files changed, 176 deletions(-)
 delete mode 100644 e2e_test/ddl/connection.slt

diff --git a/e2e_test/ddl/alter_set_schema.slt b/e2e_test/ddl/alter_set_schema.slt
index 74dcc5a77e64..db0f479c85c0 100644
--- a/e2e_test/ddl/alter_set_schema.slt
+++ b/e2e_test/ddl/alter_set_schema.slt
@@ -94,23 +94,6 @@ WHERE nspname = 'test_schema';
 ----
 test_subscription test_schema
 
-statement ok
-CREATE CONNECTION test_conn WITH (type = 'privatelink', provider = 'mock');
-
-statement ok
-ALTER CONNECTION test_conn SET SCHEMA test_schema;
-
-query TT
-SELECT name AS connname, nspname AS schemaname
-FROM rw_connections
-JOIN pg_namespace ON pg_namespace.oid = rw_connections.schema_id
-WHERE nspname = 'test_schema';
-----
-test_conn test_schema
-
-statement ok
-DROP CONNECTION test_schema.test_conn;
-
 statement ok
 DROP SINK test_schema.test_sink;
diff --git a/e2e_test/ddl/connection.slt b/e2e_test/ddl/connection.slt
deleted file mode 100644
index 435395e9d249..000000000000
--- a/e2e_test/ddl/connection.slt
+++ /dev/null
@@ -1,23 +0,0 @@
-# Create a connection.
-statement ok
-CREATE CONNECTION conn0 WITH (type = 'privatelink', provider = 'mock');
-
-# Create another user with duplicate name.
-statement error
-CREATE CONNECTION conn0 WITH (type = 'privatelink', provider = 'mock');
-
-# Create if not exists.
-statement ok
-CREATE CONNECTION IF NOT EXISTS conn0 WITH (type = 'privatelink', provider = 'mock');
-
-# Test quoting.
-statement ok
-CREATE CONNECTION "conn1" WITH (type = 'privatelink', provider = 'mock');
-
-# Drop connections.
-statement ok
-DROP CONNECTION conn0;
-
-# Drop connections.
-statement ok
-DROP CONNECTION conn1;
diff --git a/e2e_test/sink/kafka/create_sink.slt b/e2e_test/sink/kafka/create_sink.slt
index 338465c471af..7f589e4a4b23 100644
--- a/e2e_test/sink/kafka/create_sink.slt
+++ b/e2e_test/sink/kafka/create_sink.slt
@@ -31,48 +31,6 @@ create sink sink_non_exist_broker from t_kafka with (
   type = 'append-only',
 );
 
-# Test create sink with connection
-# Create a mock connection
-statement ok
-create connection mock with (
-  type = 'privatelink',
-  provider = 'mock',
-);
-
-# Refer to a non-existant connection
-statement error
-create sink si_kafka_append_only_conn from t_kafka with (
-  connector = 'kafka',
-  properties.bootstrap.server = 'message_queue:29092',
-  topic = 'test-rw-sink-append-only',
-  type = 'append-only',
-  force_append_only = 'true',
-  connection.name = 'nonexist',
-);
-
-# Create sink with connection
-statement ok
-create sink si_kafka_append_only_conn from t_kafka with (
-  connector = 'kafka',
-  properties.bootstrap.server = 'message_queue:29092',
-  topic = 'test-rw-sink-append-only',
-  type = 'append-only',
-  force_append_only = 'true',
-  connection.name = 'mock',
-);
-
-# Try to drop connection mock, which is in use
-statement error
-drop connection mock;
-
-# Drop sink
-statement ok
-drop sink si_kafka_append_only_conn;
-
-# Drop connection
-statement ok
-drop connection mock;
-
 # Connection test clean-up finished
 statement error sink cannot be append-only
diff --git a/e2e_test/source_legacy/basic/ddl.slt b/e2e_test/source_legacy/basic/ddl.slt
index 8e63971fb5b8..e8b9e10b249f 100644
--- a/e2e_test/source_legacy/basic/ddl.slt
+++ b/e2e_test/source_legacy/basic/ddl.slt
@@ -198,35 +198,6 @@ s
 statement ok
 drop table s
 
-# Test create source with connection
-statement ok
-CREATE CONNECTION mock WITH (type = 'privatelink', provider = 'mock');
-
-# Reference to non-existant connection
-statement error
-create source s (
-    column1 varchar
-) with (
-    connector = 'kafka',
-    topic = 'kafka_1_partition_topic',
-    properties.bootstrap.server = 'message_queue:29092',
-    connection.name = 'nonexist',
-) FORMAT PLAIN ENCODE JSON;
-
-statement ok
-CREATE TABLE mytable (
-    column1 varchar
-) with (
-    connector = 'kafka',
-    topic = 'kafka_1_partition_topic',
-    properties.bootstrap.server = 'message_queue:29092',
-    connection.name = 'mock'
-) FORMAT PLAIN ENCODE JSON;
-
-statement ok
-DROP TABLE mytable;
-
-
 # `DEBEZIUM_MONGO_JSON` requires the source table have `_id` and `payload` columns.
 statement error
 create source s (
@@ -236,7 +207,6 @@ create source s (
     connector = 'kafka',
     topic = 'kafka_1_partition_topic',
     properties.bootstrap.server = 'message_queue:29092',
-    connection.name = 'nonexist',
 ) FORMAT DEBEZIUM_MONGO ENCODE JSON;
 
 # `DEBEZIUM_MONGO_JSON` requires the `_id` column is primary key.
@@ -248,7 +218,6 @@ create source s (
     connector = 'kafka',
     topic = 'kafka_1_partition_topic',
     properties.bootstrap.server = 'message_queue:29092',
-    connection.name = 'nonexist',
 ) FORMAT DEBEZIUM_MONGO ENCODE JSON;
 
 # `DEBEZIUM_MONGO_JSON` requires the `payload` column is jsonb type.
@@ -260,25 +229,4 @@ create source s (
     connector = 'kafka',
     topic = 'kafka_1_partition_topic',
     properties.bootstrap.server = 'message_queue:29092',
-    connection.name = 'nonexist',
 ) FORMAT DEBEZIUM_MONGO ENCODE JSON;
-
-statement ok
-create source s (
-    column1 varchar
-) with (
-    connector = 'kafka',
-    topic = 'kafka_1_partition_topic',
-    properties.bootstrap.server = 'message_queue:29092',
-    connection.name = 'mock',
-) FORMAT PLAIN ENCODE JSON;
-
-# Drop a connection in use
-statement error
-drop connection mock;
-
-statement ok
-drop source s;
-
-statement ok
-drop connection mock;
diff --git a/e2e_test/source_legacy/basic/old_row_format_syntax/ddl.slt b/e2e_test/source_legacy/basic/old_row_format_syntax/ddl.slt
index 0fe67a8504b5..79c3553c38c7 100644
--- a/e2e_test/source_legacy/basic/old_row_format_syntax/ddl.slt
+++ b/e2e_test/source_legacy/basic/old_row_format_syntax/ddl.slt
@@ -152,35 +152,6 @@ s
 statement ok
 drop table s
 
-# Test create source with connection
-statement ok
-CREATE CONNECTION mock WITH (type = 'privatelink', provider = 'mock');
-
-# Reference to non-existant connection
-statement error
-create source s (
-    column1 varchar
-) with (
-    connector = 'kafka',
-    topic = 'kafka_1_partition_topic',
-    properties.bootstrap.server = 'message_queue:29092',
-    connection.name = 'nonexist',
-) ROW FORMAT JSON;
-
-statement ok
-CREATE TABLE mytable (
-    column1 varchar
-) with (
-    connector = 'kafka',
-    topic = 'kafka_1_partition_topic',
-    properties.bootstrap.server = 'message_queue:29092',
-    connection.name = 'mock'
-) ROW FORMAT JSON;
-
-statement ok
-DROP TABLE mytable;
-
-
 # `DEBEZIUM_MONGO_JSON` requires the source table have `_id` and `payload` columns.
 statement error
 create source s (
@@ -190,7 +161,6 @@ create source s (
     connector = 'kafka',
     topic = 'kafka_1_partition_topic',
     properties.bootstrap.server = 'message_queue:29092',
-    connection.name = 'nonexist',
 ) ROW FORMAT DEBEZIUM_MONGO_JSON;
 
 # `DEBEZIUM_MONGO_JSON` requires the `_id` column is primary key.
@@ -202,7 +172,6 @@ create source s (
     connector = 'kafka',
     topic = 'kafka_1_partition_topic',
     properties.bootstrap.server = 'message_queue:29092',
-    connection.name = 'nonexist',
 ) ROW FORMAT DEBEZIUM_MONGO_JSON;
 
 # `DEBEZIUM_MONGO_JSON` requires the `payload` column is jsonb type.
@@ -214,7 +183,6 @@ create source s (
     connector = 'kafka',
    topic = 'kafka_1_partition_topic',
     properties.bootstrap.server = 'message_queue:29092',
-    connection.name = 'nonexist',
 ) ROW FORMAT DEBEZIUM_MONGO_JSON;
 
 statement ok
@@ -224,15 +192,7 @@ create source s (
     connector = 'kafka',
     topic = 'kafka_1_partition_topic',
     properties.bootstrap.server = 'message_queue:29092',
-    connection.name = 'mock',
 ) ROW FORMAT JSON;
 
-# Drop a connection in use
-statement error
-drop connection mock;
-
 statement ok
 drop source s;
-
-statement ok
-drop connection mock;
diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs
index 2bbbb9558244..2111b33e29ef 100644
--- a/src/connector/src/sink/mod.rs
+++ b/src/connector/src/sink/mod.rs
@@ -693,12 +693,10 @@ impl SinkCommitCoordinator for DummySinkCommitCoordinator {
 
 impl SinkImpl {
     pub fn new(mut param: SinkParam) -> Result<Self> {
-        const CONNECTION_NAME_KEY: &str = "connection.name";
         const PRIVATE_LINK_TARGET_KEY: &str = "privatelink.targets";
 
         // remove privatelink related properties if any
         param.properties.remove(PRIVATE_LINK_TARGET_KEY);
-        param.properties.remove(CONNECTION_NAME_KEY);
 
         let sink_type = param
            .properties
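
--
For reference, a minimal runnable sketch of the net effect of the final hunk on sink properties; it is not part of the patch series. The standalone BTreeMap and the helper name scrub_legacy_sink_props are assumptions made for illustration, while the two keys come from the diff itself: "privatelink.targets" is still dropped for backward compatibility, and "connection.name" no longer needs stripping because privatelink connections can no longer be created.

use std::collections::BTreeMap;

// Illustrative stand-in (hypothetical helper, not upstream API) for the
// scrubbing done in SinkImpl::new after this change: the legacy privatelink
// target list is silently discarded, and every other property is passed
// through to the concrete sink implementation.
fn scrub_legacy_sink_props(props: &mut BTreeMap<String, String>) {
    // Kept for backward compatibility with old DDL that still carries it.
    props.remove("privatelink.targets");
    // Note: "connection.name" is intentionally no longer removed here; the
    // frontend now rejects privatelink connections before a sink can exist.
}

fn main() {
    let mut props = BTreeMap::from([
        ("connector".to_owned(), "kafka".to_owned()),
        ("privatelink.targets".to_owned(), "[]".to_owned()),
    ]);
    scrub_legacy_sink_props(&mut props);
    assert!(!props.contains_key("privatelink.targets"));
    assert_eq!(props.get("connector").map(String::as_str), Some("kafka"));
}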