From 2f4771f4f5835f7d4a0df48931677886f3e4f93c Mon Sep 17 00:00:00 2001 From: Yufan Song <33971064+yufansong@users.noreply.github.com> Date: Thu, 7 Sep 2023 10:55:57 -0700 Subject: [PATCH] feat(stream): add multiple server url feature into nats, delete useless field (#12138) --- src/connector/src/common.rs | 19 ++++++++++++------- src/connector/src/source/nats/mod.rs | 9 --------- 2 files changed, 12 insertions(+), 16 deletions(-) diff --git a/src/connector/src/common.rs b/src/connector/src/common.rs index 306d79c3f5231..5a03fc7bfd9af 100644 --- a/src/connector/src/common.rs +++ b/src/connector/src/common.rs @@ -22,12 +22,14 @@ use async_nats::jetstream::{self}; use aws_sdk_kinesis::Client as KinesisClient; use clickhouse::Client; use rdkafka::ClientConfig; +use risingwave_common::error::anyhow_error; use serde_derive::{Deserialize, Serialize}; use serde_with::json::JsonString; use serde_with::{serde_as, DisplayFromStr}; use crate::aws_auth::AwsAuthProps; use crate::deserialize_duration_from_string; +use crate::sink::SinkError; // The file describes the common abstractions for each connector and can be used in both source and // sink. @@ -371,7 +373,16 @@ impl NatsCommon { if let (Some(v_user), Some(v_password)) = (self.user.as_ref(), self.password.as_ref()) { connect_options = connect_options.user_and_password(v_user.into(), v_password.into()); } - let client = connect_options.connect(self.server_url.clone()).await?; + let servers = self.server_url.split(',').collect::>(); + let client = connect_options + .connect( + servers + .iter() + .map(|url| url.parse()) + .collect::, _>>()?, + ) + .await + .map_err(|e| SinkError::Nats(anyhow_error!("build nats client error: {:?}", e)))?; Ok(client) } @@ -381,12 +392,6 @@ impl NatsCommon { Ok(jetstream) } - pub(crate) async fn build_subscriber(&self) -> anyhow::Result { - let client = self.build_client().await?; - let subscription = client.subscribe(self.subject.clone()).await?; - Ok(subscription) - } - pub(crate) async fn build_consumer( &self, split_id: i32, diff --git a/src/connector/src/source/nats/mod.rs b/src/connector/src/source/nats/mod.rs index 946fb93e006d5..2aa9dc2de55f2 100644 --- a/src/connector/src/source/nats/mod.rs +++ b/src/connector/src/source/nats/mod.rs @@ -16,9 +16,6 @@ pub mod enumerator; pub mod source; pub mod split; -use std::collections::HashMap; - -use risingwave_pb::connector_service::TableSchema; use serde::Deserialize; use crate::common::NatsCommon; @@ -26,12 +23,6 @@ pub const NATS_CONNECTOR: &str = "nats"; #[derive(Clone, Debug, Deserialize)] pub struct NatsProperties { - /// Properties specified in the WITH clause by user - pub props: Option>, - - /// Schema of the source specified by users - pub table_schema: Option, - #[serde(flatten)] pub common: NatsCommon, }