diff --git a/e2e_test/ddl/alter_set_schema.slt b/e2e_test/ddl/alter_set_schema.slt index 74dcc5a77e64a..db0f479c85c05 100644 --- a/e2e_test/ddl/alter_set_schema.slt +++ b/e2e_test/ddl/alter_set_schema.slt @@ -94,23 +94,6 @@ WHERE nspname = 'test_schema'; ---- test_subscription test_schema -statement ok -CREATE CONNECTION test_conn WITH (type = 'privatelink', provider = 'mock'); - -statement ok -ALTER CONNECTION test_conn SET SCHEMA test_schema; - -query TT -SELECT name AS connname, nspname AS schemaname -FROM rw_connections -JOIN pg_namespace ON pg_namespace.oid = rw_connections.schema_id -WHERE nspname = 'test_schema'; ----- -test_conn test_schema - -statement ok -DROP CONNECTION test_schema.test_conn; - statement ok DROP SINK test_schema.test_sink; diff --git a/e2e_test/ddl/connection.slt b/e2e_test/ddl/connection.slt deleted file mode 100644 index 435395e9d249a..0000000000000 --- a/e2e_test/ddl/connection.slt +++ /dev/null @@ -1,23 +0,0 @@ -# Create a connection. -statement ok -CREATE CONNECTION conn0 WITH (type = 'privatelink', provider = 'mock'); - -# Create another user with duplicate name. -statement error -CREATE CONNECTION conn0 WITH (type = 'privatelink', provider = 'mock'); - -# Create if not exists. -statement ok -CREATE CONNECTION IF NOT EXISTS conn0 WITH (type = 'privatelink', provider = 'mock'); - -# Test quoting. -statement ok -CREATE CONNECTION "conn1" WITH (type = 'privatelink', provider = 'mock'); - -# Drop connections. -statement ok -DROP CONNECTION conn0; - -# Drop connections. -statement ok -DROP CONNECTION conn1; diff --git a/e2e_test/sink/kafka/create_sink.slt b/e2e_test/sink/kafka/create_sink.slt index 338465c471af9..7f589e4a4b231 100644 --- a/e2e_test/sink/kafka/create_sink.slt +++ b/e2e_test/sink/kafka/create_sink.slt @@ -31,48 +31,6 @@ create sink sink_non_exist_broker from t_kafka with ( type = 'append-only', ); -# Test create sink with connection -# Create a mock connection -statement ok -create connection mock with ( - type = 'privatelink', - provider = 'mock', -); - -# Refer to a non-existant connection -statement error -create sink si_kafka_append_only_conn from t_kafka with ( - connector = 'kafka', - properties.bootstrap.server = 'message_queue:29092', - topic = 'test-rw-sink-append-only', - type = 'append-only', - force_append_only = 'true', - connection.name = 'nonexist', -); - -# Create sink with connection -statement ok -create sink si_kafka_append_only_conn from t_kafka with ( - connector = 'kafka', - properties.bootstrap.server = 'message_queue:29092', - topic = 'test-rw-sink-append-only', - type = 'append-only', - force_append_only = 'true', - connection.name = 'mock', -); - -# Try to drop connection mock, which is in use -statement error -drop connection mock; - -# Drop sink -statement ok -drop sink si_kafka_append_only_conn; - -# Drop connection -statement ok -drop connection mock; - # Connection test clean-up finished statement error sink cannot be append-only diff --git a/e2e_test/source_legacy/basic/ddl.slt b/e2e_test/source_legacy/basic/ddl.slt index 8e63971fb5b82..e8b9e10b249fa 100644 --- a/e2e_test/source_legacy/basic/ddl.slt +++ b/e2e_test/source_legacy/basic/ddl.slt @@ -198,35 +198,6 @@ s statement ok drop table s -# Test create source with connection -statement ok -CREATE CONNECTION mock WITH (type = 'privatelink', provider = 'mock'); - -# Reference to non-existant connection -statement error -create source s ( - column1 varchar -) with ( - connector = 'kafka', - topic = 'kafka_1_partition_topic', - properties.bootstrap.server = 
'message_queue:29092', - connection.name = 'nonexist', -) FORMAT PLAIN ENCODE JSON; - -statement ok -CREATE TABLE mytable ( - column1 varchar -) with ( - connector = 'kafka', - topic = 'kafka_1_partition_topic', - properties.bootstrap.server = 'message_queue:29092', - connection.name = 'mock' -) FORMAT PLAIN ENCODE JSON; - -statement ok -DROP TABLE mytable; - - # `DEBEZIUM_MONGO_JSON` requires the source table have `_id` and `payload` columns. statement error create source s ( @@ -236,7 +207,6 @@ create source s ( connector = 'kafka', topic = 'kafka_1_partition_topic', properties.bootstrap.server = 'message_queue:29092', - connection.name = 'nonexist', ) FORMAT DEBEZIUM_MONGO ENCODE JSON; # `DEBEZIUM_MONGO_JSON` requires the `_id` column is primary key. @@ -248,7 +218,6 @@ create source s ( connector = 'kafka', topic = 'kafka_1_partition_topic', properties.bootstrap.server = 'message_queue:29092', - connection.name = 'nonexist', ) FORMAT DEBEZIUM_MONGO ENCODE JSON; # `DEBEZIUM_MONGO_JSON` requires the `payload` column is jsonb type. @@ -260,25 +229,4 @@ create source s ( connector = 'kafka', topic = 'kafka_1_partition_topic', properties.bootstrap.server = 'message_queue:29092', - connection.name = 'nonexist', ) FORMAT DEBEZIUM_MONGO ENCODE JSON; - -statement ok -create source s ( - column1 varchar -) with ( - connector = 'kafka', - topic = 'kafka_1_partition_topic', - properties.bootstrap.server = 'message_queue:29092', - connection.name = 'mock', -) FORMAT PLAIN ENCODE JSON; - -# Drop a connection in use -statement error -drop connection mock; - -statement ok -drop source s; - -statement ok -drop connection mock; diff --git a/e2e_test/source_legacy/basic/old_row_format_syntax/ddl.slt b/e2e_test/source_legacy/basic/old_row_format_syntax/ddl.slt index 0fe67a8504b5d..79c3553c38c70 100644 --- a/e2e_test/source_legacy/basic/old_row_format_syntax/ddl.slt +++ b/e2e_test/source_legacy/basic/old_row_format_syntax/ddl.slt @@ -152,35 +152,6 @@ s statement ok drop table s -# Test create source with connection -statement ok -CREATE CONNECTION mock WITH (type = 'privatelink', provider = 'mock'); - -# Reference to non-existant connection -statement error -create source s ( - column1 varchar -) with ( - connector = 'kafka', - topic = 'kafka_1_partition_topic', - properties.bootstrap.server = 'message_queue:29092', - connection.name = 'nonexist', -) ROW FORMAT JSON; - -statement ok -CREATE TABLE mytable ( - column1 varchar -) with ( - connector = 'kafka', - topic = 'kafka_1_partition_topic', - properties.bootstrap.server = 'message_queue:29092', - connection.name = 'mock' -) ROW FORMAT JSON; - -statement ok -DROP TABLE mytable; - - # `DEBEZIUM_MONGO_JSON` requires the source table have `_id` and `payload` columns. statement error create source s ( @@ -190,7 +161,6 @@ create source s ( connector = 'kafka', topic = 'kafka_1_partition_topic', properties.bootstrap.server = 'message_queue:29092', - connection.name = 'nonexist', ) ROW FORMAT DEBEZIUM_MONGO_JSON; # `DEBEZIUM_MONGO_JSON` requires the `_id` column is primary key. @@ -202,7 +172,6 @@ create source s ( connector = 'kafka', topic = 'kafka_1_partition_topic', properties.bootstrap.server = 'message_queue:29092', - connection.name = 'nonexist', ) ROW FORMAT DEBEZIUM_MONGO_JSON; # `DEBEZIUM_MONGO_JSON` requires the `payload` column is jsonb type. 
@@ -214,7 +183,6 @@ create source s ( connector = 'kafka', topic = 'kafka_1_partition_topic', properties.bootstrap.server = 'message_queue:29092', - connection.name = 'nonexist', ) ROW FORMAT DEBEZIUM_MONGO_JSON; statement ok @@ -224,15 +192,7 @@ create source s ( connector = 'kafka', topic = 'kafka_1_partition_topic', properties.bootstrap.server = 'message_queue:29092', - connection.name = 'mock', ) ROW FORMAT JSON; -# Drop a connection in use -statement error -drop connection mock; - statement ok drop source s; - -statement ok -drop connection mock; diff --git a/proto/catalog.proto b/proto/catalog.proto index 0c67a92f23cdd..169347c199eb9 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -243,7 +243,7 @@ message Connection { uint32 database_id = 3; string name = 4; oneof info { - PrivateLinkService private_link_service = 5; + PrivateLinkService private_link_service = 5 [deprecated = true]; } uint32 owner = 6; } diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 2bbbb95582447..2111b33e29ef7 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -693,12 +693,10 @@ impl SinkCommitCoordinator for DummySinkCommitCoordinator { impl SinkImpl { pub fn new(mut param: SinkParam) -> Result<Self> { - const CONNECTION_NAME_KEY: &str = "connection.name"; const PRIVATE_LINK_TARGET_KEY: &str = "privatelink.targets"; // remove privatelink related properties if any param.properties.remove(PRIVATE_LINK_TARGET_KEY); - param.properties.remove(CONNECTION_NAME_KEY); let sink_type = param .properties diff --git a/src/connector/src/source/kafka/private_link.rs b/src/connector/src/source/kafka/private_link.rs index 6aeebde87b516..3f6f1b8e32da7 100644 --- a/src/connector/src/source/kafka/private_link.rs +++ b/src/connector/src/source/kafka/private_link.rs @@ -30,7 +30,6 @@ use crate::error::ConnectorResult; use crate::source::kafka::{KAFKA_PROPS_BROKER_KEY, KAFKA_PROPS_BROKER_KEY_ALIAS}; pub const PRIVATELINK_ENDPOINT_KEY: &str = "privatelink.endpoint"; -pub const CONNECTION_NAME_KEY: &str = "connection.name"; #[derive(Debug)] pub(super) enum PrivateLinkContextRole { diff --git a/src/frontend/src/catalog/connection_catalog.rs b/src/frontend/src/catalog/connection_catalog.rs index 54e1210979fe8..03b2ff4203c53 100644 --- a/src/frontend/src/catalog/connection_catalog.rs +++ b/src/frontend/src/catalog/connection_catalog.rs @@ -12,18 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::collections::BTreeMap; -use std::sync::Arc; - -use anyhow::anyhow; -use risingwave_connector::source::kafka::private_link::insert_privatelink_broker_rewrite_map; -use risingwave_connector::WithPropertiesExt; -use risingwave_pb::catalog::connection::private_link_service::PrivateLinkProvider; use risingwave_pb::catalog::connection::Info; use risingwave_pb::catalog::{connection, PbConnection}; use crate::catalog::{ConnectionId, OwnedByUserCatalog}; -use crate::error::{Result, RwError}; use crate::user::UserId; #[derive(Clone, Debug, PartialEq)] @@ -64,24 +56,3 @@ impl OwnedByUserCatalog for ConnectionCatalog { self.owner } } - -pub(crate) fn resolve_private_link_connection( - connection: &Arc<ConnectionCatalog>, - properties: &mut BTreeMap<String, String>, -) -> Result<()> { - #[allow(irrefutable_let_patterns)] - if let connection::Info::PrivateLinkService(svc) = &connection.info { - if !properties.is_kafka_connector() { - return Err(RwError::from(anyhow!( - "Private link is only supported for Kafka connector" - ))); - } - // skip all checks for mock connection - if svc.get_provider()? == PrivateLinkProvider::Mock { - return Ok(()); - } - insert_privatelink_broker_rewrite_map(properties, Some(svc), None) - .map_err(RwError::from)?; - } - Ok(()) -} diff --git a/src/frontend/src/handler/create_connection.rs b/src/frontend/src/handler/create_connection.rs index 987f0e9fdd897..634d5ab829db8 100644 --- a/src/frontend/src/handler/create_connection.rs +++ b/src/frontend/src/handler/create_connection.rs @@ -16,23 +16,16 @@ use std::collections::BTreeMap; use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_connector::source::kafka::PRIVATELINK_CONNECTION; -use risingwave_pb::catalog::connection::private_link_service::PrivateLinkProvider; use risingwave_pb::ddl_service::create_connection_request; use risingwave_sqlparser::ast::CreateConnectionStatement; use super::RwPgResponse; use crate::binder::Binder; use crate::error::ErrorCode::ProtocolError; -use crate::error::{Result, RwError}; +use crate::error::{ErrorCode, Result, RwError}; use crate::handler::HandlerArgs; pub(crate) const CONNECTION_TYPE_PROP: &str = "type"; -pub(crate) const CONNECTION_PROVIDER_PROP: &str = "provider"; -pub(crate) const CONNECTION_SERVICE_NAME_PROP: &str = "service.name"; -pub(crate) const CONNECTION_TAGS_PROP: &str = "tags"; - -pub(crate) const CLOUD_PROVIDER_MOCK: &str = "mock"; // fake privatelink provider for testing -pub(crate) const CLOUD_PROVIDER_AWS: &str = "aws"; #[inline(always)] fn get_connection_property_required( @@ -48,58 +41,19 @@ fn get_connection_property_required( ))) }) } - -fn resolve_private_link_properties( - with_properties: &BTreeMap<String, String>, -) -> Result<create_connection_request::PrivateLink> { - let provider = - match get_connection_property_required(with_properties, CONNECTION_PROVIDER_PROP)?.as_str() - { - CLOUD_PROVIDER_MOCK => PrivateLinkProvider::Mock, - CLOUD_PROVIDER_AWS => PrivateLinkProvider::Aws, - provider => { - return Err(RwError::from(ProtocolError(format!( - "Unsupported privatelink provider {}", - provider - )))); - } - }; - match provider { - PrivateLinkProvider::Mock => Ok(create_connection_request::PrivateLink { - provider: provider.into(), - service_name: String::new(), - tags: None, - }), - PrivateLinkProvider::Aws => { - let service_name = - get_connection_property_required(with_properties, CONNECTION_SERVICE_NAME_PROP)?; - Ok(create_connection_request::PrivateLink { - provider: provider.into(), - service_name, - tags: with_properties.get(CONNECTION_TAGS_PROP).cloned(), - }) - } - PrivateLinkProvider::Unspecified => 
Err(RwError::from(ProtocolError( - "Privatelink provider unspecified".to_string(), - ))), - } -} - fn resolve_create_connection_payload( with_properties: &BTreeMap<String, String>, ) -> Result<create_connection_request::Payload> { let connection_type = get_connection_property_required(with_properties, CONNECTION_TYPE_PROP)?; - let create_connection_payload = match connection_type.as_str() { - PRIVATELINK_CONNECTION => create_connection_request::Payload::PrivateLink( - resolve_private_link_properties(with_properties)?, - ), - _ => { - return Err(RwError::from(ProtocolError(format!( - "Connection type \"{connection_type}\" is not supported" - )))); - } + return match connection_type.as_str() { + PRIVATELINK_CONNECTION => Err(RwError::from(ErrorCode::Deprecated( + "CREATE CONNECTION to Private Link".to_string(), + "RisingWave Cloud Portal (Please refer to the doc https://docs.risingwave.com/cloud/create-a-connection/)".to_string(), + ))), + _ => Err(RwError::from(ProtocolError(format!( + "Connection type \"{connection_type}\" is not supported" + )))), }; - Ok(create_connection_payload) } pub async fn handle_create_connection( diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index 37db8d9541ad8..ea9f9e98f9b71 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -22,9 +22,7 @@ use maplit::{convert_args, hashmap}; use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::array::arrow::arrow_schema_iceberg::DataType as ArrowDataType; use risingwave_common::array::arrow::IcebergArrowConvert; -use risingwave_common::catalog::{ - ColumnCatalog, ConnectionId, DatabaseId, Schema, SchemaId, TableId, UserId, -}; +use risingwave_common::catalog::{ColumnCatalog, DatabaseId, Schema, SchemaId, TableId, UserId}; use risingwave_common::secret::LocalSecretManager; use risingwave_common::types::DataType; use risingwave_common::{bail, catalog}; @@ -92,12 +90,7 @@ pub async fn gen_sink_plan( let mut with_options = handler_args.with_options.clone(); - let connection_id = { - let conn_id = - resolve_privatelink_in_with_option(&mut with_options, &sink_schema_name, session)?; - conn_id.map(ConnectionId) - }; - + resolve_privatelink_in_with_option(&mut with_options)?; let mut resolved_with_options = resolve_secret_ref_in_with_options(with_options, session)?; let partition_info = get_partition_compute_info(&resolved_with_options).await?; @@ -266,7 +259,7 @@ pub async fn gen_sink_plan( SchemaId::new(sink_schema_id), DatabaseId::new(sink_database_id), UserId::new(session.user_id()), - connection_id, + None, // deprecated: private link connection id dependent_relations.into_iter().collect_vec(), ); diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index c315abb358aa5..077c811d9afc6 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -1550,8 +1550,7 @@ pub async fn bind_create_source_or_table_with_connector( // resolve privatelink connection for Kafka let mut with_properties = with_properties; - let connection_id = - resolve_privatelink_in_with_option(&mut with_properties, &schema_name, session)?; + resolve_privatelink_in_with_option(&mut with_properties)?; let with_properties = resolve_secret_ref_in_with_options(with_properties, session)?; @@ -1627,7 +1626,7 @@ pub async fn bind_create_source_or_table_with_connector( watermark_descs, associated_table_id, definition, - connection_id, + connection_id: None, // deprecated: private link connection id 
created_at_epoch: None, initialized_at_epoch: None, version: INITIAL_SOURCE_VERSION_ID, diff --git a/src/frontend/src/utils/with_options.rs b/src/frontend/src/utils/with_options.rs index e306103c02e39..9d61021dab4fe 100644 --- a/src/frontend/src/utils/with_options.rs +++ b/src/frontend/src/utils/with_options.rs @@ -16,7 +16,7 @@ use std::collections::BTreeMap; use std::num::NonZeroU32; use risingwave_connector::source::kafka::private_link::{ - insert_privatelink_broker_rewrite_map, CONNECTION_NAME_KEY, PRIVATELINK_ENDPOINT_KEY, + insert_privatelink_broker_rewrite_map, PRIVATELINK_ENDPOINT_KEY, }; pub use risingwave_connector::WithOptionsSecResolved; use risingwave_connector::WithPropertiesExt; @@ -28,7 +28,6 @@ use risingwave_sqlparser::ast::{ }; use super::OverwriteOptions; -use crate::catalog::connection_catalog::resolve_private_link_connection; use crate::catalog::ConnectionId; use crate::error::{ErrorCode, Result as RwResult, RwError}; use crate::session::SessionImpl; @@ -186,8 +185,6 @@ pub(crate) fn resolve_secret_ref_in_with_options( pub(crate) fn resolve_privatelink_in_with_option( with_options: &mut WithOptions, - schema_name: &Option<String>, - session: &SessionImpl, ) -> RwResult<Option<ConnectionId>> { let is_kafka = with_options.is_kafka_connector(); let privatelink_endpoint = with_options.remove(PRIVATELINK_ENDPOINT_KEY); @@ -201,28 +198,8 @@ pub(crate) fn resolve_privatelink_in_with_option( } insert_privatelink_broker_rewrite_map(with_options.inner_mut(), None, Some(endpoint)) .map_err(RwError::from)?; - return Ok(None); } - - let connection_name = with_options - .remove(CONNECTION_NAME_KEY) - .map(|s| s.to_lowercase()); - let connection_id = match connection_name { - Some(connection_name) => { - let connection = session - .get_connection_by_name(schema_name.clone(), &connection_name) - .map_err(|_| ErrorCode::ItemNotFound(connection_name))?; - if !is_kafka { - return Err(RwError::from(ErrorCode::ProtocolError( - "Connection is only supported in kafka connector".to_string(), - ))); - } - resolve_private_link_connection(&connection, with_options.inner_mut())?; - Some(connection.id) - } - None => None, - }; - Ok(connection_id) + Ok(None) } impl TryFrom<&[SqlOption]> for WithOptions { diff --git a/src/meta/model/src/connection.rs b/src/meta/model/src/connection.rs index a6cfa4aefb58c..dce0daa462fc5 100644 --- a/src/meta/model/src/connection.rs +++ b/src/meta/model/src/connection.rs @@ -26,6 +26,8 @@ pub struct Model { #[sea_orm(primary_key, auto_increment = false)] pub connection_id: ConnectionId, pub name: String, + + // todo: Private link service has been deprecated, consider using a new field for the connection info pub info: PrivateLinkService, } diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index ff91fe9aa9eed..2ea5480eba6f6 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -87,7 +87,6 @@ use crate::controller::SqlMetaStore; use crate::hummock::HummockManager; use crate::manager::sink_coordination::SinkCoordinatorManager; use crate::manager::{IdleManager, MetaOpts, MetaSrvEnv}; -use crate::rpc::cloud_provider::AwsEc2Client; use crate::rpc::election::sql::{MySqlDriver, PostgresDriver, SqlBackendElectionClient}; use crate::rpc::metrics::{ start_fragment_info_monitor, start_worker_info_monitor, GLOBAL_META_METRICS, @@ -530,17 +529,8 @@ pub async fn start_service_as_election_leader( compactor_manager.clone(), )); - let mut aws_cli = None; - if let Some(my_vpc_id) = &env.opts.vpc_id - && let Some(security_group_id) = &env.opts.security_group_id - { - let 
cli = AwsEc2Client::new(my_vpc_id, security_group_id).await; - aws_cli = Some(cli); - } - let ddl_srv = DdlServiceImpl::new( env.clone(), - aws_cli.clone(), metadata_manager.clone(), stream_manager.clone(), source_manager.clone(), @@ -584,7 +574,7 @@ pub async fn start_service_as_election_leader( let session_params_srv = SessionParamsServiceImpl::new(env.session_params_manager_impl_ref()); let serving_srv = ServingServiceImpl::new(serving_vnode_mapping.clone(), metadata_manager.clone()); - let cloud_srv = CloudServiceImpl::new(metadata_manager.clone(), aws_cli); + let cloud_srv = CloudServiceImpl::new(); let event_log_srv = EventLogServiceImpl::new(env.event_log_manager_ref()); let cluster_limit_srv = ClusterLimitServiceImpl::new(env.clone(), metadata_manager.clone()); @@ -702,6 +692,7 @@ pub async fn start_service_as_election_leader( .add_service(MetaMemberServiceServer::new(meta_member_srv)) .add_service(DdlServiceServer::new(ddl_srv).max_decoding_message_size(usize::MAX)) .add_service(UserServiceServer::new(user_srv)) + .add_service(CloudServiceServer::new(cloud_srv)) .add_service(ScaleServiceServer::new(scale_srv).max_decoding_message_size(usize::MAX)) .add_service(HealthServer::new(health_srv)) .add_service(BackupServiceServer::new(backup_srv)) @@ -709,7 +700,6 @@ pub async fn start_service_as_election_leader( .add_service(SessionParamServiceServer::new(session_params_srv)) .add_service(TelemetryInfoServiceServer::new(telemetry_srv)) .add_service(ServingServiceServer::new(serving_srv)) - .add_service(CloudServiceServer::new(cloud_srv)) .add_service(SinkCoordinationServiceServer::new(sink_coordination_srv)) .add_service(EventLogServiceServer::new(event_log_srv)) .add_service(ClusterLimitServiceServer::new(cluster_limit_srv)); diff --git a/src/meta/service/src/cloud_service.rs b/src/meta/service/src/cloud_service.rs index e913b91826b6f..553a8189116c3 100644 --- a/src/meta/service/src/cloud_service.rs +++ b/src/meta/service/src/cloud_service.rs @@ -18,14 +18,10 @@ use std::sync::LazyLock; use async_trait::async_trait; use regex::Regex; use risingwave_connector::error::ConnectorResult; -use risingwave_connector::source::kafka::private_link::insert_privatelink_broker_rewrite_map; use risingwave_connector::source::{ ConnectorProperties, SourceEnumeratorContext, SourceProperties, SplitEnumerator, }; use risingwave_connector::{dispatch_source_prop, WithOptionsSecResolved}; -use risingwave_meta::manager::MetadataManager; -use risingwave_meta_model::ConnectionId; -use risingwave_pb::catalog::connection::Info::PrivateLinkService; use risingwave_pb::cloud_service::cloud_service_server::CloudService; use risingwave_pb::cloud_service::rw_cloud_validate_source_response::{Error, ErrorType}; use risingwave_pb::cloud_service::{ @@ -33,20 +29,11 @@ use risingwave_pb::cloud_service::{ }; use thiserror_ext::AsReport; use tonic::{Request, Response, Status}; - -use crate::rpc::cloud_provider::AwsEc2Client; - -pub struct CloudServiceImpl { - metadata_manager: MetadataManager, - aws_client: Option<AwsEc2Client>, -} +pub struct CloudServiceImpl {} impl CloudServiceImpl { - pub fn new(metadata_manager: MetadataManager, aws_client: Option<AwsEc2Client>) -> Self { - Self { - metadata_manager, - aws_client, - } + pub fn new() -> Self { + Self {} } } @@ -76,70 +63,7 @@ impl CloudService for CloudServiceImpl { "unexpected source type, only kafka source is supported", )); } - let mut source_cfg: BTreeMap<String, String> = req.source_config.into_iter().collect(); - // if connection_id provided, check whether endpoint service is available and resolve - // broker 
rewrite map currently only support aws privatelink connection - if let Some(connection_id_str) = source_cfg.get("connection.id") { - let connection_id = connection_id_str.parse::<ConnectionId>().map_err(|e| { - Status::invalid_argument(format!( - "connection.id is not an integer: {}", - e.as_report() - )) - })?; - - let connection = self - .metadata_manager - .catalog_controller - .get_connection_by_id(connection_id) - .await; - - if let Err(e) = connection { - return Ok(new_rwc_validate_fail_response( - ErrorType::PrivatelinkConnectionNotFound, - e.to_report_string(), - )); - } - if let Some(PrivateLinkService(service)) = connection.unwrap().info { - if self.aws_client.is_none() { - return Ok(new_rwc_validate_fail_response( - ErrorType::AwsClientNotConfigured, - "AWS client is not configured".to_string(), - )); - } - let cli = self.aws_client.as_ref().unwrap(); - let privatelink_status = cli - .is_vpc_endpoint_ready(service.endpoint_id.as_str()) - .await; - match privatelink_status { - Err(e) => { - return Ok(new_rwc_validate_fail_response( - ErrorType::PrivatelinkUnavailable, - e.to_report_string(), - )); - } - Ok(false) => { - return Ok(new_rwc_validate_fail_response( - ErrorType::PrivatelinkUnavailable, - format!("Private link endpoint {} is not ready", service.endpoint_id,), - )); - } - _ => (), - }; - if let Err(e) = - insert_privatelink_broker_rewrite_map(&mut source_cfg, Some(&service), None) - { - return Ok(new_rwc_validate_fail_response( - ErrorType::PrivatelinkResolveErr, - e.to_report_string(), - )); - } - } else { - return Ok(new_rwc_validate_fail_response( - ErrorType::PrivatelinkResolveErr, - format!("connection {} has no info available", connection_id), - )); - } - } + let source_cfg: BTreeMap<String, String> = req.source_config.into_iter().collect(); // XXX: We can't use secret in cloud validate source. 
let source_cfg = WithOptionsSecResolved::without_secrets(source_cfg); diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs index 1578813e2ead9..71b45b1887eff 100644 --- a/src/meta/service/src/ddl_service.rs +++ b/src/meta/service/src/ddl_service.rs @@ -25,12 +25,8 @@ use risingwave_connector::sink::catalog::SinkId; use risingwave_meta::manager::{EventLogManagerRef, MetadataManager}; use risingwave_meta::rpc::ddl_controller::fill_table_stream_graph_info; use risingwave_meta::rpc::metrics::MetaMetrics; -use risingwave_pb::catalog::connection::private_link_service::{ - PbPrivateLinkProvider, PrivateLinkProvider, -}; -use risingwave_pb::catalog::connection::PbPrivateLinkService; use risingwave_pb::catalog::table::OptionalAssociatedSourceId; -use risingwave_pb::catalog::{connection, Comment, Connection, CreateType, Secret, Table}; +use risingwave_pb::catalog::{Comment, CreateType, Secret, Table}; use risingwave_pb::common::worker_node::State; use risingwave_pb::common::WorkerType; use risingwave_pb::ddl_service::ddl_service_server::DdlService; @@ -44,12 +40,11 @@ use tonic::{Request, Response, Status}; use crate::barrier::BarrierManagerRef; use crate::manager::sink_coordination::SinkCoordinatorManager; use crate::manager::{MetaSrvEnv, StreamingJob}; -use crate::rpc::cloud_provider::AwsEc2Client; use crate::rpc::ddl_controller::{ DdlCommand, DdlController, DropMode, ReplaceTableInfo, StreamingJobId, }; use crate::stream::{GlobalStreamManagerRef, SourceManagerRef}; -use crate::{MetaError, MetaResult}; +use crate::MetaError; #[derive(Clone)] pub struct DdlServiceImpl { @@ -58,7 +53,6 @@ pub struct DdlServiceImpl { metadata_manager: MetadataManager, sink_manager: SinkCoordinatorManager, ddl_controller: DdlController, - aws_client: Arc<Option<AwsEc2Client>>, meta_metrics: Arc<MetaMetrics>, } @@ -66,7 +60,6 @@ impl DdlServiceImpl { #[allow(clippy::too_many_arguments)] pub async fn new( env: MetaSrvEnv, - aws_client: Option<AwsEc2Client>, metadata_manager: MetadataManager, stream_manager: GlobalStreamManagerRef, source_manager: SourceManagerRef, @@ -74,22 +67,19 @@ impl DdlServiceImpl { sink_manager: SinkCoordinatorManager, meta_metrics: Arc<MetaMetrics>, ) -> Self { - let aws_cli_ref = Arc::new(aws_client); let ddl_controller = DdlController::new( env.clone(), metadata_manager.clone(), stream_manager, source_manager, barrier_manager, - aws_cli_ref.clone(), ) .await; Self { env, metadata_manager, - ddl_controller, - aws_client: aws_cli_ref, sink_manager, + ddl_controller, meta_metrics, } } @@ -231,11 +221,6 @@ impl DdlService for DdlServiceImpl { let req = request.into_inner(); let source = req.get_source()?.clone(); - // validate connection before starting the DDL procedure - if let Some(connection_id) = source.connection_id { - self.validate_connection(connection_id).await?; - } - match req.fragment_graph { None => { let version = self @@ -297,11 +282,6 @@ impl DdlService for DdlServiceImpl { let fragment_graph = req.get_fragment_graph()?.clone(); let affected_table_change = req.get_affected_table_change().cloned().ok(); - // validate connection before starting the DDL procedure - if let Some(connection_id) = sink.connection_id { - self.validate_connection(connection_id).await?; - } - let stream_job = match &affected_table_change { None => StreamingJob::Sink(sink, None), Some(change) => { @@ -748,63 +728,10 @@ impl DdlService for DdlServiceImpl { } match req.payload.unwrap() { - create_connection_request::Payload::PrivateLink(link) => { - // currently we only support AWS - let private_link_svc = match link.get_provider()? 
{ - PbPrivateLinkProvider::Mock => PbPrivateLinkService { - provider: link.provider, - service_name: String::new(), - endpoint_id: String::new(), - endpoint_dns_name: String::new(), - dns_entries: HashMap::new(), - }, - PbPrivateLinkProvider::Aws => { - if let Some(aws_cli) = self.aws_client.as_ref() { - let tags_env = self - .env - .opts - .privatelink_endpoint_default_tags - .as_ref() - .map(|tags| { - tags.iter() - .map(|(key, val)| (key.as_str(), val.as_str())) - .collect() - }); - aws_cli - .create_aws_private_link( - &link.service_name, - link.tags.as_deref(), - tags_env, - ) - .await? - } else { - return Err(Status::from(MetaError::unavailable( - "AWS client is not configured", - ))); - } - } - PbPrivateLinkProvider::Unspecified => { - return Err(Status::invalid_argument("Privatelink provider unspecified")); - } - }; - let connection = Connection { - id: 0, - schema_id: req.schema_id, - database_id: req.database_id, - name: req.name, - owner: req.owner_id, - info: Some(connection::Info::PrivateLinkService(private_link_svc)), - }; - - // save private link info to catalog - let version = self - .ddl_controller - .run_command(DdlCommand::CreateConnection(connection)) - .await?; - - Ok(Response::new(CreateConnectionResponse { version })) + create_connection_request::Payload::PrivateLink(_) => { + panic!("Private Link Connection has been deprecated") } - } + }; } async fn list_connections( @@ -1088,33 +1015,6 @@ impl DdlService for DdlServiceImpl { } } -impl DdlServiceImpl { - async fn validate_connection(&self, connection_id: u32) -> MetaResult<()> { - let connection = self - .metadata_manager - .catalog_controller - .get_connection_by_id(connection_id as _) - .await?; - if let Some(connection::Info::PrivateLinkService(svc)) = &connection.info { - // skip all checks for mock connection - if svc.get_provider()? == PrivateLinkProvider::Mock { - return Ok(()); - } - - // check whether private link is ready - if let Some(aws_cli) = self.aws_client.as_ref() { - if !aws_cli.is_vpc_endpoint_ready(&svc.endpoint_id).await? { - return Err(MetaError::from(anyhow!( - "Private link endpoint {} is not ready", - svc.endpoint_id - ))); - } - } - } - Ok(()) - } -} - fn add_auto_schema_change_fail_event_log( meta_metrics: &Arc<MetaMetrics>, table_id: u32, diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 7ce5747f7debe..2870ac4293ef5 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -32,9 +32,9 @@ use risingwave_meta_model::{ actor, connection, database, fragment, function, index, object, object_dependency, schema, secret, sink, source, streaming_job, subscription, table, user_privilege, view, ActorId, ActorUpstreamActors, ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId, - FunctionId, I32Array, IndexId, JobStatus, ObjectId, PrivateLinkService, Property, SchemaId, - SecretId, SinkId, SourceId, StreamNode, StreamSourceInfo, StreamingParallelism, SubscriptionId, - TableId, UserId, ViewId, + FunctionId, I32Array, IndexId, JobStatus, ObjectId, Property, SchemaId, SecretId, SinkId, + SourceId, StreamNode, StreamSourceInfo, StreamingParallelism, SubscriptionId, TableId, UserId, + ViewId, }; use risingwave_pb::catalog::subscription::SubscriptionState; use risingwave_pb::catalog::table::PbTableType; @@ -110,7 +110,8 @@ pub struct ReleaseContext { /// Dropped source list, need to unregister from source manager. pub(crate) source_ids: Vec<SourceId>, /// Dropped connection list, need to delete from vpc endpoints. 
- pub(crate) connections: Vec<PrivateLinkService>, + #[allow(dead_code)] + pub(crate) connections: Vec<ConnectionId>, /// Dropped fragments that are fetching data from the target source. pub(crate) source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>, @@ -368,7 +369,7 @@ impl CatalogController { .all(&txn) .await? .into_iter() - .map(|conn| conn.info) + .map(|conn| conn.connection_id) .collect_vec(); // Find affect users with privileges on the database and the objects in the database. diff --git a/src/meta/src/error.rs b/src/meta/src/error.rs index 8bfe188d4a3fa..f1c3bb0ffdd8a 100644 --- a/src/meta/src/error.rs +++ b/src/meta/src/error.rs @@ -116,9 +116,6 @@ pub enum MetaErrorInner { SinkError, ), - #[error("AWS SDK error: {0}")] - Aws(#[source] BoxedError), - #[error(transparent)] Internal( #[from] @@ -132,6 +129,9 @@ pub enum MetaErrorInner { #[error("Integrity check failed")] IntegrityCheckFailed, + + #[error("{0} has been deprecated, please use {1} instead.")] + Deprecated(String, String), } impl MetaError { @@ -156,15 +156,6 @@ impl MetaError { } } -impl<E> From<aws_sdk_ec2::error::SdkError<E>> for MetaError -where - E: std::error::Error + Sync + Send + 'static, -{ - fn from(e: aws_sdk_ec2::error::SdkError<E>) -> Self { - MetaErrorInner::Aws(e.into()).into() - } -} - impl From<MetaError> for tonic::Status { fn from(err: MetaError) -> Self { use tonic::Code; diff --git a/src/meta/src/rpc/cloud_provider.rs b/src/meta/src/rpc/cloud_provider.rs deleted file mode 100644 index fce20d5eea096..0000000000000 --- a/src/meta/src/rpc/cloud_provider.rs +++ /dev/null @@ -1,337 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::HashMap; - -use anyhow::anyhow; -use aws_config::retry::RetryConfig; -use aws_sdk_ec2::error::ProvideErrorMetadata; -use aws_sdk_ec2::types::{Filter, ResourceType, State, Tag, TagSpecification, VpcEndpointType}; -use itertools::Itertools; -use risingwave_pb::catalog::connection::private_link_service::PrivateLinkProvider; -use risingwave_pb::catalog::connection::PrivateLinkService; - -use crate::{MetaError, MetaResult}; - -#[derive(Clone)] -pub struct AwsEc2Client { - client: aws_sdk_ec2::Client, - /// `vpc_id`: The VPC of the running RisingWave instance - vpc_id: String, - security_group_id: String, -} - -impl AwsEc2Client { - pub async fn new(vpc_id: &str, security_group_id: &str) -> Self { - let sdk_config = aws_config::from_env() - .retry_config(RetryConfig::standard().with_max_attempts(4)) - .load() - .await; - let client = aws_sdk_ec2::Client::new(&sdk_config); - - Self { - client, - vpc_id: vpc_id.to_string(), - security_group_id: security_group_id.to_string(), - } - } - - pub async fn delete_vpc_endpoint(&self, vpc_endpoint_id: &str) -> MetaResult<()> { - let output = self - .client - .delete_vpc_endpoints() - .vpc_endpoint_ids(vpc_endpoint_id) - .send() - .await - .map_err(|e| { - anyhow!( - "Failed to delete VPC endpoint. 
endpoint_id {vpc_endpoint_id}, error: {:?}, aws_request_id: {:?}", - e.message(), - e.meta().extra("aws_request_id") - ) - })?; - - if !output.unsuccessful().is_empty() { - return Err(MetaError::from(anyhow!( - "Failed to delete VPC endpoint {}, error: {:?}", - vpc_endpoint_id, - output.unsuccessful() - ))); - } - Ok(()) - } - - /// `service_name`: The name of the endpoint service we want to access - /// `tags_user_str`: The tags specified in with clause of `create connection` - /// `tags_env`: The default tags specified in env var `RW_PRIVATELINK_ENDPOINT_DEFAULT_TAGS` - pub async fn create_aws_private_link( - &self, - service_name: &str, - tags_user_str: Option<&str>, - tags_env: Option<Vec<(&str, &str)>>, - ) -> MetaResult<PrivateLinkService> { - // fetch the AZs of the endpoint service - let service_azs = self.get_endpoint_service_az_names(service_name).await?; - let subnet_and_azs = self.describe_subnets(&self.vpc_id, &service_azs).await?; - - let subnet_ids: Vec<String> = subnet_and_azs.iter().map(|(id, _, _)| id.clone()).collect(); - let az_to_azid_map: HashMap<String, String> = subnet_and_azs - .into_iter() - .map(|(_, az, az_id)| (az, az_id)) - .collect(); - - let tags_vec = match tags_user_str { - Some(tags_user_str) => { - let mut tags_user = tags_user_str - .split(',') - .map(|s| { - s.split_once('=').ok_or_else(|| { - MetaError::invalid_parameter("Failed to parse `tags` parameter") - }) - }) - .collect::<MetaResult<Vec<(&str, &str)>>>()?; - match tags_env { - Some(tags_env) => { - tags_user.extend(tags_env); - Some(tags_user) - } - None => Some(tags_user), - } - } - None => tags_env, - }; - - let (endpoint_id, endpoint_dns_names) = self - .create_vpc_endpoint( - &self.vpc_id, - service_name, - &self.security_group_id, - &subnet_ids, - tags_vec, - ) - .await?; - - // The number of returned DNS names may not equal to the input AZs, - // because some AZs may not have a subnet in the RW VPC - let mut azid_to_dns_map = HashMap::new(); - if endpoint_dns_names.first().is_none() { - return Err(MetaError::from(anyhow!( - "No DNS name returned for the endpoint" - ))); - } - - // The first dns name doesn't has AZ info - let endpoint_dns_name = endpoint_dns_names.first().unwrap().clone(); - for dns_name in &endpoint_dns_names { - for az in az_to_azid_map.keys() { - if dns_name.contains(az) { - azid_to_dns_map - .insert(az_to_azid_map.get(az).unwrap().clone(), dns_name.clone()); - break; - } - } - } - - Ok(PrivateLinkService { - provider: PrivateLinkProvider::Aws.into(), - service_name: service_name.to_string(), - endpoint_id, - dns_entries: azid_to_dns_map, - endpoint_dns_name, - }) - } - - pub async fn is_vpc_endpoint_ready(&self, vpc_endpoint_id: &str) -> MetaResult<bool> { - let mut is_ready = false; - let filter = Filter::builder() - .name("vpc-endpoint-id") - .values(vpc_endpoint_id) - .build(); - let output = self - .client - .describe_vpc_endpoints() - .set_filters(Some(vec![filter])) - .send() - .await - .map_err(|e| { - anyhow!( - "Failed to check availability of VPC endpoint. 
endpoint_id: {vpc_endpoint_id}, error: {:?}, aws_request_id: {:?}", - e.message(), - e.meta().extra("aws_request_id") - ) - })?; - - match output.vpc_endpoints { - Some(endpoints) => { - let endpoint = endpoints - .into_iter() - .exactly_one() - .map_err(|_| anyhow!("More than one VPC endpoint found with the same ID"))?; - if let Some(state) = endpoint.state { - match state { - State::Available => { - is_ready = true; - } - // forward-compatible with protocol change - other => { - is_ready = other.as_str().eq_ignore_ascii_case("available"); - } - } - } - } - None => { - return Err(MetaError::from(anyhow!( - "No VPC endpoint found with the ID {}", - vpc_endpoint_id - ))); - } - } - Ok(is_ready) - } - - async fn get_endpoint_service_az_names(&self, service_name: &str) -> MetaResult<Vec<String>> { - let mut service_azs = Vec::new(); - let output = self - .client - .describe_vpc_endpoint_services() - .set_service_names(Some(vec![service_name.to_string()])) - .send() - .await - .map_err(|e| { - anyhow!( - "Failed to describe VPC endpoint service, error: {:?}, aws_request_id: {:?}", - e.message(), - e.meta().extra("aws_request_id") - ) - })?; - - match output.service_details { - Some(details) => { - let detail = details.into_iter().exactly_one().map_err(|_| { - anyhow!("More than one VPC endpoint service found with the same name") - })?; - if let Some(azs) = detail.availability_zones { - service_azs.extend(azs.into_iter()); - } - } - None => { - return Err(MetaError::from(anyhow!( - "No VPC endpoint service found with the name {}", - service_name - ))); - } - } - Ok(service_azs) - } - - async fn describe_subnets( - &self, - vpc_id: &str, - az_names: &[String], - ) -> MetaResult<Vec<(String, String, String)>> { - let vpc_filter = Filter::builder().name("vpc-id").values(vpc_id).build(); - let az_filter = Filter::builder() - .name("availability-zone") - .set_values(Some(Vec::from(az_names))) - .build(); - let output = self - .client - .describe_subnets() - .set_filters(Some(vec![vpc_filter, az_filter])) - .send() - .await - .map_err(|e| { - anyhow!("Failed to describe subnets for vpc_id {vpc_id}. error: {:?}, aws_request_id: {:?}", - e.message(), - e.meta().extra("aws_request_id")) - })?; - - let subnets = output - .subnets - .unwrap_or_default() - .into_iter() - .unique_by(|s| s.availability_zone().unwrap_or_default().to_string()) - .map(|s| { - ( - s.subnet_id.unwrap_or_default(), - s.availability_zone.unwrap_or_default(), - s.availability_zone_id.unwrap_or_default(), - ) - }) - .collect(); - Ok(subnets) - } - - async fn create_vpc_endpoint( - &self, - vpc_id: &str, - service_name: &str, - security_group_id: &str, - subnet_ids: &[String], - tags_vec: Option<Vec<(&str, &str)>>, - ) -> MetaResult<(String, Vec<String>)> { - let tag_spec = match tags_vec { - Some(tags_vec) => { - let tags = tags_vec - .into_iter() - .map(|(tag_key, tag_val)| { - Tag::builder() - .set_key(Some(tag_key.to_string())) - .set_value(Some(tag_val.to_string())) - .build() - }) - .collect(); - Some(vec![TagSpecification::builder() - .set_resource_type(Some(ResourceType::VpcEndpoint)) - .set_tags(Some(tags)) - .build()]) - } - None => None, - }; - - let output = self - .client - .create_vpc_endpoint() - .vpc_endpoint_type(VpcEndpointType::Interface) - .vpc_id(vpc_id) - .security_group_ids(security_group_id) - .service_name(service_name) - .set_subnet_ids(Some(subnet_ids.to_owned())) - .set_tag_specifications(tag_spec) - .send() - .await - .map_err(|e| { - anyhow!( - "Failed to create vpc endpoint: vpc_id {vpc_id}, \ - service_name {service_name}. 
error: {:?}, aws_request_id: {:?}", - e.message(), - e.meta().extra("aws_request_id") - ) - })?; - - let endpoint = output.vpc_endpoint().unwrap(); - let mut dns_names = Vec::new(); - - endpoint.dns_entries().iter().for_each(|e| { - if let Some(dns_name) = e.dns_name() { - dns_names.push(dns_name.to_string()); - } - }); - - Ok(( - endpoint.vpc_endpoint_id().unwrap_or_default().to_string(), - dns_names, - )) - } -} diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index c2629cc3718f6..761206d1c85e5 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -43,13 +43,11 @@ use risingwave_meta_model::{ ConnectionId, DatabaseId, FunctionId, IndexId, ObjectId, SchemaId, SecretId, SinkId, SourceId, SubscriptionId, TableId, UserId, ViewId, }; -use risingwave_pb::catalog::connection::private_link_service::PbPrivateLinkProvider; -use risingwave_pb::catalog::connection::PrivateLinkService; use risingwave_pb::catalog::source::OptionalAssociatedTableId; use risingwave_pb::catalog::table::OptionalAssociatedSourceId; use risingwave_pb::catalog::{ - connection, Comment, Connection, CreateType, Database, Function, PbSink, PbSource, PbTable, - Schema, Secret, Sink, Source, Subscription, Table, View, + Comment, Connection, CreateType, Database, Function, PbSink, PbSource, PbTable, Schema, Secret, + Sink, Source, Subscription, Table, View, }; use risingwave_pb::ddl_service::alter_owner_request::Object; use risingwave_pb::ddl_service::{ @@ -67,7 +65,6 @@ use risingwave_pb::stream_plan::{ }; use thiserror_ext::AsReport; use tokio::sync::Semaphore; use tokio::time::sleep; -use tracing::log::warn; use tracing::Instrument; use crate::barrier::BarrierManagerRef; @@ -79,7 +76,6 @@ use crate::manager::{ IGNORED_NOTIFICATION_VERSION, }; use crate::model::{FragmentId, StreamContext, TableFragments, TableParallelism}; -use crate::rpc::cloud_provider::AwsEc2Client; use crate::stream::{ create_source_worker_handle, validate_sink, ActorGraphBuildResult, ActorGraphBuilder, CompleteStreamFragmentGraph, CreateStreamingJobContext, CreateStreamingJobOption, @@ -188,7 +184,6 @@ pub struct DdlController { pub(crate) source_manager: SourceManagerRef, barrier_manager: BarrierManagerRef, - aws_client: Arc<Option<AwsEc2Client>>, // The semaphore is used to limit the number of concurrent streaming job creation. pub(crate) creating_streaming_job_permits: Arc<CreatingStreamingJobPermit>, } @@ -258,7 +253,6 @@ impl DdlController { stream_manager: GlobalStreamManagerRef, source_manager: SourceManagerRef, barrier_manager: BarrierManagerRef, - aws_client: Arc<Option<AwsEc2Client>>, ) -> Self { let creating_streaming_job_permits = Arc::new(CreatingStreamingJobPermit::new(&env).await); Self { @@ -267,7 +261,6 @@ impl DdlController { stream_manager, source_manager, barrier_manager, - aws_client, creating_streaming_job_permits, } } @@ -546,21 +539,6 @@ impl DdlController { .await } - pub(crate) async fn delete_vpc_endpoint(&self, svc: &PrivateLinkService) -> MetaResult<()> { - // delete AWS vpc endpoint - if svc.get_provider()? 
== PbPrivateLinkProvider::Aws { - if let Some(aws_cli) = self.aws_client.as_ref() { - aws_cli.delete_vpc_endpoint(&svc.endpoint_id).await?; - } else { - warn!( - "AWS client is not initialized, skip deleting vpc endpoint {}", - svc.endpoint_id - ); - } - } - Ok(()) - } - async fn create_subscription( &self, mut subscription: Subscription, @@ -1169,14 +1147,11 @@ impl DdlController { .await; } ObjectType::Connection => { - let (version, conn) = self + let (version, _conn) = self .metadata_manager .catalog_controller .drop_connection(object_id) .await?; - if let Some(connection::Info::PrivateLinkService(svc)) = &conn.info { - self.delete_vpc_endpoint(svc).await?; - } return Ok(version); } _ => { @@ -1297,22 +1272,12 @@ impl DdlController { streaming_job_ids, state_table_ids, source_ids, - connections, source_fragments, removed_actors, removed_fragments, + .. } = release_ctx; - // delete vpc endpoints. - for conn in connections { - let _ = self - .delete_vpc_endpoint(&conn.to_protobuf()) - .await - .inspect_err(|err| { - tracing::warn!(err = ?err.as_report(), "failed to delete vpc endpoint"); - }); - } - // unregister sources. self.source_manager .unregister_sources(source_ids.into_iter().map(|id| id as _).collect()) diff --git a/src/meta/src/rpc/mod.rs b/src/meta/src/rpc/mod.rs index 8b256d1b2145e..9f840ded5aa47 100644 --- a/src/meta/src/rpc/mod.rs +++ b/src/meta/src/rpc/mod.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub mod cloud_provider; pub mod ddl_controller; pub mod election; pub mod intercept;
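Note on the new error surface: with this patch, `CREATE CONNECTION ... WITH (type = 'privatelink', ...)` is rejected up front through the `Deprecated` variants added to the frontend `ErrorCode` and to `MetaErrorInner` in `src/meta/src/error.rs`. Below is a minimal standalone sketch (a scratch crate with `thiserror` as a dependency, not the actual RisingWave error plumbing) of how the new `#[error(...)]` attribute renders the message users will see:

```rust
use thiserror::Error;

// Hypothetical stand-in for the `Deprecated(String, String)` variant added to
// `MetaErrorInner`: field 0 names the deprecated feature, field 1 the replacement.
#[derive(Debug, Error)]
enum DemoError {
    #[error("{0} has been deprecated, please use {1} instead.")]
    Deprecated(String, String),
}

fn main() {
    let err = DemoError::Deprecated(
        "CREATE CONNECTION to Private Link".to_owned(),
        "RisingWave Cloud Portal".to_owned(),
    );
    // Prints: "CREATE CONNECTION to Private Link has been deprecated,
    // please use RisingWave Cloud Portal instead."
    println!("{err}");
}
```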