Skip to content

Commit

Permalink
feat(stream): add multiple server url feature into nats, delete usele…
Browse files Browse the repository at this point in the history
…ss field (#12138)
  • Loading branch information
yufansong authored Sep 7, 2023
1 parent 471aa2b commit c924c37
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 16 deletions.
19 changes: 12 additions & 7 deletions src/connector/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ use async_nats::jetstream::{self};
use aws_sdk_kinesis::Client as KinesisClient;
use clickhouse::Client;
use rdkafka::ClientConfig;
use risingwave_common::error::anyhow_error;
use serde_derive::{Deserialize, Serialize};
use serde_with::json::JsonString;
use serde_with::{serde_as, DisplayFromStr};

use crate::aws_auth::AwsAuthProps;
use crate::deserialize_duration_from_string;
use crate::sink::SinkError;

// The file describes the common abstractions for each connector and can be used in both source and
// sink.
Expand Down Expand Up @@ -371,7 +373,16 @@ impl NatsCommon {
if let (Some(v_user), Some(v_password)) = (self.user.as_ref(), self.password.as_ref()) {
connect_options = connect_options.user_and_password(v_user.into(), v_password.into());
}
let client = connect_options.connect(self.server_url.clone()).await?;
let servers = self.server_url.split(',').collect::<Vec<&str>>();
let client = connect_options
.connect(
servers
.iter()
.map(|url| url.parse())
.collect::<Result<Vec<async_nats::ServerAddr>, _>>()?,
)
.await
.map_err(|e| SinkError::Nats(anyhow_error!("build nats client error: {:?}", e)))?;
Ok(client)
}

Expand All @@ -381,12 +392,6 @@ impl NatsCommon {
Ok(jetstream)
}

pub(crate) async fn build_subscriber(&self) -> anyhow::Result<async_nats::Subscriber> {
let client = self.build_client().await?;
let subscription = client.subscribe(self.subject.clone()).await?;
Ok(subscription)
}

pub(crate) async fn build_consumer(
&self,
split_id: i32,
Expand Down
9 changes: 0 additions & 9 deletions src/connector/src/source/nats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,13 @@ pub mod enumerator;
pub mod source;
pub mod split;

use std::collections::HashMap;

use risingwave_pb::connector_service::TableSchema;
use serde::Deserialize;

use crate::common::NatsCommon;
pub const NATS_CONNECTOR: &str = "nats";

#[derive(Clone, Debug, Deserialize)]
pub struct NatsProperties {
/// Properties specified in the WITH clause by user
pub props: Option<HashMap<String, String>>,

/// Schema of the source specified by users
pub table_schema: Option<TableSchema>,

#[serde(flatten)]
pub common: NatsCommon,
}
Expand Down

0 comments on commit c924c37

Please sign in to comment.