From 381271486cf01be2c57345121b1b888d641cf5b5 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Fri, 11 Oct 2024 18:31:13 +0800 Subject: [PATCH 1/4] ack on Nats JetStream Signed-off-by: tabVersion --- src/connector/src/error.rs | 3 ++ src/connector/src/source/base.rs | 2 + src/connector/src/source/mod.rs | 48 +++++++++++++++++++ src/connector/src/source/nats/mod.rs | 31 ++++++++++-- .../src/source/nats/source/message.rs | 15 +++++- src/connector/src/source/nats/source/mod.rs | 1 + src/connector/src/source/reader/reader.rs | 5 ++ .../src/executor/source/source_executor.rs | 6 ++- 8 files changed, 104 insertions(+), 7 deletions(-) diff --git a/src/connector/src/error.rs b/src/connector/src/error.rs index e8b7a134b60f1..c42af8fcfa024 100644 --- a/src/connector/src/error.rs +++ b/src/connector/src/error.rs @@ -21,6 +21,7 @@ use crate::parser::AccessError; use crate::schema::schema_registry::{ConcurrentRequestError, WireFormatError}; use crate::schema::InvalidOptionError; use crate::sink::SinkError; +use crate::source::nats::NatsJetStreamError; def_anyhow_newtype! { pub ConnectorError, @@ -61,6 +62,8 @@ def_anyhow_newtype! { async_nats::jetstream::consumer::pull::MessagesError => "Nats error", async_nats::jetstream::context::CreateStreamError => "Nats error", async_nats::jetstream::stream::ConsumerError => "Nats error", + NatsJetStreamError => "Nats error", + icelake::Error => "Iceberg error", iceberg::Error => "IcebergV2 error", redis::RedisError => "Redis error", diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index 59e3585431a60..8afb641254a8b 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -40,6 +40,7 @@ use super::google_pubsub::GooglePubsubMeta; use super::kafka::KafkaMeta; use super::kinesis::KinesisMeta; use super::monitor::SourceMetrics; +use super::nats::NatsJetStreamMeta; use super::nexmark::source::message::NexmarkMeta; use super::{AZBLOB_CONNECTOR, GCS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR}; use crate::error::ConnectorResult as Result; @@ -630,6 +631,7 @@ pub enum SourceMeta { GooglePubsub(GooglePubsubMeta), Datagen(DatagenMeta), DebeziumCdc(DebeziumCdcMeta), + NatsJetStream(NatsJetStreamMeta), // For the source that doesn't have meta data. Empty, } diff --git a/src/connector/src/source/mod.rs b/src/connector/src/source/mod.rs index dc965c9274ff1..899fc2a2379f5 100644 --- a/src/connector/src/source/mod.rs +++ b/src/connector/src/source/mod.rs @@ -26,6 +26,8 @@ pub mod nats; pub mod nexmark; pub mod pulsar; +use std::future::IntoFuture; + pub use base::{UPSTREAM_SOURCE_KEY, *}; pub(crate) use common::*; use google_cloud_pubsub::subscription::Subscription; @@ -40,6 +42,8 @@ mod manager; pub mod reader; pub mod test_source; +use async_nats::jetstream::consumer::AckPolicy as JetStreamAckPolicy; +use async_nats::jetstream::context::Context as JetStreamContext; pub use manager::{SourceColumnDesc, SourceColumnType}; use risingwave_common::array::{Array, ArrayRef}; use thiserror_ext::AsReport; @@ -77,6 +81,7 @@ pub fn should_copy_to_format_encode_options(key: &str, connector: &str) -> bool pub enum WaitCheckpointTask { CommitCdcOffset(Option<(SplitId, String)>), AckPubsubMessage(Subscription, Vec), + AckNatsJetStream(JetStreamContext, Vec, JetStreamAckPolicy), } impl WaitCheckpointTask { @@ -123,6 +128,49 @@ impl WaitCheckpointTask { } ack(&subscription, ack_ids).await; } + WaitCheckpointTask::AckNatsJetStream( + ref context, + reply_subjects_arrs, + ref ack_policy, + ) => { + async fn ack(context: &JetStreamContext, reply_subject: String) { + match context.publish(reply_subject.clone(), "".into()).await { + Err(e) => { + tracing::error!(error = %e.as_report(), subject = ?reply_subject, "failed to ack NATS JetStream message"); + } + Ok(ack_future) => { + if let Err(e) = ack_future.into_future().await { + tracing::error!(error = %e.as_report(), subject = ?reply_subject, "failed to ack NATS JetStream message"); + } + } + } + } + + let reply_subjects = reply_subjects_arrs + .iter() + .flat_map(|arr| { + arr.as_utf8() + .iter() + .flatten() + .map(|s| s.to_string()) + .collect::>() + }) + .collect::>(); + + match ack_policy { + JetStreamAckPolicy::None => (), + JetStreamAckPolicy::Explicit => { + for reply_subject in reply_subjects { + ack(context, reply_subject).await; + } + } + JetStreamAckPolicy::All => { + if let Some(reply_subject) = reply_subjects.last() { + ack(context, reply_subject.clone()).await; + } + } + } + } } } } diff --git a/src/connector/src/source/nats/mod.rs b/src/connector/src/source/nats/mod.rs index baf18be703146..df5fc73315fb0 100644 --- a/src/connector/src/source/nats/mod.rs +++ b/src/connector/src/source/nats/mod.rs @@ -23,27 +23,40 @@ use async_nats::jetstream::consumer::pull::Config; use async_nats::jetstream::consumer::{AckPolicy, ReplayPolicy}; use serde::Deserialize; use serde_with::{serde_as, DisplayFromStr}; +use strum_macros::Display; +use thiserror::Error; use with_options::WithOptions; use crate::connector_common::NatsCommon; +use crate::error::{ConnectorError, ConnectorResult}; use crate::source::nats::enumerator::NatsSplitEnumerator; +pub use crate::source::nats::source::NatsJetStreamMeta; use crate::source::nats::source::{NatsSplit, NatsSplitReader}; use crate::source::SourceProperties; use crate::{ deserialize_optional_string_seq_from_string, deserialize_optional_u64_seq_from_string, }; +#[derive(Debug, Clone, Error, Display)] +pub enum NatsJetStreamError { + InvalidAckPolicy(String), + InvalidReplayPolicy(String), +} + pub const NATS_CONNECTOR: &str = "nats"; pub struct AckPolicyWrapper; impl AckPolicyWrapper { - pub fn parse_str(s: &str) -> Result { + pub fn parse_str(s: &str) -> Result { match s { "none" => Ok(AckPolicy::None), "all" => Ok(AckPolicy::All), "explicit" => Ok(AckPolicy::Explicit), - _ => Err(format!("Invalid AckPolicy '{}'", s)), + _ => Err(NatsJetStreamError::InvalidAckPolicy(format!( + "Invalid AckPolicy '{}', expect `none`, `all`, and `explicit`", + s + ))), } } } @@ -51,11 +64,14 @@ impl AckPolicyWrapper { pub struct ReplayPolicyWrapper; impl ReplayPolicyWrapper { - pub fn parse_str(s: &str) -> Result { + pub fn parse_str(s: &str) -> Result { match s { "instant" => Ok(ReplayPolicy::Instant), "original" => Ok(ReplayPolicy::Original), - _ => Err(format!("Invalid ReplayPolicy '{}'", s)), + _ => Err(NatsJetStreamError::InvalidReplayPolicy(format!( + "Invalid ReplayPolicy '{}', expect `instant` and `original`", + s + ))), } } } @@ -257,6 +273,13 @@ impl NatsPropertiesConsumer { c.backoff = v.iter().map(|&x| Duration::from_secs(x)).collect() } } + + pub fn get_ack_policy(&self) -> ConnectorResult { + match &self.ack_policy { + Some(policy) => Ok(AckPolicyWrapper::parse_str(policy).map_err(ConnectorError::from)?), + None => Ok(AckPolicy::None), + } + } } impl SourceProperties for NatsProperties { diff --git a/src/connector/src/source/nats/source/message.rs b/src/connector/src/source/nats/source/message.rs index 55c91457d4bfc..717124dfba7d7 100644 --- a/src/connector/src/source/nats/source/message.rs +++ b/src/connector/src/source/nats/source/message.rs @@ -22,6 +22,12 @@ pub struct NatsMessage { pub split_id: SplitId, pub sequence_number: String, pub payload: Vec, + pub reply_subject: Option, +} + +#[derive(Clone, Debug)] +pub struct NatsJetStreamMeta { + pub reply_subject: Option, } impl From for SourceMessage { @@ -30,9 +36,12 @@ impl From for SourceMessage { key: None, payload: Some(message.payload), // For nats jetstream, use sequence id as offset + // DEPRECATED: no longer use sequence id as offset, let nats broker handle failover offset: message.sequence_number, split_id: message.split_id, - meta: SourceMeta::Empty, + meta: SourceMeta::NatsJetStream(NatsJetStreamMeta { + reply_subject: message.reply_subject, + }), } } } @@ -43,6 +52,10 @@ impl NatsMessage { split_id, sequence_number: message.info().unwrap().stream_sequence.to_string(), payload: message.message.payload.to_vec(), + reply_subject: message + .message + .reply + .map(|subject| subject.as_str().to_string()), } } } diff --git a/src/connector/src/source/nats/source/mod.rs b/src/connector/src/source/nats/source/mod.rs index c1393e389b552..9ba544b6ea7fc 100644 --- a/src/connector/src/source/nats/source/mod.rs +++ b/src/connector/src/source/nats/source/mod.rs @@ -15,6 +15,7 @@ mod message; mod reader; +pub use message::*; pub use reader::*; pub use crate::source::nats::split::*; diff --git a/src/connector/src/source/reader/reader.rs b/src/connector/src/source/reader/reader.rs index b3a1cb5380d8c..c1d90c2a972d4 100644 --- a/src/connector/src/source/reader/reader.rs +++ b/src/connector/src/source/reader/reader.rs @@ -127,6 +127,11 @@ impl SourceReader { prop.subscription_client().await?, vec![], )), + ConnectorProperties::Nats(prop) => Some(WaitCheckpointTask::AckNatsJetStream( + prop.common.build_context().await?, + vec![], + prop.nats_properties_consumer.get_ack_policy()?, + )), _ => None, }) } diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index d4a02ce462441..6d5cf710d3bb0 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -752,12 +752,14 @@ struct WaitCheckpointTaskBuilder { impl WaitCheckpointTaskBuilder { fn update_task_on_chunk(&mut self, offset_col: ArrayRef) { - #[expect(clippy::single_match)] match &mut self.building_task { WaitCheckpointTask::AckPubsubMessage(_, arrays) => { arrays.push(offset_col); } - _ => {} + WaitCheckpointTask::AckNatsJetStream(_, arrays, _) => { + arrays.push(offset_col); + } + WaitCheckpointTask::CommitCdcOffset(_) => {} } } From a332ff779d8b6cfd54ad8b9d6575f6dc0a3c904b Mon Sep 17 00:00:00 2001 From: tabVersion Date: Mon, 14 Oct 2024 14:23:29 +0800 Subject: [PATCH 2/4] fix offset column Signed-off-by: tabVersion --- src/connector/src/source/base.rs | 2 -- src/connector/src/source/nats/mod.rs | 1 - .../src/source/nats/source/message.rs | 13 ++++--------- src/connector/src/source/nats/source/reader.rs | 7 +++++-- src/connector/src/source/reader/reader.rs | 18 +++++++++++++----- 5 files changed, 22 insertions(+), 19 deletions(-) diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index 8afb641254a8b..59e3585431a60 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -40,7 +40,6 @@ use super::google_pubsub::GooglePubsubMeta; use super::kafka::KafkaMeta; use super::kinesis::KinesisMeta; use super::monitor::SourceMetrics; -use super::nats::NatsJetStreamMeta; use super::nexmark::source::message::NexmarkMeta; use super::{AZBLOB_CONNECTOR, GCS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR}; use crate::error::ConnectorResult as Result; @@ -631,7 +630,6 @@ pub enum SourceMeta { GooglePubsub(GooglePubsubMeta), Datagen(DatagenMeta), DebeziumCdc(DebeziumCdcMeta), - NatsJetStream(NatsJetStreamMeta), // For the source that doesn't have meta data. Empty, } diff --git a/src/connector/src/source/nats/mod.rs b/src/connector/src/source/nats/mod.rs index df5fc73315fb0..f4d8f0fbc5a52 100644 --- a/src/connector/src/source/nats/mod.rs +++ b/src/connector/src/source/nats/mod.rs @@ -30,7 +30,6 @@ use with_options::WithOptions; use crate::connector_common::NatsCommon; use crate::error::{ConnectorError, ConnectorResult}; use crate::source::nats::enumerator::NatsSplitEnumerator; -pub use crate::source::nats::source::NatsJetStreamMeta; use crate::source::nats::source::{NatsSplit, NatsSplitReader}; use crate::source::SourceProperties; use crate::{ diff --git a/src/connector/src/source/nats/source/message.rs b/src/connector/src/source/nats/source/message.rs index 717124dfba7d7..88df55f19a620 100644 --- a/src/connector/src/source/nats/source/message.rs +++ b/src/connector/src/source/nats/source/message.rs @@ -25,23 +25,18 @@ pub struct NatsMessage { pub reply_subject: Option, } -#[derive(Clone, Debug)] -pub struct NatsJetStreamMeta { - pub reply_subject: Option, -} - impl From for SourceMessage { fn from(message: NatsMessage) -> Self { SourceMessage { key: None, payload: Some(message.payload), // For nats jetstream, use sequence id as offset + // // DEPRECATED: no longer use sequence id as offset, let nats broker handle failover - offset: message.sequence_number, + // use reply_subject as offset for ack use, we just check the persisted state for whether this is the first run + offset: message.reply_subject.unwrap_or_default(), split_id: message.split_id, - meta: SourceMeta::NatsJetStream(NatsJetStreamMeta { - reply_subject: message.reply_subject, - }), + meta: SourceMeta::Empty, } } } diff --git a/src/connector/src/source/nats/source/reader.rs b/src/connector/src/source/nats/source/reader.rs index 937962effa4be..0b3036d4a56c0 100644 --- a/src/connector/src/source/nats/source/reader.rs +++ b/src/connector/src/source/nats/source/reader.rs @@ -51,8 +51,8 @@ impl SplitReader for NatsSplitReader { source_ctx: SourceContextRef, _columns: Option>, ) -> Result { - // TODO: to simplify the logic, return 1 split for first version - assert!(splits.len() == 1); + // We guarantee the split num always align with parallelism + assert_eq!(splits.len(), 1); let split = splits.into_iter().next().unwrap(); let split_id = split.split_id; let start_position = match &split.start_sequence { @@ -73,6 +73,9 @@ impl SplitReader for NatsSplitReader { } }, }, + // We have record on this Nats Split, contains the last seen offset (seq id) or reply subject + // We do not use the seq id as start position anymore, + // but just let the reader load from durable consumer on broker. start_position => start_position.to_owned(), }; diff --git a/src/connector/src/source/reader/reader.rs b/src/connector/src/source/reader/reader.rs index c1d90c2a972d4..89335f8f0d80e 100644 --- a/src/connector/src/source/reader/reader.rs +++ b/src/connector/src/source/reader/reader.rs @@ -16,6 +16,7 @@ use std::collections::HashMap; use std::sync::Arc; use anyhow::Context; +use async_nats::jetstream::consumer::AckPolicy; use futures::future::try_join_all; use futures::stream::pending; use futures::StreamExt; @@ -127,11 +128,18 @@ impl SourceReader { prop.subscription_client().await?, vec![], )), - ConnectorProperties::Nats(prop) => Some(WaitCheckpointTask::AckNatsJetStream( - prop.common.build_context().await?, - vec![], - prop.nats_properties_consumer.get_ack_policy()?, - )), + ConnectorProperties::Nats(prop) => { + match prop.nats_properties_consumer.get_ack_policy()? { + a @ AckPolicy::Explicit | a @ AckPolicy::All => { + Some(WaitCheckpointTask::AckNatsJetStream( + prop.common.build_context().await?, + vec![], + a, + )) + } + AckPolicy::None => None, + } + } _ => None, }) } From b70f2966e697a3604a24ea8606720367adfd645b Mon Sep 17 00:00:00 2001 From: tabVersion Date: Mon, 14 Oct 2024 14:35:36 +0800 Subject: [PATCH 3/4] resolve comments Signed-off-by: tabVersion --- src/connector/src/source/nats/mod.rs | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/src/connector/src/source/nats/mod.rs b/src/connector/src/source/nats/mod.rs index f4d8f0fbc5a52..9a3a9e052a7f9 100644 --- a/src/connector/src/source/nats/mod.rs +++ b/src/connector/src/source/nats/mod.rs @@ -17,13 +17,13 @@ pub mod source; pub mod split; use std::collections::HashMap; +use std::fmt::Display; use std::time::Duration; use async_nats::jetstream::consumer::pull::Config; use async_nats::jetstream::consumer::{AckPolicy, ReplayPolicy}; use serde::Deserialize; use serde_with::{serde_as, DisplayFromStr}; -use strum_macros::Display; use thiserror::Error; use with_options::WithOptions; @@ -36,10 +36,13 @@ use crate::{ deserialize_optional_string_seq_from_string, deserialize_optional_u64_seq_from_string, }; -#[derive(Debug, Clone, Error, Display)] -pub enum NatsJetStreamError { - InvalidAckPolicy(String), - InvalidReplayPolicy(String), +#[derive(Debug, Clone, Error)] +pub struct NatsJetStreamError(String); + +impl Display for NatsJetStreamError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } } pub const NATS_CONNECTOR: &str = "nats"; @@ -52,7 +55,7 @@ impl AckPolicyWrapper { "none" => Ok(AckPolicy::None), "all" => Ok(AckPolicy::All), "explicit" => Ok(AckPolicy::Explicit), - _ => Err(NatsJetStreamError::InvalidAckPolicy(format!( + _ => Err(NatsJetStreamError(format!( "Invalid AckPolicy '{}', expect `none`, `all`, and `explicit`", s ))), @@ -67,7 +70,7 @@ impl ReplayPolicyWrapper { match s { "instant" => Ok(ReplayPolicy::Instant), "original" => Ok(ReplayPolicy::Original), - _ => Err(NatsJetStreamError::InvalidReplayPolicy(format!( + _ => Err(NatsJetStreamError(format!( "Invalid ReplayPolicy '{}', expect `instant` and `original`", s ))), From c3d8176e7fc3e147a7de60eba5482bbe130a699b Mon Sep 17 00:00:00 2001 From: Bohan Zhang Date: Tue, 15 Oct 2024 15:42:45 +0800 Subject: [PATCH 4/4] refactor: migrate Nats JetStream consumer to durable one (#18895) Signed-off-by: tabVersion --- src/connector/src/connector_common/common.rs | 37 +++++++++++++++---- src/connector/src/error.rs | 1 + src/connector/src/source/nats/mod.rs | 4 ++ .../src/source/nats/source/reader.rs | 1 + src/connector/src/source/nats/split.rs | 9 +---- src/connector/with_options_source.yaml | 3 ++ 6 files changed, 41 insertions(+), 14 deletions(-) diff --git a/src/connector/src/connector_common/common.rs b/src/connector/src/connector_common/common.rs index df32ccc5ea79d..1be41b4e09caa 100644 --- a/src/connector/src/connector_common/common.rs +++ b/src/connector/src/connector_common/common.rs @@ -648,6 +648,7 @@ impl NatsCommon { pub(crate) async fn build_consumer( &self, stream: String, + durable_consumer_name: String, split_id: String, start_sequence: NatsOffset, mut config: jetstream::consumer::pull::Config, @@ -666,6 +667,7 @@ impl NatsCommon { NatsOffset::Earliest => DeliverPolicy::All, NatsOffset::Latest => DeliverPolicy::New, NatsOffset::SequenceNumber(v) => { + // for compatibility, we do not write to any state table now let parsed = v .parse::() .context("failed to parse nats offset as sequence number")?; @@ -680,12 +682,19 @@ impl NatsCommon { NatsOffset::None => DeliverPolicy::All, }; - let consumer = stream - .get_or_create_consumer(&name, { - config.deliver_policy = deliver_policy; - config - }) - .await?; + let consumer = if let Ok(consumer) = stream.get_consumer(&name).await { + consumer + } else { + stream + .get_or_create_consumer(&name, { + config.deliver_policy = deliver_policy; + config.durable_name = Some(durable_consumer_name); + config.filter_subjects = + self.subject.split(',').map(|s| s.to_string()).collect(); + config + }) + .await? + }; Ok(consumer) } @@ -695,8 +704,17 @@ impl NatsCommon { stream: String, ) -> ConnectorResult { let subjects: Vec = self.subject.split(',').map(|s| s.to_string()).collect(); + if let Ok(mut stream_instance) = jetstream.get_stream(&stream).await { + tracing::info!( + "load existing nats stream ({:?}) with config {:?}", + stream, + stream_instance.info().await? + ); + return Ok(stream_instance); + } + let mut config = jetstream::stream::Config { - name: stream, + name: stream.clone(), max_bytes: 1000000, subjects, ..Default::default() @@ -716,6 +734,11 @@ impl NatsCommon { if let Some(v) = self.max_message_size { config.max_message_size = v; } + tracing::info!( + "create nats stream ({:?}) with config {:?}", + &stream, + config + ); let stream = jetstream.get_or_create_stream(config).await?; Ok(stream) } diff --git a/src/connector/src/error.rs b/src/connector/src/error.rs index c42af8fcfa024..09377b95f35c1 100644 --- a/src/connector/src/error.rs +++ b/src/connector/src/error.rs @@ -62,6 +62,7 @@ def_anyhow_newtype! { async_nats::jetstream::consumer::pull::MessagesError => "Nats error", async_nats::jetstream::context::CreateStreamError => "Nats error", async_nats::jetstream::stream::ConsumerError => "Nats error", + async_nats::error::Error => "Nats error", NatsJetStreamError => "Nats error", icelake::Error => "Iceberg error", diff --git a/src/connector/src/source/nats/mod.rs b/src/connector/src/source/nats/mod.rs index 9a3a9e052a7f9..b8a3b7948aee2 100644 --- a/src/connector/src/source/nats/mod.rs +++ b/src/connector/src/source/nats/mod.rs @@ -100,6 +100,9 @@ pub struct NatsProperties { #[serde(rename = "stream")] pub stream: String, + #[serde(rename = "durable_consumer_name")] + pub durable_consumer_name: String, + #[serde(flatten)] pub unknown_fields: HashMap, } @@ -339,6 +342,7 @@ mod test { "consumer.num_replicas".to_string() => "3".to_string(), "consumer.memory_storage".to_string() => "true".to_string(), "consumer.backoff.sec".to_string() => "2,10,15".to_string(), + "durable_consumer_name".to_string() => "test_durable_consumer".to_string(), }; diff --git a/src/connector/src/source/nats/source/reader.rs b/src/connector/src/source/nats/source/reader.rs index 0b3036d4a56c0..5bcae105e07be 100644 --- a/src/connector/src/source/nats/source/reader.rs +++ b/src/connector/src/source/nats/source/reader.rs @@ -88,6 +88,7 @@ impl SplitReader for NatsSplitReader { .common .build_consumer( properties.stream.clone(), + properties.durable_consumer_name.clone(), split_id.to_string(), start_position.clone(), config, diff --git a/src/connector/src/source/nats/split.rs b/src/connector/src/source/nats/split.rs index 76284566f3d2c..9b65e7429461b 100644 --- a/src/connector/src/source/nats/split.rs +++ b/src/connector/src/source/nats/split.rs @@ -51,13 +51,8 @@ impl SplitMetaData for NatsSplit { serde_json::to_value(self.clone()).unwrap().into() } - fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> { - let start_sequence = if last_seen_offset.is_empty() { - NatsOffset::Earliest - } else { - NatsOffset::SequenceNumber(last_seen_offset) - }; - self.start_sequence = start_sequence; + fn update_offset(&mut self, _last_seen_offset: String) -> ConnectorResult<()> { + // we do not require to update the offset for nats, let durable consumer handle it Ok(()) } } diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml index 91eaf59b1c7b6..20d3fc1714233 100644 --- a/src/connector/with_options_source.yaml +++ b/src/connector/with_options_source.yaml @@ -643,6 +643,9 @@ NatsProperties: - name: stream field_type: String required: true + - name: durable_consumer_name + field_type: String + required: true NexmarkProperties: fields: - name: nexmark.split.num