From db04f8b9ad64a880c696ffcf63171d29467977a4 Mon Sep 17 00:00:00 2001 From: yufansong Date: Wed, 6 Sep 2023 16:49:37 -0700 Subject: [PATCH 1/2] add multiple server --- src/connector/src/common.rs | 16 +++++++++------- src/connector/src/source/nats/mod.rs | 9 --------- 2 files changed, 9 insertions(+), 16 deletions(-) diff --git a/src/connector/src/common.rs b/src/connector/src/common.rs index 306d79c3f523..47481dd8eb4b 100644 --- a/src/connector/src/common.rs +++ b/src/connector/src/common.rs @@ -371,7 +371,15 @@ impl NatsCommon { if let (Some(v_user), Some(v_password)) = (self.user.as_ref(), self.password.as_ref()) { connect_options = connect_options.user_and_password(v_user.into(), v_password.into()); } - let client = connect_options.connect(self.server_url.clone()).await?; + let servers = self.server_url.split(',').collect::>(); + let client = connect_options + .connect( + servers + .iter() + .map(|url| url.parse()) + .collect::, _>>()?, + ) + .await?; Ok(client) } @@ -381,12 +389,6 @@ impl NatsCommon { Ok(jetstream) } - pub(crate) async fn build_subscriber(&self) -> anyhow::Result { - let client = self.build_client().await?; - let subscription = client.subscribe(self.subject.clone()).await?; - Ok(subscription) - } - pub(crate) async fn build_consumer( &self, split_id: i32, diff --git a/src/connector/src/source/nats/mod.rs b/src/connector/src/source/nats/mod.rs index 946fb93e006d..2aa9dc2de55f 100644 --- a/src/connector/src/source/nats/mod.rs +++ b/src/connector/src/source/nats/mod.rs @@ -16,9 +16,6 @@ pub mod enumerator; pub mod source; pub mod split; -use std::collections::HashMap; - -use risingwave_pb::connector_service::TableSchema; use serde::Deserialize; use crate::common::NatsCommon; @@ -26,12 +23,6 @@ pub const NATS_CONNECTOR: &str = "nats"; #[derive(Clone, Debug, Deserialize)] pub struct NatsProperties { - /// Properties specified in the WITH clause by user - pub props: Option>, - - /// Schema of the source specified by users - pub table_schema: Option, - #[serde(flatten)] pub common: NatsCommon, } From c8dd50f6aba8782f4a05fb9bb81d9a60c904512f Mon Sep 17 00:00:00 2001 From: yufansong Date: Thu, 7 Sep 2023 10:19:42 -0700 Subject: [PATCH 2/2] add map error --- src/connector/src/common.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/connector/src/common.rs b/src/connector/src/common.rs index 47481dd8eb4b..5a03fc7bfd9a 100644 --- a/src/connector/src/common.rs +++ b/src/connector/src/common.rs @@ -22,12 +22,14 @@ use async_nats::jetstream::{self}; use aws_sdk_kinesis::Client as KinesisClient; use clickhouse::Client; use rdkafka::ClientConfig; +use risingwave_common::error::anyhow_error; use serde_derive::{Deserialize, Serialize}; use serde_with::json::JsonString; use serde_with::{serde_as, DisplayFromStr}; use crate::aws_auth::AwsAuthProps; use crate::deserialize_duration_from_string; +use crate::sink::SinkError; // The file describes the common abstractions for each connector and can be used in both source and // sink. @@ -379,7 +381,8 @@ impl NatsCommon { .map(|url| url.parse()) .collect::, _>>()?, ) - .await?; + .await + .map_err(|e| SinkError::Nats(anyhow_error!("build nats client error: {:?}", e)))?; Ok(client) }