Skip to content

Commit

Permalink
fix(connector): add nats stream field, and change subject into severa…
Browse files Browse the repository at this point in the history
…l input (#12799)
  • Loading branch information
yufansong authored Oct 12, 2023
1 parent f9e3d99 commit 9ad7857
Showing 1 changed file with 7 additions and 3 deletions.
10 changes: 7 additions & 3 deletions src/connector/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,8 @@ pub struct UpsertMessage<'a> {
pub struct NatsCommon {
#[serde(rename = "server_url")]
pub server_url: String,
#[serde(rename = "stream")]
pub stream: String,
#[serde(rename = "subject")]
pub subject: String,
#[serde(rename = "connect_mode")]
Expand Down Expand Up @@ -572,7 +574,8 @@ impl NatsCommon {
> {
let context = self.build_context().await?;
let stream = self.build_or_get_stream(context.clone()).await?;
let name = format!("risingwave-consumer-{}-{}", self.subject, split_id);
let subject_name = self.subject.replace(',', "-");
let name = format!("risingwave-consumer-{}-{}", subject_name, split_id);
let mut config = jetstream::consumer::pull::Config {
ack_policy: jetstream::consumer::AckPolicy::None,
..Default::default()
Expand Down Expand Up @@ -605,10 +608,11 @@ impl NatsCommon {
&self,
jetstream: jetstream::Context,
) -> anyhow::Result<jetstream::stream::Stream> {
let subjects: Vec<String> = self.subject.split(',').map(|s| s.to_string()).collect();
let mut config = jetstream::stream::Config {
// the subject default use name value
name: self.subject.clone(),
name: self.stream.clone(),
max_bytes: 1000000,
subjects,
..Default::default()
};
if let Some(v) = self.max_bytes {
Expand Down

0 comments on commit 9ad7857

Please sign in to comment.