diff --git a/src/connector/src/common.rs b/src/connector/src/common.rs index 2031cd8eb0b55..6d848a5036ff1 100644 --- a/src/connector/src/common.rs +++ b/src/connector/src/common.rs @@ -479,6 +479,8 @@ pub struct UpsertMessage<'a> { pub struct NatsCommon { #[serde(rename = "server_url")] pub server_url: String, + #[serde(rename = "stream")] + pub stream: String, #[serde(rename = "subject")] pub subject: String, #[serde(rename = "connect_mode")] @@ -572,7 +574,8 @@ impl NatsCommon { > { let context = self.build_context().await?; let stream = self.build_or_get_stream(context.clone()).await?; - let name = format!("risingwave-consumer-{}-{}", self.subject, split_id); + let subject_name = self.subject.replace(',', "-"); + let name = format!("risingwave-consumer-{}-{}", subject_name, split_id); let mut config = jetstream::consumer::pull::Config { ack_policy: jetstream::consumer::AckPolicy::None, ..Default::default() @@ -605,10 +608,11 @@ impl NatsCommon { &self, jetstream: jetstream::Context, ) -> anyhow::Result { + let subjects: Vec = self.subject.split(',').map(|s| s.to_string()).collect(); let mut config = jetstream::stream::Config { - // the subject default use name value - name: self.subject.clone(), + name: self.stream.clone(), max_bytes: 1000000, + subjects, ..Default::default() }; if let Some(v) = self.max_bytes {