diff --git a/ci/scripts/run-backfill-tests.sh b/ci/scripts/run-backfill-tests.sh index a61819368c685..85e3b7a8b8d56 100755 --- a/ci/scripts/run-backfill-tests.sh +++ b/ci/scripts/run-backfill-tests.sh @@ -90,8 +90,6 @@ restart_cn() { 127.0.0.1:5688 \ --async-stack-trace \ verbose \ - --connector-rpc-endpoint \ - 127.0.0.1:50051 \ --parallelism \ 4 \ --total-memory-bytes \ diff --git a/ci/scripts/standalone-utils.sh b/ci/scripts/standalone-utils.sh index 630d99f9bb9bd..2ad5a349252c3 100755 --- a/ci/scripts/standalone-utils.sh +++ b/ci/scripts/standalone-utils.sh @@ -16,7 +16,6 @@ start_standalone_without_compactor() { --advertise-addr 127.0.0.1:5690 \ --dashboard-host 127.0.0.1:5691 \ --prometheus-host 127.0.0.1:1250 \ - --connector-rpc-endpoint 127.0.0.1:50051 \ --backend etcd \ --etcd-endpoints 127.0.0.1:2388 \ --state-store hummock+minio://hummockadmin:hummockadmin@127.0.0.1:9301/hummock001 \ @@ -26,7 +25,6 @@ start_standalone_without_compactor() { --prometheus-listener-addr 127.0.0.1:1222 \ --advertise-addr 127.0.0.1:5688 \ --async-stack-trace verbose \ - --connector-rpc-endpoint 127.0.0.1:50051 \ --parallelism 4 \ --total-memory-bytes 8589934592 \ --role both \ @@ -50,7 +48,6 @@ start_standalone() { --advertise-addr 127.0.0.1:5690 \ --dashboard-host 127.0.0.1:5691 \ --prometheus-host 127.0.0.1:1250 \ - --connector-rpc-endpoint 127.0.0.1:50051 \ --backend etcd \ --etcd-endpoints 127.0.0.1:2388 \ --state-store hummock+minio://hummockadmin:hummockadmin@127.0.0.1:9301/hummock001 \ @@ -60,7 +57,6 @@ start_standalone() { --prometheus-listener-addr 127.0.0.1:1222 \ --advertise-addr 127.0.0.1:5688 \ --async-stack-trace verbose \ - --connector-rpc-endpoint 127.0.0.1:50051 \ --parallelism 4 \ --total-memory-bytes 8589934592 \ --role both \ diff --git a/docker/docker-compose-with-azblob.yml b/docker/docker-compose-with-azblob.yml index c26634a4972f3..3f7103ef649b7 100644 --- a/docker/docker-compose-with-azblob.yml +++ b/docker/docker-compose-with-azblob.yml @@ -11,7 +11,6 @@ services: --dashboard-host 0.0.0.0:5691 \ --prometheus-host 0.0.0.0:1250 \ --prometheus-endpoint http://prometheus-0:9500 \ - --connector-rpc-endpoint 0.0.0.0:50051 \ --backend etcd \ --etcd-endpoints etcd-0:2388 \ --state-store hummock+azblob:// \ @@ -23,7 +22,6 @@ services: --prometheus-listener-addr 0.0.0.0:1250 \ --advertise-addr 0.0.0.0:5688 \ --async-stack-trace verbose \ - --connector-rpc-endpoint 0.0.0.0:50051 \ #--parallelism 4 \ #--total-memory-bytes 8589934592 \ --role both \ diff --git a/docker/docker-compose-with-gcs.yml b/docker/docker-compose-with-gcs.yml index 5b5dde469e2ab..0a393d829ad21 100644 --- a/docker/docker-compose-with-gcs.yml +++ b/docker/docker-compose-with-gcs.yml @@ -11,7 +11,6 @@ services: --dashboard-host 0.0.0.0:5691 \ --prometheus-host 0.0.0.0:1250 \ --prometheus-endpoint http://prometheus-0:9500 \ - --connector-rpc-endpoint 0.0.0.0:50051 \ --backend etcd \ --etcd-endpoints etcd-0:2388 \ --state-store hummock+gcs:// \ @@ -23,7 +22,6 @@ services: --prometheus-listener-addr 0.0.0.0:1250 \ --advertise-addr 0.0.0.0:5688 \ --async-stack-trace verbose \ - --connector-rpc-endpoint 0.0.0.0:50051 \ #--parallelism 4 \ #--total-memory-bytes 8589934592 \ --role both \ diff --git a/docker/docker-compose-with-local-fs.yml b/docker/docker-compose-with-local-fs.yml index 8ffc0992747a3..b45e624c619b3 100644 --- a/docker/docker-compose-with-local-fs.yml +++ b/docker/docker-compose-with-local-fs.yml @@ -11,7 +11,6 @@ services: --dashboard-host 0.0.0.0:5691 \ --prometheus-host 0.0.0.0:1250 \ --prometheus-endpoint http://prometheus-0:9500 \ - --connector-rpc-endpoint 0.0.0.0:50051 \ --backend etcd \ --etcd-endpoints etcd-0:2388 \ --state-store hummock+fs:// \ @@ -23,7 +22,6 @@ services: --prometheus-listener-addr 0.0.0.0:1250 \ --advertise-addr 0.0.0.0:5688 \ --async-stack-trace verbose \ - --connector-rpc-endpoint 0.0.0.0:50051 \ # --parallelism 4 \ --role both \ --meta-address http://0.0.0.0:5690\" \ diff --git a/docker/docker-compose-with-obs.yml b/docker/docker-compose-with-obs.yml index aaec080276df6..f18478cb2a563 100644 --- a/docker/docker-compose-with-obs.yml +++ b/docker/docker-compose-with-obs.yml @@ -11,7 +11,6 @@ services: --dashboard-host 0.0.0.0:5691 \ --prometheus-host 0.0.0.0:1250 \ --prometheus-endpoint http://prometheus-0:9500 \ - --connector-rpc-endpoint 0.0.0.0:50051 \ --backend etcd \ --etcd-endpoints etcd-0:2388 \ --state-store hummock+obs:// \ @@ -23,7 +22,6 @@ services: --prometheus-listener-addr 0.0.0.0:1250 \ --advertise-addr 0.0.0.0:5688 \ --async-stack-trace verbose \ - --connector-rpc-endpoint 0.0.0.0:50051 \ #--parallelism 4 \ #--total-memory-bytes 8589934592 \ --role both \ diff --git a/docker/docker-compose-with-oss.yml b/docker/docker-compose-with-oss.yml index 71f9aa4366d5d..b8d4c41142eec 100644 --- a/docker/docker-compose-with-oss.yml +++ b/docker/docker-compose-with-oss.yml @@ -11,7 +11,6 @@ services: --dashboard-host 0.0.0.0:5691 \ --prometheus-host 0.0.0.0:1250 \ --prometheus-endpoint http://prometheus-0:9500 \ - --connector-rpc-endpoint 0.0.0.0:50051 \ --backend etcd \ --etcd-endpoints etcd-0:2388 \ --state-store hummock+oss:// \ @@ -23,7 +22,6 @@ services: --prometheus-listener-addr 0.0.0.0:1250 \ --advertise-addr 0.0.0.0:5688 \ --async-stack-trace verbose \ - --connector-rpc-endpoint 0.0.0.0:50051 \ #--parallelism 4 \ #--total-memory-bytes 8589934592 \ --role both \ diff --git a/docker/docker-compose-with-s3.yml b/docker/docker-compose-with-s3.yml index a36c2d48f4d05..3e3bbd95b684a 100644 --- a/docker/docker-compose-with-s3.yml +++ b/docker/docker-compose-with-s3.yml @@ -11,7 +11,6 @@ services: --dashboard-host 0.0.0.0:5691 \ --prometheus-host 0.0.0.0:1250 \ --prometheus-endpoint http://prometheus-0:9500 \ - --connector-rpc-endpoint 0.0.0.0:50051 \ --backend etcd \ --etcd-endpoints etcd-0:2388 \ --state-store hummock+s3:// \ @@ -23,7 +22,6 @@ services: --prometheus-listener-addr 0.0.0.0:1250 \ --advertise-addr 0.0.0.0:5688 \ --async-stack-trace verbose \ - --connector-rpc-endpoint 0.0.0.0:50051 \ #--parallelism 4 \ #--total-memory-bytes 8589934592 \ --role both \ diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index d7e4f47a26013..2f714d6b5f8e9 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -11,7 +11,6 @@ services: --dashboard-host 0.0.0.0:5691 \ --prometheus-host 0.0.0.0:1250 \ --prometheus-endpoint http://prometheus-0:9500 \ - --connector-rpc-endpoint 0.0.0.0:50051 \ --backend etcd \ --etcd-endpoints etcd-0:2388 \ --state-store hummock+minio://hummockadmin:hummockadmin@minio-0:9301/hummock001 \ @@ -23,7 +22,6 @@ services: --prometheus-listener-addr 0.0.0.0:1250 \ --advertise-addr 0.0.0.0:5688 \ --async-stack-trace verbose \ - --connector-rpc-endpoint 0.0.0.0:50051 \ #--parallelism 4 \ #--total-memory-bytes 8589934592 \ --role both \ diff --git a/src/batch/src/executor/source.rs b/src/batch/src/executor/source.rs index 126f356ce7c27..9a348b25a4025 100644 --- a/src/batch/src/executor/source.rs +++ b/src/batch/src/executor/source.rs @@ -173,7 +173,6 @@ impl SourceExecutor { u32::MAX, self.metrics, self.source_ctrl_opts.clone(), - None, ConnectorProperties::default(), "NA".to_owned(), // source name was not passed in batch plan )); diff --git a/src/cmd_all/scripts/standalone-demo-dev.sh b/src/cmd_all/scripts/standalone-demo-dev.sh index 87c1e0ab9b61b..f43ac94c69fec 100755 --- a/src/cmd_all/scripts/standalone-demo-dev.sh +++ b/src/cmd_all/scripts/standalone-demo-dev.sh @@ -12,7 +12,6 @@ cargo run -p risingwave_cmd_all \ --advertise-addr 127.0.0.1:5690 \ --dashboard-host 127.0.0.1:5691 \ --prometheus-host 127.0.0.1:1250 \ - --connector-rpc-endpoint 127.0.0.1:50051 \ --backend mem \ --state-store hummock+memory \ --data-directory hummock_001 \ @@ -23,7 +22,6 @@ cargo run -p risingwave_cmd_all \ --prometheus-listener-addr 127.0.0.1:1222 \ --advertise-addr 127.0.0.1:5688 \ --async-stack-trace verbose \ - --connector-rpc-endpoint 127.0.0.1:50051 \ --parallelism 4 \ --total-memory-bytes 8589934592 \ --role both \ diff --git a/src/cmd_all/scripts/standalone-demo-full.sh b/src/cmd_all/scripts/standalone-demo-full.sh index 28135e7453669..e555dd9ced82d 100755 --- a/src/cmd_all/scripts/standalone-demo-full.sh +++ b/src/cmd_all/scripts/standalone-demo-full.sh @@ -17,7 +17,6 @@ start_standalone() { --advertise-addr 127.0.0.1:5690 \ --dashboard-host 127.0.0.1:5691 \ --prometheus-host 127.0.0.1:1250 \ - --connector-rpc-endpoint 127.0.0.1:50051 \ --backend etcd \ --etcd-endpoints 127.0.0.1:2388 \ --state-store hummock+minio://hummockadmin:hummockadmin@127.0.0.1:9301/hummock001 \ @@ -29,7 +28,6 @@ start_standalone() { --prometheus-listener-addr 127.0.0.1:1222 \ --advertise-addr 127.0.0.1:5688 \ --async-stack-trace verbose \ - --connector-rpc-endpoint 127.0.0.1:50051 \ --parallelism 4 \ --total-memory-bytes 8589934592 \ --role both \ diff --git a/src/cmd_all/src/standalone.rs b/src/cmd_all/src/standalone.rs index 384648f9211de..eb5e1a67e9889 100644 --- a/src/cmd_all/src/standalone.rs +++ b/src/cmd_all/src/standalone.rs @@ -294,7 +294,6 @@ mod test { sql_endpoint: None, prometheus_endpoint: None, prometheus_selector: None, - connector_rpc_endpoint: None, privatelink_endpoint_default_tags: None, vpc_id: None, security_group_id: None, @@ -313,6 +312,7 @@ mod test { backup_storage_directory: None, heap_profiling_dir: None, dangerous_max_idle_secs: None, + connector_rpc_endpoint: None, }, ), compute_opts: Some( @@ -325,7 +325,6 @@ mod test { http://127.0.0.1:5690/, ], ), - connector_rpc_endpoint: None, connector_rpc_sink_payload_format: None, config_path: "src/config/test.toml", total_memory_bytes: 34359738368, @@ -336,13 +335,13 @@ mod test { meta_file_cache_dir: None, async_stack_trace: None, heap_profiling_dir: None, + connector_rpc_endpoint: None, }, ), frontend_opts: Some( FrontendOpts { listen_addr: "127.0.0.1:4566", advertise_addr: None, - port: None, meta_addr: List( [ http://127.0.0.1:5690/, diff --git a/src/compute/src/lib.rs b/src/compute/src/lib.rs index 870f91b33e2a2..c3d82dddfde94 100644 --- a/src/compute/src/lib.rs +++ b/src/compute/src/lib.rs @@ -76,10 +76,6 @@ pub struct ComputeNodeOpts { #[clap(long, env = "RW_META_ADDR", default_value = "http://127.0.0.1:5690")] pub meta_address: MetaAddressStrategy, - /// Endpoint of the connector node - #[clap(long, env = "RW_CONNECTOR_RPC_ENDPOINT")] - pub connector_rpc_endpoint: Option, - /// Payload format of connector sink rpc #[clap(long, env = "RW_CONNECTOR_RPC_SINK_PAYLOAD_FORMAT")] pub connector_rpc_sink_payload_format: Option, @@ -131,6 +127,11 @@ pub struct ComputeNodeOpts { #[clap(long, hide = true, env = "RW_HEAP_PROFILING_DIR")] #[override_opts(path = server.heap_profiling.dir)] pub heap_profiling_dir: Option, + + /// Endpoint of the connector node. + #[deprecated = "connector node has been deprecated."] + #[clap(long, hide = true, env = "RW_CONNECTOR_RPC_ENDPOINT")] + pub connector_rpc_endpoint: Option, } impl risingwave_common::opts::Opts for ComputeNodeOpts { diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index c4ee1021a93c9..f9cd24162f561 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -49,7 +49,7 @@ use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer use risingwave_pb::stream_service::stream_service_server::StreamServiceServer; use risingwave_pb::task_service::exchange_service_server::ExchangeServiceServer; use risingwave_pb::task_service::task_service_server::TaskServiceServer; -use risingwave_rpc_client::{ComputeClientPool, ConnectorClient, ExtraInfoSourceRef, MetaClient}; +use risingwave_rpc_client::{ComputeClientPool, ExtraInfoSourceRef, MetaClient}; use risingwave_storage::hummock::compactor::{ new_compaction_await_tree_reg_ref, start_compactor, CompactionExecutor, CompactorContext, }; @@ -334,14 +334,11 @@ pub async fn compute_node_serve( ); info!( - "connector param: {:?} {:?}", - opts.connector_rpc_endpoint, opts.connector_rpc_sink_payload_format + "connector param: payload_format={:?}", + opts.connector_rpc_sink_payload_format ); - let connector_client = ConnectorClient::try_new(opts.connector_rpc_endpoint.as_ref()).await; - let connector_params = risingwave_connector::ConnectorParams { - connector_client, sink_payload_format: match opts.connector_rpc_sink_payload_format.as_deref() { None | Some("stream_chunk") => SinkPayloadFormat::StreamChunk, Some("json") => SinkPayloadFormat::Json, diff --git a/src/compute/tests/integration_tests.rs b/src/compute/tests/integration_tests.rs index 210559c64ff15..e25027b61573a 100644 --- a/src/compute/tests/integration_tests.rs +++ b/src/compute/tests/integration_tests.rs @@ -41,7 +41,6 @@ use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; use risingwave_connector::source::reader::desc::test_utils::create_source_desc_builder; use risingwave_connector::source::SourceCtrlOpts; -use risingwave_connector::ConnectorParams; use risingwave_dml::dml_manager::DmlManager; use risingwave_hummock_sdk::to_committed_batch_query_epoch; use risingwave_pb::catalog::StreamSourceInfo; @@ -175,7 +174,6 @@ async fn test_table_materialize() -> StreamResult<()> { barrier_rx, system_params_manager.get_params(), SourceCtrlOpts::default(), - ConnectorParams::default(), ) .boxed(), ); diff --git a/src/connector/src/lib.rs b/src/connector/src/lib.rs index fb38f2db00c4f..61dc43e4462b8 100644 --- a/src/connector/src/lib.rs +++ b/src/connector/src/lib.rs @@ -40,7 +40,6 @@ use std::time::Duration; use duration_str::parse_std; use risingwave_pb::connector_service::SinkPayloadFormat; -use risingwave_rpc_client::ConnectorClient; use serde::de; pub mod aws_utils; @@ -65,17 +64,12 @@ mod with_options_test; #[derive(Clone, Debug, Default)] pub struct ConnectorParams { - pub connector_client: Option, pub sink_payload_format: SinkPayloadFormat, } impl ConnectorParams { - pub fn new( - connector_client: Option, - sink_payload_format: SinkPayloadFormat, - ) -> Self { + pub fn new(sink_payload_format: SinkPayloadFormat) -> Self { Self { - connector_client, sink_payload_format, } } diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index 52724b1707660..ba2915ca47a8d 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -30,7 +30,6 @@ use risingwave_common::types::{JsonbVal, Scalar}; use risingwave_pb::catalog::{PbSource, PbStreamSourceInfo}; use risingwave_pb::plan_common::ExternalTableDesc; use risingwave_pb::source::ConnectorSplit; -use risingwave_rpc_client::ConnectorClient; use serde::de::DeserializeOwned; use super::cdc::DebeziumCdcMeta; @@ -150,7 +149,6 @@ impl Default for SourceCtrlOpts { pub struct SourceEnumeratorContext { pub info: SourceEnumeratorInfo, pub metrics: Arc, - pub connector_client: Option, } #[derive(Clone, Debug, Default)] @@ -160,7 +158,6 @@ pub struct SourceEnumeratorInfo { #[derive(Debug, Default)] pub struct SourceContext { - pub connector_client: Option, pub actor_id: u32, pub source_id: TableId, // There should be a 1-1 mapping between `source_id` & `fragment_id` @@ -178,12 +175,10 @@ impl SourceContext { fragment_id: u32, metrics: Arc, source_ctrl_opts: SourceCtrlOpts, - connector_client: Option, connector_props: ConnectorProperties, source_name: String, ) -> Self { Self { - connector_client, actor_id, source_id, fragment_id, @@ -673,7 +668,6 @@ mod tests { #[test] fn test_extract_cdc_properties() { let user_props_mysql: HashMap = convert_args!(hashmap!( - "connector_node_addr" => "localhost", "connector" => "mysql-cdc", "database.hostname" => "127.0.0.1", "database.port" => "3306", @@ -684,7 +678,6 @@ mod tests { )); let user_props_postgres: HashMap = convert_args!(hashmap!( - "connector_node_addr" => "localhost", "connector" => "postgres-cdc", "database.hostname" => "127.0.0.1", "database.port" => "5432", @@ -697,10 +690,6 @@ mod tests { let conn_props = ConnectorProperties::extract(user_props_mysql, true).unwrap(); if let ConnectorProperties::MysqlCdc(c) = conn_props { - assert_eq!( - c.properties.get("connector_node_addr").unwrap(), - "localhost" - ); assert_eq!(c.properties.get("database.hostname").unwrap(), "127.0.0.1"); assert_eq!(c.properties.get("database.port").unwrap(), "3306"); assert_eq!(c.properties.get("database.user").unwrap(), "root"); @@ -713,10 +702,6 @@ mod tests { let conn_props = ConnectorProperties::extract(user_props_postgres, true).unwrap(); if let ConnectorProperties::PostgresCdc(c) = conn_props { - assert_eq!( - c.properties.get("connector_node_addr").unwrap(), - "localhost" - ); assert_eq!(c.properties.get("database.hostname").unwrap(), "127.0.0.1"); assert_eq!(c.properties.get("database.port").unwrap(), "5432"); assert_eq!(c.properties.get("database.user").unwrap(), "root"); diff --git a/src/connector/src/source/reader/desc.rs b/src/connector/src/source/reader/desc.rs index 46107c2d73d0a..05fe87638c884 100644 --- a/src/connector/src/source/reader/desc.rs +++ b/src/connector/src/source/reader/desc.rs @@ -224,15 +224,8 @@ impl SourceDescBuilder { let columns = self.column_catalogs_to_source_column_descs(); - let source = FsSourceReader::new( - self.with_properties.clone(), - columns.clone(), - self.connector_params - .connector_client - .as_ref() - .map(|client| client.endpoint().clone()), - parser_config, - )?; + let source = + FsSourceReader::new(self.with_properties.clone(), columns.clone(), parser_config)?; Ok(FsSourceDesc { source, diff --git a/src/connector/src/source/reader/fs_reader.rs b/src/connector/src/source/reader/fs_reader.rs index 93a0bd2c2d6a8..5ec47cea98cee 100644 --- a/src/connector/src/source/reader/fs_reader.rs +++ b/src/connector/src/source/reader/fs_reader.rs @@ -43,13 +43,10 @@ impl FsSourceReader { pub fn new( properties: HashMap, columns: Vec, - connector_node_addr: Option, parser_config: SpecificParserConfig, ) -> ConnectorResult { // Store the connector node address to properties for later use. - let mut source_props: HashMap = HashMap::from_iter(properties.clone()); - connector_node_addr - .map(|addr| source_props.insert("connector_node_addr".to_string(), addr)); + let source_props: HashMap = HashMap::from_iter(properties.clone()); let config = ConnectorProperties::extract(source_props, false)?; Ok(Self { diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index 99789bcae21f2..b79aaee3cb4ab 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -99,11 +99,6 @@ pub struct FrontendOpts { #[clap(long, env = "RW_ADVERTISE_ADDR")] pub advertise_addr: Option, - // TODO(eric): Remove me - // TODO: This is currently unused. - #[clap(long, env = "RW_PORT")] - pub port: Option, - /// The address via which we will attempt to connect to a leader meta node. #[clap(long, env = "RW_META_ADDR", default_value = "http://127.0.0.1:5690")] pub meta_addr: MetaAddressStrategy, diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs index 7b87a46f67d50..a3fde993a1f6a 100644 --- a/src/meta/node/src/lib.rs +++ b/src/meta/node/src/lib.rs @@ -89,12 +89,6 @@ pub struct MetaNodeOpts { #[clap(long, env = "RW_PROMETHEUS_SELECTOR")] pub prometheus_selector: Option, - // TODO(eric): remove me - /// Endpoint of the connector node, there will be a sidecar connector node - /// colocated with Meta node in the cloud environment - #[clap(long, hide = true, env = "RW_CONNECTOR_RPC_ENDPOINT")] - pub connector_rpc_endpoint: Option, - /// Default tag for the endpoint created when creating a privatelink connection. /// Will be appended to the tags specified in the `tags` field in with clause in `create /// connection`. @@ -171,6 +165,11 @@ pub struct MetaNodeOpts { #[clap(long, hide = true, env = "RW_DANGEROUS_MAX_IDLE_SECS")] #[override_opts(path = meta.dangerous_max_idle_secs)] pub dangerous_max_idle_secs: Option, + + /// Endpoint of the connector node. + #[deprecated = "connector node has been deprecated."] + #[clap(long, hide = true, env = "RW_CONNECTOR_RPC_ENDPOINT")] + pub connector_rpc_endpoint: Option, } impl risingwave_common::opts::Opts for MetaNodeOpts { @@ -319,7 +318,6 @@ pub fn start(opts: MetaNodeOpts) -> Pin + Send>> { prometheus_selector: opts.prometheus_selector, vpc_id: opts.vpc_id, security_group_id: opts.security_group_id, - connector_rpc_endpoint: opts.connector_rpc_endpoint, privatelink_endpoint_default_tags, periodic_space_reclaim_compaction_interval_sec: config .meta diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index fd822c52bb8f9..ed2d5a9d46664 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -515,7 +515,6 @@ pub async fn start_service_as_election_leader( let source_manager = Arc::new( SourceManager::new( - env.clone(), barrier_scheduler.clone(), metadata_manager.clone(), meta_metrics.clone(), diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index 1eb2188d6bf60..8f8adf59947ed 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -19,7 +19,7 @@ use risingwave_common::config::{CompactionConfig, DefaultParallelism}; use risingwave_common::system_param::reader::SystemParamsReader; use risingwave_meta_model_v2::prelude::Cluster; use risingwave_pb::meta::SystemParams; -use risingwave_rpc_client::{ConnectorClient, StreamClientPool, StreamClientPoolRef}; +use risingwave_rpc_client::{StreamClientPool, StreamClientPoolRef}; use sea_orm::EntityTrait; use super::{SystemParamsManager, SystemParamsManagerRef}; @@ -76,9 +76,6 @@ pub struct MetaSrvEnv { /// Unique identifier of the cluster. cluster_id: ClusterId, - /// Client to connector node. `None` if endpoint unspecified or unable to connect. - connector_client: Option, - pub hummock_seq: Option>, /// options read by all services @@ -149,10 +146,6 @@ pub struct MetaOpts { /// A usable security group id to assign to a vpc endpoint pub security_group_id: Option, - /// Endpoint of the connector node, there will be a sidecar connector node - /// colocated with Meta node in the cloud environment - pub connector_rpc_endpoint: Option, - /// Default tag for the endpoint created when creating a privatelink connection. /// Will be appended to the tags specified in the `tags` field in with clause in `create /// connection`. @@ -249,7 +242,6 @@ impl MetaOpts { prometheus_selector: None, vpc_id: None, security_group_id: None, - connector_rpc_endpoint: None, privatelink_endpoint_default_tags: None, periodic_space_reclaim_compaction_interval_sec: 60, telemetry_enabled: false, @@ -339,7 +331,6 @@ impl MetaSrvEnv { None => None, }; - let connector_client = ConnectorClient::try_new(opts.connector_rpc_endpoint.as_ref()).await; let event_log_manager = Arc::new(start_event_log_manager( opts.event_log_enabled, opts.event_log_channel_max_size, @@ -365,7 +356,6 @@ impl MetaSrvEnv { system_params_manager, system_params_controller, cluster_id: cluster_id.unwrap(), - connector_client, opts: opts.into(), hummock_seq, }) @@ -450,10 +440,6 @@ impl MetaSrvEnv { &self.cluster_id } - pub fn connector_client(&self) -> Option { - self.connector_client.clone() - } - pub fn event_log_manager_ref(&self) -> EventLogMangerRef { self.event_log_manager.clone() } @@ -530,7 +516,6 @@ impl MetaSrvEnv { system_params_manager: Some(system_params_manager), system_params_controller, cluster_id, - connector_client: None, opts, hummock_seq, } diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs index b00b9dacda3cd..ec142f79a7152 100644 --- a/src/meta/src/stream/source_manager.rs +++ b/src/meta/src/stream/source_manager.rs @@ -31,7 +31,6 @@ use risingwave_connector::source::{ }; use risingwave_pb::catalog::Source; use risingwave_pb::source::{ConnectorSplit, ConnectorSplits}; -use risingwave_rpc_client::ConnectorClient; use thiserror_ext::AsReport; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::sync::{oneshot, Mutex}; @@ -40,7 +39,7 @@ use tokio::time::MissedTickBehavior; use tokio::{select, time}; use crate::barrier::{BarrierScheduler, Command}; -use crate::manager::{MetaSrvEnv, MetadataManager, SourceId}; +use crate::manager::{MetadataManager, SourceId}; use crate::model::{ActorId, FragmentId, TableFragments}; use crate::rpc::metrics::MetaMetrics; use crate::MetaResult; @@ -53,7 +52,6 @@ pub type ThrottleConfig = HashMap>>; /// and sends a split assignment command if split changes detected ([`Self::tick`]). pub struct SourceManager { pub paused: Mutex<()>, - env: MetaSrvEnv, barrier_scheduler: BarrierScheduler, core: Mutex, metrics: Arc, @@ -77,7 +75,6 @@ struct ConnectorSourceWorker { period: Duration, metrics: Arc, connector_properties: P, - connector_client: Option, fail_cnt: u32, source_is_up: LabelGuardedIntGauge<2>, } @@ -105,7 +102,6 @@ impl ConnectorSourceWorker

{ info: SourceEnumeratorInfo { source_id: self.source_id, }, - connector_client: self.connector_client.clone(), }), ) .await @@ -119,7 +115,6 @@ impl ConnectorSourceWorker

