Skip to content

Commit

Permalink
fix: remove connector_rpc_endpoint and related code (#15949)
Browse files Browse the repository at this point in the history
  • Loading branch information
fuyufjh authored Mar 28, 2024
1 parent dd1249d commit 016fad7
Show file tree
Hide file tree
Showing 35 changed files with 26 additions and 174 deletions.
2 changes: 0 additions & 2 deletions ci/scripts/run-backfill-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,6 @@ restart_cn() {
127.0.0.1:5688 \
--async-stack-trace \
verbose \
--connector-rpc-endpoint \
127.0.0.1:50051 \
--parallelism \
4 \
--total-memory-bytes \
Expand Down
4 changes: 0 additions & 4 deletions ci/scripts/standalone-utils.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ start_standalone_without_compactor() {
--advertise-addr 127.0.0.1:5690 \
--dashboard-host 127.0.0.1:5691 \
--prometheus-host 127.0.0.1:1250 \
--connector-rpc-endpoint 127.0.0.1:50051 \
--backend etcd \
--etcd-endpoints 127.0.0.1:2388 \
--state-store hummock+minio://hummockadmin:[email protected]:9301/hummock001 \
Expand All @@ -26,7 +25,6 @@ start_standalone_without_compactor() {
--prometheus-listener-addr 127.0.0.1:1222 \
--advertise-addr 127.0.0.1:5688 \
--async-stack-trace verbose \
--connector-rpc-endpoint 127.0.0.1:50051 \
--parallelism 4 \
--total-memory-bytes 8589934592 \
--role both \
Expand All @@ -50,7 +48,6 @@ start_standalone() {
--advertise-addr 127.0.0.1:5690 \
--dashboard-host 127.0.0.1:5691 \
--prometheus-host 127.0.0.1:1250 \
--connector-rpc-endpoint 127.0.0.1:50051 \
--backend etcd \
--etcd-endpoints 127.0.0.1:2388 \
--state-store hummock+minio://hummockadmin:[email protected]:9301/hummock001 \
Expand All @@ -60,7 +57,6 @@ start_standalone() {
--prometheus-listener-addr 127.0.0.1:1222 \
--advertise-addr 127.0.0.1:5688 \
--async-stack-trace verbose \
--connector-rpc-endpoint 127.0.0.1:50051 \
--parallelism 4 \
--total-memory-bytes 8589934592 \
--role both \
Expand Down
2 changes: 0 additions & 2 deletions docker/docker-compose-with-azblob.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ services:
--dashboard-host 0.0.0.0:5691 \
--prometheus-host 0.0.0.0:1250 \
--prometheus-endpoint http://prometheus-0:9500 \
--connector-rpc-endpoint 0.0.0.0:50051 \
--backend etcd \
--etcd-endpoints etcd-0:2388 \
--state-store hummock+azblob://<container_name> \
Expand All @@ -23,7 +22,6 @@ services:
--prometheus-listener-addr 0.0.0.0:1250 \
--advertise-addr 0.0.0.0:5688 \
--async-stack-trace verbose \
--connector-rpc-endpoint 0.0.0.0:50051 \
#--parallelism 4 \
#--total-memory-bytes 8589934592 \
--role both \
Expand Down
2 changes: 0 additions & 2 deletions docker/docker-compose-with-gcs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ services:
--dashboard-host 0.0.0.0:5691 \
--prometheus-host 0.0.0.0:1250 \
--prometheus-endpoint http://prometheus-0:9500 \
--connector-rpc-endpoint 0.0.0.0:50051 \
--backend etcd \
--etcd-endpoints etcd-0:2388 \
--state-store hummock+gcs://<bucket-name> \
Expand All @@ -23,7 +22,6 @@ services:
--prometheus-listener-addr 0.0.0.0:1250 \
--advertise-addr 0.0.0.0:5688 \
--async-stack-trace verbose \
--connector-rpc-endpoint 0.0.0.0:50051 \
#--parallelism 4 \
#--total-memory-bytes 8589934592 \
--role both \
Expand Down
2 changes: 0 additions & 2 deletions docker/docker-compose-with-local-fs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ services:
--dashboard-host 0.0.0.0:5691 \
--prometheus-host 0.0.0.0:1250 \
--prometheus-endpoint http://prometheus-0:9500 \
--connector-rpc-endpoint 0.0.0.0:50051 \
--backend etcd \
--etcd-endpoints etcd-0:2388 \
--state-store hummock+fs://<local-path> \
Expand All @@ -23,7 +22,6 @@ services:
--prometheus-listener-addr 0.0.0.0:1250 \
--advertise-addr 0.0.0.0:5688 \
--async-stack-trace verbose \
--connector-rpc-endpoint 0.0.0.0:50051 \
# --parallelism 4 \
--role both \
--meta-address http://0.0.0.0:5690\" \
Expand Down
2 changes: 0 additions & 2 deletions docker/docker-compose-with-obs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ services:
--dashboard-host 0.0.0.0:5691 \
--prometheus-host 0.0.0.0:1250 \
--prometheus-endpoint http://prometheus-0:9500 \
--connector-rpc-endpoint 0.0.0.0:50051 \
--backend etcd \
--etcd-endpoints etcd-0:2388 \
--state-store hummock+obs://<bucket-name> \
Expand All @@ -23,7 +22,6 @@ services:
--prometheus-listener-addr 0.0.0.0:1250 \
--advertise-addr 0.0.0.0:5688 \
--async-stack-trace verbose \
--connector-rpc-endpoint 0.0.0.0:50051 \
#--parallelism 4 \
#--total-memory-bytes 8589934592 \
--role both \
Expand Down
2 changes: 0 additions & 2 deletions docker/docker-compose-with-oss.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ services:
--dashboard-host 0.0.0.0:5691 \
--prometheus-host 0.0.0.0:1250 \
--prometheus-endpoint http://prometheus-0:9500 \
--connector-rpc-endpoint 0.0.0.0:50051 \
--backend etcd \
--etcd-endpoints etcd-0:2388 \
--state-store hummock+oss://<bucket-name> \
Expand All @@ -23,7 +22,6 @@ services:
--prometheus-listener-addr 0.0.0.0:1250 \
--advertise-addr 0.0.0.0:5688 \
--async-stack-trace verbose \
--connector-rpc-endpoint 0.0.0.0:50051 \
#--parallelism 4 \
#--total-memory-bytes 8589934592 \
--role both \
Expand Down
2 changes: 0 additions & 2 deletions docker/docker-compose-with-s3.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ services:
--dashboard-host 0.0.0.0:5691 \
--prometheus-host 0.0.0.0:1250 \
--prometheus-endpoint http://prometheus-0:9500 \
--connector-rpc-endpoint 0.0.0.0:50051 \
--backend etcd \
--etcd-endpoints etcd-0:2388 \
--state-store hummock+s3://<bucket-name> \
Expand All @@ -23,7 +22,6 @@ services:
--prometheus-listener-addr 0.0.0.0:1250 \
--advertise-addr 0.0.0.0:5688 \
--async-stack-trace verbose \
--connector-rpc-endpoint 0.0.0.0:50051 \
#--parallelism 4 \
#--total-memory-bytes 8589934592 \
--role both \
Expand Down
2 changes: 0 additions & 2 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ services:
--dashboard-host 0.0.0.0:5691 \
--prometheus-host 0.0.0.0:1250 \
--prometheus-endpoint http://prometheus-0:9500 \
--connector-rpc-endpoint 0.0.0.0:50051 \
--backend etcd \
--etcd-endpoints etcd-0:2388 \
--state-store hummock+minio://hummockadmin:hummockadmin@minio-0:9301/hummock001 \
Expand All @@ -23,7 +22,6 @@ services:
--prometheus-listener-addr 0.0.0.0:1250 \
--advertise-addr 0.0.0.0:5688 \
--async-stack-trace verbose \
--connector-rpc-endpoint 0.0.0.0:50051 \
#--parallelism 4 \
#--total-memory-bytes 8589934592 \
--role both \
Expand Down
1 change: 0 additions & 1 deletion src/batch/src/executor/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@ impl SourceExecutor {
u32::MAX,
self.metrics,
self.source_ctrl_opts.clone(),
None,
ConnectorProperties::default(),
"NA".to_owned(), // source name was not passed in batch plan
));
Expand Down
2 changes: 0 additions & 2 deletions src/cmd_all/scripts/standalone-demo-dev.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ cargo run -p risingwave_cmd_all \
--advertise-addr 127.0.0.1:5690 \
--dashboard-host 127.0.0.1:5691 \
--prometheus-host 127.0.0.1:1250 \
--connector-rpc-endpoint 127.0.0.1:50051 \
--backend mem \
--state-store hummock+memory \
--data-directory hummock_001 \
Expand All @@ -23,7 +22,6 @@ cargo run -p risingwave_cmd_all \
--prometheus-listener-addr 127.0.0.1:1222 \
--advertise-addr 127.0.0.1:5688 \
--async-stack-trace verbose \
--connector-rpc-endpoint 127.0.0.1:50051 \
--parallelism 4 \
--total-memory-bytes 8589934592 \
--role both \
Expand Down
2 changes: 0 additions & 2 deletions src/cmd_all/scripts/standalone-demo-full.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ start_standalone() {
--advertise-addr 127.0.0.1:5690 \
--dashboard-host 127.0.0.1:5691 \
--prometheus-host 127.0.0.1:1250 \
--connector-rpc-endpoint 127.0.0.1:50051 \
--backend etcd \
--etcd-endpoints 127.0.0.1:2388 \
--state-store hummock+minio://hummockadmin:[email protected]:9301/hummock001 \
Expand All @@ -29,7 +28,6 @@ start_standalone() {
--prometheus-listener-addr 127.0.0.1:1222 \
--advertise-addr 127.0.0.1:5688 \
--async-stack-trace verbose \
--connector-rpc-endpoint 127.0.0.1:50051 \
--parallelism 4 \
--total-memory-bytes 8589934592 \
--role both \
Expand Down
5 changes: 2 additions & 3 deletions src/cmd_all/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,6 @@ mod test {
sql_endpoint: None,
prometheus_endpoint: None,
prometheus_selector: None,
connector_rpc_endpoint: None,
privatelink_endpoint_default_tags: None,
vpc_id: None,
security_group_id: None,
Expand All @@ -313,6 +312,7 @@ mod test {
backup_storage_directory: None,
heap_profiling_dir: None,
dangerous_max_idle_secs: None,
connector_rpc_endpoint: None,
},
),
compute_opts: Some(
Expand All @@ -325,7 +325,6 @@ mod test {
http://127.0.0.1:5690/,
],
),
connector_rpc_endpoint: None,
connector_rpc_sink_payload_format: None,
config_path: "src/config/test.toml",
total_memory_bytes: 34359738368,
Expand All @@ -336,13 +335,13 @@ mod test {
meta_file_cache_dir: None,
async_stack_trace: None,
heap_profiling_dir: None,
connector_rpc_endpoint: None,
},
),
frontend_opts: Some(
FrontendOpts {
listen_addr: "127.0.0.1:4566",
advertise_addr: None,
port: None,
meta_addr: List(
[
http://127.0.0.1:5690/,
Expand Down
9 changes: 5 additions & 4 deletions src/compute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,6 @@ pub struct ComputeNodeOpts {
#[clap(long, env = "RW_META_ADDR", default_value = "http://127.0.0.1:5690")]
pub meta_address: MetaAddressStrategy,

/// Endpoint of the connector node
#[clap(long, env = "RW_CONNECTOR_RPC_ENDPOINT")]
pub connector_rpc_endpoint: Option<String>,

/// Payload format of connector sink rpc
#[clap(long, env = "RW_CONNECTOR_RPC_SINK_PAYLOAD_FORMAT")]
pub connector_rpc_sink_payload_format: Option<String>,
Expand Down Expand Up @@ -131,6 +127,11 @@ pub struct ComputeNodeOpts {
#[clap(long, hide = true, env = "RW_HEAP_PROFILING_DIR")]
#[override_opts(path = server.heap_profiling.dir)]
pub heap_profiling_dir: Option<String>,

/// Endpoint of the connector node.
#[deprecated = "connector node has been deprecated."]
#[clap(long, hide = true, env = "RW_CONNECTOR_RPC_ENDPOINT")]
pub connector_rpc_endpoint: Option<String>,
}

impl risingwave_common::opts::Opts for ComputeNodeOpts {
Expand Down
9 changes: 3 additions & 6 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer
use risingwave_pb::stream_service::stream_service_server::StreamServiceServer;
use risingwave_pb::task_service::exchange_service_server::ExchangeServiceServer;
use risingwave_pb::task_service::task_service_server::TaskServiceServer;
use risingwave_rpc_client::{ComputeClientPool, ConnectorClient, ExtraInfoSourceRef, MetaClient};
use risingwave_rpc_client::{ComputeClientPool, ExtraInfoSourceRef, MetaClient};
use risingwave_storage::hummock::compactor::{
new_compaction_await_tree_reg_ref, start_compactor, CompactionExecutor, CompactorContext,
};
Expand Down Expand Up @@ -334,14 +334,11 @@ pub async fn compute_node_serve(
);

info!(
"connector param: {:?} {:?}",
opts.connector_rpc_endpoint, opts.connector_rpc_sink_payload_format
"connector param: payload_format={:?}",
opts.connector_rpc_sink_payload_format
);

let connector_client = ConnectorClient::try_new(opts.connector_rpc_endpoint.as_ref()).await;

let connector_params = risingwave_connector::ConnectorParams {
connector_client,
sink_payload_format: match opts.connector_rpc_sink_payload_format.as_deref() {
None | Some("stream_chunk") => SinkPayloadFormat::StreamChunk,
Some("json") => SinkPayloadFormat::Json,
Expand Down
2 changes: 0 additions & 2 deletions src/compute/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_connector::source::reader::desc::test_utils::create_source_desc_builder;
use risingwave_connector::source::SourceCtrlOpts;
use risingwave_connector::ConnectorParams;
use risingwave_dml::dml_manager::DmlManager;
use risingwave_hummock_sdk::to_committed_batch_query_epoch;
use risingwave_pb::catalog::StreamSourceInfo;
Expand Down Expand Up @@ -175,7 +174,6 @@ async fn test_table_materialize() -> StreamResult<()> {
barrier_rx,
system_params_manager.get_params(),
SourceCtrlOpts::default(),
ConnectorParams::default(),
)
.boxed(),
);
Expand Down
8 changes: 1 addition & 7 deletions src/connector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ use std::time::Duration;

use duration_str::parse_std;
use risingwave_pb::connector_service::SinkPayloadFormat;
use risingwave_rpc_client::ConnectorClient;
use serde::de;

pub mod aws_utils;
Expand All @@ -65,17 +64,12 @@ mod with_options_test;

#[derive(Clone, Debug, Default)]
pub struct ConnectorParams {
pub connector_client: Option<ConnectorClient>,
pub sink_payload_format: SinkPayloadFormat,
}

impl ConnectorParams {
pub fn new(
connector_client: Option<ConnectorClient>,
sink_payload_format: SinkPayloadFormat,
) -> Self {
pub fn new(sink_payload_format: SinkPayloadFormat) -> Self {
Self {
connector_client,
sink_payload_format,
}
}
Expand Down
15 changes: 0 additions & 15 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use risingwave_common::types::{JsonbVal, Scalar};
use risingwave_pb::catalog::{PbSource, PbStreamSourceInfo};
use risingwave_pb::plan_common::ExternalTableDesc;
use risingwave_pb::source::ConnectorSplit;
use risingwave_rpc_client::ConnectorClient;
use serde::de::DeserializeOwned;

use super::cdc::DebeziumCdcMeta;
Expand Down Expand Up @@ -150,7 +149,6 @@ impl Default for SourceCtrlOpts {
pub struct SourceEnumeratorContext {
pub info: SourceEnumeratorInfo,
pub metrics: Arc<EnumeratorMetrics>,
pub connector_client: Option<ConnectorClient>,
}

#[derive(Clone, Debug, Default)]
Expand All @@ -160,7 +158,6 @@ pub struct SourceEnumeratorInfo {

#[derive(Debug, Default)]
pub struct SourceContext {
pub connector_client: Option<ConnectorClient>,
pub actor_id: u32,
pub source_id: TableId,
// There should be a 1-1 mapping between `source_id` & `fragment_id`
Expand All @@ -178,12 +175,10 @@ impl SourceContext {
fragment_id: u32,
metrics: Arc<SourceMetrics>,
source_ctrl_opts: SourceCtrlOpts,
connector_client: Option<ConnectorClient>,
connector_props: ConnectorProperties,
source_name: String,
) -> Self {
Self {
connector_client,
actor_id,
source_id,
fragment_id,
Expand Down Expand Up @@ -673,7 +668,6 @@ mod tests {
#[test]
fn test_extract_cdc_properties() {
let user_props_mysql: HashMap<String, String> = convert_args!(hashmap!(
"connector_node_addr" => "localhost",
"connector" => "mysql-cdc",
"database.hostname" => "127.0.0.1",
"database.port" => "3306",
Expand All @@ -684,7 +678,6 @@ mod tests {
));

let user_props_postgres: HashMap<String, String> = convert_args!(hashmap!(
"connector_node_addr" => "localhost",
"connector" => "postgres-cdc",
"database.hostname" => "127.0.0.1",
"database.port" => "5432",
Expand All @@ -697,10 +690,6 @@ mod tests {

let conn_props = ConnectorProperties::extract(user_props_mysql, true).unwrap();
if let ConnectorProperties::MysqlCdc(c) = conn_props {
assert_eq!(
c.properties.get("connector_node_addr").unwrap(),
"localhost"
);
assert_eq!(c.properties.get("database.hostname").unwrap(), "127.0.0.1");
assert_eq!(c.properties.get("database.port").unwrap(), "3306");
assert_eq!(c.properties.get("database.user").unwrap(), "root");
Expand All @@ -713,10 +702,6 @@ mod tests {

let conn_props = ConnectorProperties::extract(user_props_postgres, true).unwrap();
if let ConnectorProperties::PostgresCdc(c) = conn_props {
assert_eq!(
c.properties.get("connector_node_addr").unwrap(),
"localhost"
);
assert_eq!(c.properties.get("database.hostname").unwrap(), "127.0.0.1");
assert_eq!(c.properties.get("database.port").unwrap(), "5432");
assert_eq!(c.properties.get("database.user").unwrap(), "root");
Expand Down
Loading

0 comments on commit 016fad7

Please sign in to comment.