From 35d384f0bb726c8f39dec3d96f3370af89245a0c Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Thu, 12 Oct 2023 14:26:55 +0800 Subject: [PATCH 1/3] remove connector node from risedev --- risedev.yml | 36 ------------------- src/connector/src/sink/log_store.rs | 6 ++-- .../src/config_gen/prometheus_gen.rs | 12 ------- 3 files changed, 4 insertions(+), 50 deletions(-) diff --git a/risedev.yml b/risedev.yml index 8ca3df2a7e69..142b68aeb959 100644 --- a/risedev.yml +++ b/risedev.yml @@ -26,9 +26,6 @@ profile: # - use: aws-s3 # bucket: test-bucket - # If you want to create CDC source table, uncomment the following line - # - use: connector-node - # if you want to enable etcd backend, uncomment the following lines. # - use: etcd # unsafe-no-fsync: true @@ -110,22 +107,6 @@ profile: - use: kafka persist-data: true - full-with-connector: - steps: - - use: minio - - use: etcd - - use: meta-node - - use: compute-node - - use: frontend - - use: compactor - - use: prometheus - - use: grafana - - use: zookeeper - persist-data: true - - use: kafka - persist-data: true - - use: connector-node - standalone-full-peripherals: steps: - use: minio @@ -144,7 +125,6 @@ profile: persist-data: true - use: kafka persist-data: true - - use: connector-node standalone-minio-etcd: steps: @@ -1058,9 +1038,6 @@ template: # Frontend used by this Prometheus instance provide-frontend: "frontend*" - # Connector-node used by this Prometheus instance - provide-connector-node: "connector*" - frontend: # Advertise address of frontend address: "127.0.0.1" @@ -1117,19 +1094,6 @@ template: # If `user-managed` is true, this service will be started by user with the above config user-managed: false - connector-node: - # Connector node advertise address - address: "127.0.0.1" - - # Connector node listen port - port: 50051 - - # Prometheus exporter listen port - exporter-port: 50052 - - # Id of this instance - id: connector-${port} - grafana: # Listen address of Grafana listen-address: ${address} diff --git a/src/connector/src/sink/log_store.rs b/src/connector/src/sink/log_store.rs index f7d99141139f..ad28f67140f2 100644 --- a/src/connector/src/sink/log_store.rs +++ b/src/connector/src/sink/log_store.rs @@ -204,9 +204,11 @@ impl LogReader for MonitoredLogReader { self.metrics.log_store_latest_read_epoch.set(*epoch as _); } if let LogStoreReadItem::StreamChunk { chunk, .. } = item { + let cardinality = chunk.cardinality(); + self.metrics.log_store_read_rows.inc_by(cardinality as _); self.metrics - .log_store_read_rows - .inc_by(chunk.cardinality() as _); + .connector_sink_rows_received + .inc_by(cardinality as _); } }) } diff --git a/src/risedevtool/src/config_gen/prometheus_gen.rs b/src/risedevtool/src/config_gen/prometheus_gen.rs index aa6422416a31..2143031f1ba2 100644 --- a/src/risedevtool/src/config_gen/prometheus_gen.rs +++ b/src/risedevtool/src/config_gen/prometheus_gen.rs @@ -79,14 +79,6 @@ impl PrometheusGen { .map(|node| format!("\"{}:{}\"", node.address, 9644)) .join(","); - let connector_node_targets = config - .provide_connector_node - .as_ref() - .unwrap() - .iter() - .map(|node| format!("\"{}:{}\"", node.address, node.exporter_port)) - .join(","); - let now = Local::now().format("%Y%m%d-%H%M%S"); let remote_write = if config.remote_write { @@ -151,10 +143,6 @@ scrape_configs: - job_name: redpanda static_configs: - targets: [{redpanda_targets}] - - - job_name: connector-node - static_configs: - - targets: [{connector_node_targets}] "#, ) } From eb900c1f8cec0be8d6097aa4d25d2d381bfe7682 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Thu, 12 Oct 2023 15:48:00 +0800 Subject: [PATCH 2/3] remove metric --- src/connector/src/sink/log_store.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/connector/src/sink/log_store.rs b/src/connector/src/sink/log_store.rs index ad28f67140f2..f7d99141139f 100644 --- a/src/connector/src/sink/log_store.rs +++ b/src/connector/src/sink/log_store.rs @@ -204,11 +204,9 @@ impl LogReader for MonitoredLogReader { self.metrics.log_store_latest_read_epoch.set(*epoch as _); } if let LogStoreReadItem::StreamChunk { chunk, .. } = item { - let cardinality = chunk.cardinality(); - self.metrics.log_store_read_rows.inc_by(cardinality as _); self.metrics - .connector_sink_rows_received - .inc_by(cardinality as _); + .log_store_read_rows + .inc_by(chunk.cardinality() as _); } }) } From 0c6a1bedd50fa4db10c3eae9b2dad0ef0f8c36d0 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Thu, 12 Oct 2023 15:56:18 +0800 Subject: [PATCH 3/3] remove related codes --- src/risedevtool/src/bin/risedev-compose.rs | 1 - src/risedevtool/src/bin/risedev-dev.rs | 20 ++----- src/risedevtool/src/config.rs | 3 - src/risedevtool/src/service_config.rs | 15 ----- src/risedevtool/src/task.rs | 2 - src/risedevtool/src/task/connector_service.rs | 60 ------------------- 6 files changed, 4 insertions(+), 97 deletions(-) delete mode 100644 src/risedevtool/src/task/connector_service.rs diff --git a/src/risedevtool/src/bin/risedev-compose.rs b/src/risedevtool/src/bin/risedev-compose.rs index 087c6519717f..63925d919bb2 100644 --- a/src/risedevtool/src/bin/risedev-compose.rs +++ b/src/risedevtool/src/bin/risedev-compose.rs @@ -222,7 +222,6 @@ fn main() -> Result<()> { (c.address.clone(), c.compose(&compose_config)?) } ServiceConfig::Redis(_) => return Err(anyhow!("not supported")), - ServiceConfig::ConnectorNode(_) => return Err(anyhow!("not supported")), }; compose.container_name = service.id().to_string(); if opts.deploy { diff --git a/src/risedevtool/src/bin/risedev-dev.rs b/src/risedevtool/src/bin/risedev-dev.rs index c2e586802489..474e8dd0cbd1 100644 --- a/src/risedevtool/src/bin/risedev-dev.rs +++ b/src/risedevtool/src/bin/risedev-dev.rs @@ -25,10 +25,10 @@ use indicatif::ProgressBar; use risedev::util::{complete_spin, fail_spin}; use risedev::{ generate_risedev_env, preflight_check, AwsS3Config, CompactorService, ComputeNodeService, - ConfigExpander, ConfigureTmuxTask, ConnectorNodeService, EnsureStopService, ExecuteContext, - FrontendService, GrafanaService, KafkaService, MetaNodeService, MinioService, OpendalConfig, - PrometheusService, PubsubService, RedisService, ServiceConfig, Task, TempoService, - ZooKeeperService, RISEDEV_SESSION_NAME, + ConfigExpander, ConfigureTmuxTask, EnsureStopService, ExecuteContext, FrontendService, + GrafanaService, KafkaService, MetaNodeService, MinioService, OpendalConfig, PrometheusService, + PubsubService, RedisService, ServiceConfig, Task, TempoService, ZooKeeperService, + RISEDEV_SESSION_NAME, }; use tempfile::tempdir; use yaml_rust::YamlEmitter; @@ -114,7 +114,6 @@ fn task_main( ServiceConfig::AwsS3(_) => None, ServiceConfig::OpenDal(_) => None, ServiceConfig::RedPanda(_) => None, - ServiceConfig::ConnectorNode(c) => Some((c.port, c.id.clone())), }; if let Some(x) = listen_info { @@ -339,17 +338,6 @@ fn task_main( ctx.pb .set_message(format!("redis {}:{}", c.address, c.port)); } - ServiceConfig::ConnectorNode(c) => { - let mut ctx = - ExecuteContext::new(&mut logger, manager.new_progress(), status_dir.clone()); - let mut service = ConnectorNodeService::new(c.clone())?; - service.execute(&mut ctx)?; - let mut task = - risedev::ConfigureGrpcNodeTask::new(c.address.clone(), c.port, false)?; - task.execute(&mut ctx)?; - ctx.pb - .set_message(format!("connector grpc://{}:{}", c.address, c.port)); - } } let service_id = service.id().to_string(); diff --git a/src/risedevtool/src/config.rs b/src/risedevtool/src/config.rs index fe7d677a6a76..09e530487d4f 100644 --- a/src/risedevtool/src/config.rs +++ b/src/risedevtool/src/config.rs @@ -171,9 +171,6 @@ impl ConfigExpander { "kafka" => ServiceConfig::Kafka(serde_yaml::from_str(&out_str)?), "pubsub" => ServiceConfig::Pubsub(serde_yaml::from_str(&out_str)?), "redis" => ServiceConfig::Redis(serde_yaml::from_str(&out_str)?), - "connector-node" => { - ServiceConfig::ConnectorNode(serde_yaml::from_str(&out_str)?) - } "zookeeper" => ServiceConfig::ZooKeeper(serde_yaml::from_str(&out_str)?), "redpanda" => ServiceConfig::RedPanda(serde_yaml::from_str(&out_str)?), other => return Err(anyhow!("unsupported use type: {}", other)), diff --git a/src/risedevtool/src/service_config.rs b/src/risedevtool/src/service_config.rs index 8890f984971f..26ccd3af7e15 100644 --- a/src/risedevtool/src/service_config.rs +++ b/src/risedevtool/src/service_config.rs @@ -190,7 +190,6 @@ pub struct PrometheusConfig { pub provide_etcd: Option>, pub provide_redpanda: Option>, pub provide_frontend: Option>, - pub provide_connector_node: Option>, } #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] @@ -318,18 +317,6 @@ pub struct RedisConfig { pub address: String, } -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "kebab-case")] -#[serde(deny_unknown_fields)] -pub struct ConnectorNodeConfig { - #[serde(rename = "use")] - phantom_use: Option, - pub id: String, - pub port: u16, - pub exporter_port: u16, - pub address: String, -} - /// All service configuration #[derive(Clone, Debug, PartialEq)] pub enum ServiceConfig { @@ -349,7 +336,6 @@ pub enum ServiceConfig { Redis(RedisConfig), ZooKeeper(ZooKeeperConfig), RedPanda(RedPandaConfig), - ConnectorNode(ConnectorNodeConfig), } impl ServiceConfig { @@ -370,7 +356,6 @@ impl ServiceConfig { Self::Pubsub(c) => &c.id, Self::Redis(c) => &c.id, Self::RedPanda(c) => &c.id, - Self::ConnectorNode(c) => &c.id, Self::OpenDal(c) => &c.id, } } diff --git a/src/risedevtool/src/task.rs b/src/risedevtool/src/task.rs index 262a68c52cb9..a2e4ec9bf46d 100644 --- a/src/risedevtool/src/task.rs +++ b/src/risedevtool/src/task.rs @@ -15,7 +15,6 @@ mod compactor_service; mod compute_node_service; mod configure_tmux_service; -mod connector_service; mod ensure_stop_service; mod etcd_service; mod frontend_service; @@ -52,7 +51,6 @@ pub use utils::*; pub use self::compactor_service::*; pub use self::compute_node_service::*; pub use self::configure_tmux_service::*; -pub use self::connector_service::*; pub use self::ensure_stop_service::*; pub use self::etcd_service::*; pub use self::frontend_service::*; diff --git a/src/risedevtool/src/task/connector_service.rs b/src/risedevtool/src/task/connector_service.rs deleted file mode 100644 index 05268db6a43e..000000000000 --- a/src/risedevtool/src/task/connector_service.rs +++ /dev/null @@ -1,60 +0,0 @@ -// Copyright 2023 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::env; -use std::io::Write; -use std::path::{Path, PathBuf}; -use std::process::Command; - -use anyhow::{anyhow, Result}; - -use crate::{ConnectorNodeConfig, ExecuteContext, Task}; - -pub struct ConnectorNodeService { - pub config: ConnectorNodeConfig, -} - -impl ConnectorNodeService { - pub fn new(config: ConnectorNodeConfig) -> Result { - Ok(Self { config }) - } - - fn connector_path(&self) -> Result { - let prefix_bin = env::var("PREFIX_BIN")?; - Ok(Path::new(&prefix_bin) - .join("connector-node") - .join("start-service.sh")) - } -} - -impl Task for ConnectorNodeService { - fn execute(&mut self, ctx: &mut ExecuteContext) -> Result<()> { - ctx.service(self); - ctx.pb.set_message("starting"); - let path = self.connector_path()?; - if !path.exists() { - return Err(anyhow!("RisingWave connector binary not found in {:?}\nPlease enable building RisingWave connector in `./risedev configure`?", path)); - } - let mut cmd = Command::new(path); - cmd.arg("-p").arg(self.config.port.to_string()); - ctx.run_command(ctx.tmux_run(cmd)?)?; - ctx.pb.set_message("started"); - - Ok(()) - } - - fn id(&self) -> String { - self.config.id.clone() - } -}