{ /// On creation, connection to the external source service will be established, but `splits` /// will not be updated until `tick` is called. pub async fn create( - connector_client: &Option, source: &Source, connector_properties: P, period: Duration, @@ -133,7 +128,6 @@ impl ConnectorSourceWorker

{ info: SourceEnumeratorInfo { source_id: source.id, }, - connector_client: connector_client.clone(), }), ) .await @@ -151,7 +145,6 @@ impl ConnectorSourceWorker

{ period, metrics, connector_properties, - connector_client: connector_client.clone(), fail_cnt: 0, source_is_up, }) @@ -551,7 +544,6 @@ impl SourceManager { const DEFAULT_SOURCE_TICK_TIMEOUT: Duration = Duration::from_secs(10); pub async fn new( - env: MetaSrvEnv, barrier_scheduler: BarrierScheduler, metadata_manager: MetadataManager, metrics: Arc, @@ -560,12 +552,7 @@ impl SourceManager { { let sources = metadata_manager.list_sources().await?; for source in sources { - Self::create_source_worker_async( - env.connector_client(), - source, - &mut managed_sources, - metrics.clone(), - )? + Self::create_source_worker_async(source, &mut managed_sources, metrics.clone())? } } @@ -626,7 +613,6 @@ impl SourceManager { )); Ok(Self { - env, barrier_scheduler, core, paused: Mutex::new(()), @@ -784,14 +770,9 @@ impl SourceManager { if core.managed_sources.contains_key(&source.get_id()) { tracing::warn!("source {} already registered", source.get_id()); } else { - Self::create_source_worker( - self.env.connector_client(), - source, - &mut core.managed_sources, - self.metrics.clone(), - ) - .await - .context("failed to create source worker")?; + Self::create_source_worker(source, &mut core.managed_sources, self.metrics.clone()) + .await + .context("failed to create source worker")?; } Ok(()) } @@ -808,7 +789,6 @@ impl SourceManager { /// Used on startup ([`Self::new`]). Failed sources will not block meta startup. fn create_source_worker_async( - connector_client: Option, source: Source, managed_sources: &mut HashMap, metrics: Arc, @@ -831,7 +811,6 @@ impl SourceManager { ticker.tick().await; match ConnectorSourceWorker::create( - &connector_client, &source, prop.deref().clone(), DEFAULT_SOURCE_WORKER_TICK_INTERVAL, @@ -869,7 +848,6 @@ impl SourceManager { /// /// It will call `ConnectorSourceWorker::tick()` to fetch split metadata once before returning. async fn create_source_worker( - connector_client: Option, source: &Source, managed_sources: &mut HashMap, metrics: Arc, @@ -885,7 +863,6 @@ impl SourceManager { let (sync_call_tx, sync_call_rx) = tokio::sync::mpsc::unbounded_channel(); let handle = dispatch_source_prop!(connector_properties, prop, { let mut worker = ConnectorSourceWorker::create( - &connector_client, source, *prop, DEFAULT_SOURCE_WORKER_TICK_INTERVAL, diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index a5efe5ef2fd8f..dc72fcd5119a2 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -973,7 +973,6 @@ mod tests { let source_manager = Arc::new( SourceManager::new( - env.clone(), barrier_scheduler.clone(), metadata_manager.clone(), meta_metrics.clone(), diff --git a/src/stream/src/executor/source/fetch_executor.rs b/src/stream/src/executor/source/fetch_executor.rs index ce571c47f5403..bb337bb43155e 100644 --- a/src/stream/src/executor/source/fetch_executor.rs +++ b/src/stream/src/executor/source/fetch_executor.rs @@ -34,7 +34,6 @@ use risingwave_connector::source::reader::desc::SourceDesc; use risingwave_connector::source::{ BoxChunkSourceStream, SourceContext, SourceCtrlOpts, SplitImpl, SplitMetaData, }; -use risingwave_connector::ConnectorParams; use risingwave_storage::store::PrefetchOptions; use risingwave_storage::StateStore; use thiserror_ext::AsReport; @@ -63,27 +62,21 @@ pub struct FsFetchExecutor { // control options for connector level source_ctrl_opts: SourceCtrlOpts, - // config for the connector node - connector_params: ConnectorParams, - _marker: PhantomData, } impl FsFetchExecutor { - #[allow(clippy::too_many_arguments)] pub fn new( actor_ctx: ActorContextRef, stream_source_core: StreamSourceCore, upstream: Executor, source_ctrl_opts: SourceCtrlOpts, - connector_params: ConnectorParams, ) -> Self { Self { actor_ctx, stream_source_core: Some(stream_source_core), upstream: Some(upstream), source_ctrl_opts, - connector_params, _marker: PhantomData, } } @@ -178,7 +171,6 @@ impl FsFetchExecutor { self.actor_ctx.fragment_id, source_desc.metrics.clone(), self.source_ctrl_opts.clone(), - self.connector_params.connector_client.clone(), source_desc.source.config.clone(), source_name.to_owned(), ) diff --git a/src/stream/src/executor/source/fs_source_executor.rs b/src/stream/src/executor/source/fs_source_executor.rs index 2aaeadac94731..d073863b50d44 100644 --- a/src/stream/src/executor/source/fs_source_executor.rs +++ b/src/stream/src/executor/source/fs_source_executor.rs @@ -101,7 +101,6 @@ impl FsSourceExecutor { self.actor_ctx.fragment_id, source_desc.metrics.clone(), self.source_ctrl_opts.clone(), - None, source_desc.source.config.clone(), self.stream_source_core.source_name.clone(), ); diff --git a/src/stream/src/executor/source/list_executor.rs b/src/stream/src/executor/source/list_executor.rs index 7996848a749e3..d0ff13f70ee4d 100644 --- a/src/stream/src/executor/source/list_executor.rs +++ b/src/stream/src/executor/source/list_executor.rs @@ -23,7 +23,6 @@ use risingwave_common::array::Op; use risingwave_common::system_param::local_manager::SystemParamsReaderRef; use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder}; use risingwave_connector::source::SourceCtrlOpts; -use risingwave_connector::ConnectorParams; use risingwave_storage::StateStore; use thiserror_ext::AsReport; use tokio::sync::mpsc::UnboundedReceiver; @@ -53,9 +52,6 @@ pub struct FsListExecutor { // control options for connector level source_ctrl_opts: SourceCtrlOpts, - - // config for the connector node - connector_params: ConnectorParams, } impl FsListExecutor { @@ -67,7 +63,6 @@ impl FsListExecutor { barrier_receiver: UnboundedReceiver, system_params: SystemParamsReaderRef, source_ctrl_opts: SourceCtrlOpts, - connector_params: ConnectorParams, ) -> Self { Self { actor_ctx, @@ -76,7 +71,6 @@ impl FsListExecutor { barrier_receiver: Some(barrier_receiver), system_params, source_ctrl_opts, - connector_params, } } diff --git a/src/stream/src/executor/source/source_backfill_executor.rs b/src/stream/src/executor/source/source_backfill_executor.rs index 0b7a530f61e98..b017021a01e32 100644 --- a/src/stream/src/executor/source/source_backfill_executor.rs +++ b/src/stream/src/executor/source/source_backfill_executor.rs @@ -33,7 +33,6 @@ use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder}; use risingwave_connector::source::{ BoxChunkSourceStream, SourceContext, SourceCtrlOpts, SplitId, SplitMetaData, }; -use risingwave_connector::ConnectorParams; use risingwave_storage::StateStore; use serde::{Deserialize, Serialize}; use source_backfill_executor::source_executor::WAIT_BARRIER_MULTIPLE_TIMES; @@ -138,9 +137,6 @@ pub struct SourceBackfillExecutorInner { // control options for connector level source_ctrl_opts: SourceCtrlOpts, - - // config for the connector node - connector_params: ConnectorParams, } /// Local variables used in the backfill stage. @@ -172,7 +168,6 @@ impl BackfillStage { } impl SourceBackfillExecutorInner { - #[allow(clippy::too_many_arguments)] pub fn new( actor_ctx: ActorContextRef, info: ExecutorInfo, @@ -180,7 +175,6 @@ impl SourceBackfillExecutorInner { metrics: Arc, system_params: SystemParamsReaderRef, source_ctrl_opts: SourceCtrlOpts, - connector_params: ConnectorParams, backfill_state_store: BackfillStateTableHandler, ) -> Self { let source_split_change_count = metrics.source_split_change_count.with_label_values(&[ @@ -198,7 +192,6 @@ impl SourceBackfillExecutorInner { source_split_change_count, system_params, source_ctrl_opts, - connector_params, } } @@ -218,7 +211,6 @@ impl SourceBackfillExecutorInner { self.actor_ctx.fragment_id, source_desc.metrics.clone(), self.source_ctrl_opts.clone(), - self.connector_params.connector_client.clone(), source_desc.source.config.clone(), self.stream_source_core.source_name.clone(), ); diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index b48e2c0a13503..7fb514faac9c9 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -26,7 +26,6 @@ use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder}; use risingwave_connector::source::{ BoxChunkSourceStream, ConnectorState, SourceContext, SourceCtrlOpts, SplitId, SplitMetaData, }; -use risingwave_connector::ConnectorParams; use risingwave_storage::StateStore; use thiserror_ext::AsReport; use tokio::sync::mpsc::UnboundedReceiver; @@ -58,13 +57,9 @@ pub struct SourceExecutor { // control options for connector level source_ctrl_opts: SourceCtrlOpts, - - // config for the connector node - connector_params: ConnectorParams, } impl SourceExecutor { - #[allow(clippy::too_many_arguments)] pub fn new( actor_ctx: ActorContextRef, stream_source_core: Option>, @@ -72,7 +67,6 @@ impl SourceExecutor { barrier_receiver: UnboundedReceiver, system_params: SystemParamsReaderRef, source_ctrl_opts: SourceCtrlOpts, - connector_params: ConnectorParams, ) -> Self { Self { actor_ctx, @@ -81,7 +75,6 @@ impl SourceExecutor { barrier_receiver: Some(barrier_receiver), system_params, source_ctrl_opts, - connector_params, } } @@ -101,7 +94,6 @@ impl SourceExecutor { self.actor_ctx.fragment_id, source_desc.metrics.clone(), self.source_ctrl_opts.clone(), - self.connector_params.connector_client.clone(), source_desc.source.config.clone(), self.stream_source_core .as_ref() @@ -691,7 +683,6 @@ mod tests { barrier_rx, system_params_manager.get_params(), SourceCtrlOpts::default(), - ConnectorParams::default(), ); let mut executor = executor.boxed().execute(); @@ -780,7 +771,6 @@ mod tests { barrier_rx, system_params_manager.get_params(), SourceCtrlOpts::default(), - ConnectorParams::default(), ); let mut handler = executor.boxed().execute(); diff --git a/src/stream/src/from_proto/source/fs_fetch.rs b/src/stream/src/from_proto/source/fs_fetch.rs index 355f7d874dec8..d3aa7e0be40af 100644 --- a/src/stream/src/from_proto/source/fs_fetch.rs +++ b/src/stream/src/from_proto/source/fs_fetch.rs @@ -96,7 +96,6 @@ impl ExecutorBuilder for FsFetchExecutorBuilder { stream_source_core, upstream, source_ctrl_opts, - params.env.connector_params(), ) .boxed() } @@ -106,7 +105,6 @@ impl ExecutorBuilder for FsFetchExecutorBuilder { stream_source_core, upstream, source_ctrl_opts, - params.env.connector_params(), ) .boxed() } @@ -116,7 +114,6 @@ impl ExecutorBuilder for FsFetchExecutorBuilder { stream_source_core, upstream, source_ctrl_opts, - params.env.connector_params(), ) .boxed() } diff --git a/src/stream/src/from_proto/source/trad_source.rs b/src/stream/src/from_proto/source/trad_source.rs index bec6abc46b343..7f8bff9d3c6f1 100644 --- a/src/stream/src/from_proto/source/trad_source.rs +++ b/src/stream/src/from_proto/source/trad_source.rs @@ -230,7 +230,6 @@ impl ExecutorBuilder for SourceExecutorBuilder { barrier_receiver, system_params, source_ctrl_opts.clone(), - params.env.connector_params(), ) .boxed() } else { @@ -241,7 +240,6 @@ impl ExecutorBuilder for SourceExecutorBuilder { barrier_receiver, system_params, source_ctrl_opts.clone(), - params.env.connector_params(), ) .boxed() } @@ -278,7 +276,6 @@ impl ExecutorBuilder for SourceExecutorBuilder { system_params, // we don't expect any data in, so no need to set chunk_sizes SourceCtrlOpts::default(), - params.env.connector_params(), ); Ok((params.info, exec).into()) } diff --git a/src/stream/src/from_proto/source_backfill.rs b/src/stream/src/from_proto/source_backfill.rs index 7a089464cf22d..17304f170ff0b 100644 --- a/src/stream/src/from_proto/source_backfill.rs +++ b/src/stream/src/from_proto/source_backfill.rs @@ -83,7 +83,6 @@ impl ExecutorBuilder for SourceBackfillExecutorBuilder { params.executor_stats.clone(), params.env.system_params_manager_ref().get_params(), source_ctrl_opts.clone(), - params.env.connector_params(), backfill_state_table, ); let [input]: [_; 1] = params.input.try_into().unwrap(); diff --git a/src/stream/src/task/env.rs b/src/stream/src/task/env.rs index d9bf9449cb53d..9a0b26f25f0c5 100644 --- a/src/stream/src/task/env.rs +++ b/src/stream/src/task/env.rs @@ -97,7 +97,7 @@ impl StreamEnvironment { use risingwave_storage::monitor::MonitoredStorageMetrics; StreamEnvironment { server_addr: "127.0.0.1:5688".parse().unwrap(), - connector_params: ConnectorParams::new(None, SinkPayloadFormat::Json), + connector_params: ConnectorParams::new(SinkPayloadFormat::Json), config: Arc::new(StreamingConfig::default()), worker_id: WorkerNodeId::default(), state_store: StateStoreImpl::shared_in_memory_store(Arc::new(