diff --git a/src/connector/src/connector_common/common.rs b/src/connector/src/connector_common/common.rs index df32ccc5ea79..1be41b4e09ca 100644 --- a/src/connector/src/connector_common/common.rs +++ b/src/connector/src/connector_common/common.rs @@ -648,6 +648,7 @@ impl NatsCommon { pub(crate) async fn build_consumer( &self, stream: String, + durable_consumer_name: String, split_id: String, start_sequence: NatsOffset, mut config: jetstream::consumer::pull::Config, @@ -666,6 +667,7 @@ impl NatsCommon { NatsOffset::Earliest => DeliverPolicy::All, NatsOffset::Latest => DeliverPolicy::New, NatsOffset::SequenceNumber(v) => { + // for compatibility, we do not write to any state table now let parsed = v .parse::() .context("failed to parse nats offset as sequence number")?; @@ -680,12 +682,19 @@ impl NatsCommon { NatsOffset::None => DeliverPolicy::All, }; - let consumer = stream - .get_or_create_consumer(&name, { - config.deliver_policy = deliver_policy; - config - }) - .await?; + let consumer = if let Ok(consumer) = stream.get_consumer(&name).await { + consumer + } else { + stream + .get_or_create_consumer(&name, { + config.deliver_policy = deliver_policy; + config.durable_name = Some(durable_consumer_name); + config.filter_subjects = + self.subject.split(',').map(|s| s.to_string()).collect(); + config + }) + .await? + }; Ok(consumer) } @@ -695,8 +704,17 @@ impl NatsCommon { stream: String, ) -> ConnectorResult { let subjects: Vec = self.subject.split(',').map(|s| s.to_string()).collect(); + if let Ok(mut stream_instance) = jetstream.get_stream(&stream).await { + tracing::info!( + "load existing nats stream ({:?}) with config {:?}", + stream, + stream_instance.info().await? + ); + return Ok(stream_instance); + } + let mut config = jetstream::stream::Config { - name: stream, + name: stream.clone(), max_bytes: 1000000, subjects, ..Default::default() @@ -716,6 +734,11 @@ impl NatsCommon { if let Some(v) = self.max_message_size { config.max_message_size = v; } + tracing::info!( + "create nats stream ({:?}) with config {:?}", + &stream, + config + ); let stream = jetstream.get_or_create_stream(config).await?; Ok(stream) } diff --git a/src/connector/src/error.rs b/src/connector/src/error.rs index e8b7a134b60f..09377b95f35c 100644 --- a/src/connector/src/error.rs +++ b/src/connector/src/error.rs @@ -21,6 +21,7 @@ use crate::parser::AccessError; use crate::schema::schema_registry::{ConcurrentRequestError, WireFormatError}; use crate::schema::InvalidOptionError; use crate::sink::SinkError; +use crate::source::nats::NatsJetStreamError; def_anyhow_newtype! { pub ConnectorError, @@ -61,6 +62,9 @@ def_anyhow_newtype! { async_nats::jetstream::consumer::pull::MessagesError => "Nats error", async_nats::jetstream::context::CreateStreamError => "Nats error", async_nats::jetstream::stream::ConsumerError => "Nats error", + async_nats::error::Error => "Nats error", + NatsJetStreamError => "Nats error", + icelake::Error => "Iceberg error", iceberg::Error => "IcebergV2 error", redis::RedisError => "Redis error", diff --git a/src/connector/src/source/mod.rs b/src/connector/src/source/mod.rs index dc965c9274ff..899fc2a2379f 100644 --- a/src/connector/src/source/mod.rs +++ b/src/connector/src/source/mod.rs @@ -26,6 +26,8 @@ pub mod nats; pub mod nexmark; pub mod pulsar; +use std::future::IntoFuture; + pub use base::{UPSTREAM_SOURCE_KEY, *}; pub(crate) use common::*; use google_cloud_pubsub::subscription::Subscription; @@ -40,6 +42,8 @@ mod manager; pub mod reader; pub mod test_source; +use async_nats::jetstream::consumer::AckPolicy as JetStreamAckPolicy; +use async_nats::jetstream::context::Context as JetStreamContext; pub use manager::{SourceColumnDesc, SourceColumnType}; use risingwave_common::array::{Array, ArrayRef}; use thiserror_ext::AsReport; @@ -77,6 +81,7 @@ pub fn should_copy_to_format_encode_options(key: &str, connector: &str) -> bool pub enum WaitCheckpointTask { CommitCdcOffset(Option<(SplitId, String)>), AckPubsubMessage(Subscription, Vec), + AckNatsJetStream(JetStreamContext, Vec, JetStreamAckPolicy), } impl WaitCheckpointTask { @@ -123,6 +128,49 @@ impl WaitCheckpointTask { } ack(&subscription, ack_ids).await; } + WaitCheckpointTask::AckNatsJetStream( + ref context, + reply_subjects_arrs, + ref ack_policy, + ) => { + async fn ack(context: &JetStreamContext, reply_subject: String) { + match context.publish(reply_subject.clone(), "".into()).await { + Err(e) => { + tracing::error!(error = %e.as_report(), subject = ?reply_subject, "failed to ack NATS JetStream message"); + } + Ok(ack_future) => { + if let Err(e) = ack_future.into_future().await { + tracing::error!(error = %e.as_report(), subject = ?reply_subject, "failed to ack NATS JetStream message"); + } + } + } + } + + let reply_subjects = reply_subjects_arrs + .iter() + .flat_map(|arr| { + arr.as_utf8() + .iter() + .flatten() + .map(|s| s.to_string()) + .collect::>() + }) + .collect::>(); + + match ack_policy { + JetStreamAckPolicy::None => (), + JetStreamAckPolicy::Explicit => { + for reply_subject in reply_subjects { + ack(context, reply_subject).await; + } + } + JetStreamAckPolicy::All => { + if let Some(reply_subject) = reply_subjects.last() { + ack(context, reply_subject.clone()).await; + } + } + } + } } } } diff --git a/src/connector/src/source/nats/mod.rs b/src/connector/src/source/nats/mod.rs index baf18be70314..b8a3b7948aee 100644 --- a/src/connector/src/source/nats/mod.rs +++ b/src/connector/src/source/nats/mod.rs @@ -17,15 +17,18 @@ pub mod source; pub mod split; use std::collections::HashMap; +use std::fmt::Display; use std::time::Duration; use async_nats::jetstream::consumer::pull::Config; use async_nats::jetstream::consumer::{AckPolicy, ReplayPolicy}; use serde::Deserialize; use serde_with::{serde_as, DisplayFromStr}; +use thiserror::Error; use with_options::WithOptions; use crate::connector_common::NatsCommon; +use crate::error::{ConnectorError, ConnectorResult}; use crate::source::nats::enumerator::NatsSplitEnumerator; use crate::source::nats::source::{NatsSplit, NatsSplitReader}; use crate::source::SourceProperties; @@ -33,17 +36,29 @@ use crate::{ deserialize_optional_string_seq_from_string, deserialize_optional_u64_seq_from_string, }; +#[derive(Debug, Clone, Error)] +pub struct NatsJetStreamError(String); + +impl Display for NatsJetStreamError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + pub const NATS_CONNECTOR: &str = "nats"; pub struct AckPolicyWrapper; impl AckPolicyWrapper { - pub fn parse_str(s: &str) -> Result { + pub fn parse_str(s: &str) -> Result { match s { "none" => Ok(AckPolicy::None), "all" => Ok(AckPolicy::All), "explicit" => Ok(AckPolicy::Explicit), - _ => Err(format!("Invalid AckPolicy '{}'", s)), + _ => Err(NatsJetStreamError(format!( + "Invalid AckPolicy '{}', expect `none`, `all`, and `explicit`", + s + ))), } } } @@ -51,11 +66,14 @@ impl AckPolicyWrapper { pub struct ReplayPolicyWrapper; impl ReplayPolicyWrapper { - pub fn parse_str(s: &str) -> Result { + pub fn parse_str(s: &str) -> Result { match s { "instant" => Ok(ReplayPolicy::Instant), "original" => Ok(ReplayPolicy::Original), - _ => Err(format!("Invalid ReplayPolicy '{}'", s)), + _ => Err(NatsJetStreamError(format!( + "Invalid ReplayPolicy '{}', expect `instant` and `original`", + s + ))), } } } @@ -82,6 +100,9 @@ pub struct NatsProperties { #[serde(rename = "stream")] pub stream: String, + #[serde(rename = "durable_consumer_name")] + pub durable_consumer_name: String, + #[serde(flatten)] pub unknown_fields: HashMap, } @@ -257,6 +278,13 @@ impl NatsPropertiesConsumer { c.backoff = v.iter().map(|&x| Duration::from_secs(x)).collect() } } + + pub fn get_ack_policy(&self) -> ConnectorResult { + match &self.ack_policy { + Some(policy) => Ok(AckPolicyWrapper::parse_str(policy).map_err(ConnectorError::from)?), + None => Ok(AckPolicy::None), + } + } } impl SourceProperties for NatsProperties { @@ -314,6 +342,7 @@ mod test { "consumer.num_replicas".to_string() => "3".to_string(), "consumer.memory_storage".to_string() => "true".to_string(), "consumer.backoff.sec".to_string() => "2,10,15".to_string(), + "durable_consumer_name".to_string() => "test_durable_consumer".to_string(), }; diff --git a/src/connector/src/source/nats/source/message.rs b/src/connector/src/source/nats/source/message.rs index 55c91457d4bf..88df55f19a62 100644 --- a/src/connector/src/source/nats/source/message.rs +++ b/src/connector/src/source/nats/source/message.rs @@ -22,6 +22,7 @@ pub struct NatsMessage { pub split_id: SplitId, pub sequence_number: String, pub payload: Vec, + pub reply_subject: Option, } impl From for SourceMessage { @@ -30,7 +31,10 @@ impl From for SourceMessage { key: None, payload: Some(message.payload), // For nats jetstream, use sequence id as offset - offset: message.sequence_number, + // + // DEPRECATED: no longer use sequence id as offset, let nats broker handle failover + // use reply_subject as offset for ack use, we just check the persisted state for whether this is the first run + offset: message.reply_subject.unwrap_or_default(), split_id: message.split_id, meta: SourceMeta::Empty, } @@ -43,6 +47,10 @@ impl NatsMessage { split_id, sequence_number: message.info().unwrap().stream_sequence.to_string(), payload: message.message.payload.to_vec(), + reply_subject: message + .message + .reply + .map(|subject| subject.as_str().to_string()), } } } diff --git a/src/connector/src/source/nats/source/mod.rs b/src/connector/src/source/nats/source/mod.rs index c1393e389b55..9ba544b6ea7f 100644 --- a/src/connector/src/source/nats/source/mod.rs +++ b/src/connector/src/source/nats/source/mod.rs @@ -15,6 +15,7 @@ mod message; mod reader; +pub use message::*; pub use reader::*; pub use crate::source::nats::split::*; diff --git a/src/connector/src/source/nats/source/reader.rs b/src/connector/src/source/nats/source/reader.rs index 937962effa4b..5bcae105e07b 100644 --- a/src/connector/src/source/nats/source/reader.rs +++ b/src/connector/src/source/nats/source/reader.rs @@ -51,8 +51,8 @@ impl SplitReader for NatsSplitReader { source_ctx: SourceContextRef, _columns: Option>, ) -> Result { - // TODO: to simplify the logic, return 1 split for first version - assert!(splits.len() == 1); + // We guarantee the split num always align with parallelism + assert_eq!(splits.len(), 1); let split = splits.into_iter().next().unwrap(); let split_id = split.split_id; let start_position = match &split.start_sequence { @@ -73,6 +73,9 @@ impl SplitReader for NatsSplitReader { } }, }, + // We have record on this Nats Split, contains the last seen offset (seq id) or reply subject + // We do not use the seq id as start position anymore, + // but just let the reader load from durable consumer on broker. start_position => start_position.to_owned(), }; @@ -85,6 +88,7 @@ impl SplitReader for NatsSplitReader { .common .build_consumer( properties.stream.clone(), + properties.durable_consumer_name.clone(), split_id.to_string(), start_position.clone(), config, diff --git a/src/connector/src/source/nats/split.rs b/src/connector/src/source/nats/split.rs index 76284566f3d2..9b65e7429461 100644 --- a/src/connector/src/source/nats/split.rs +++ b/src/connector/src/source/nats/split.rs @@ -51,13 +51,8 @@ impl SplitMetaData for NatsSplit { serde_json::to_value(self.clone()).unwrap().into() } - fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> { - let start_sequence = if last_seen_offset.is_empty() { - NatsOffset::Earliest - } else { - NatsOffset::SequenceNumber(last_seen_offset) - }; - self.start_sequence = start_sequence; + fn update_offset(&mut self, _last_seen_offset: String) -> ConnectorResult<()> { + // we do not require to update the offset for nats, let durable consumer handle it Ok(()) } } diff --git a/src/connector/src/source/reader/reader.rs b/src/connector/src/source/reader/reader.rs index b3a1cb5380d8..89335f8f0d80 100644 --- a/src/connector/src/source/reader/reader.rs +++ b/src/connector/src/source/reader/reader.rs @@ -16,6 +16,7 @@ use std::collections::HashMap; use std::sync::Arc; use anyhow::Context; +use async_nats::jetstream::consumer::AckPolicy; use futures::future::try_join_all; use futures::stream::pending; use futures::StreamExt; @@ -127,6 +128,18 @@ impl SourceReader { prop.subscription_client().await?, vec![], )), + ConnectorProperties::Nats(prop) => { + match prop.nats_properties_consumer.get_ack_policy()? { + a @ AckPolicy::Explicit | a @ AckPolicy::All => { + Some(WaitCheckpointTask::AckNatsJetStream( + prop.common.build_context().await?, + vec![], + a, + )) + } + AckPolicy::None => None, + } + } _ => None, }) } diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml index 91eaf59b1c7b..20d3fc171423 100644 --- a/src/connector/with_options_source.yaml +++ b/src/connector/with_options_source.yaml @@ -643,6 +643,9 @@ NatsProperties: - name: stream field_type: String required: true + - name: durable_consumer_name + field_type: String + required: true NexmarkProperties: fields: - name: nexmark.split.num diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index d4a02ce46244..6d5cf710d3bb 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -752,12 +752,14 @@ struct WaitCheckpointTaskBuilder { impl WaitCheckpointTaskBuilder { fn update_task_on_chunk(&mut self, offset_col: ArrayRef) { - #[expect(clippy::single_match)] match &mut self.building_task { WaitCheckpointTask::AckPubsubMessage(_, arrays) => { arrays.push(offset_col); } - _ => {} + WaitCheckpointTask::AckNatsJetStream(_, arrays, _) => { + arrays.push(offset_col); + } + WaitCheckpointTask::CommitCdcOffset(_) => {} } }