Skip to content

Commit

Permalink
refactor: impl ack and migrate to durable consumer for Nats (#18873)
Browse files Browse the repository at this point in the history
Signed-off-by: tabVersion <[email protected]>
  • Loading branch information
tabVersion authored Oct 16, 2024
1 parent 6863a24 commit d997ebe
Show file tree
Hide file tree
Showing 11 changed files with 153 additions and 23 deletions.
37 changes: 30 additions & 7 deletions src/connector/src/connector_common/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,7 @@ impl NatsCommon {
pub(crate) async fn build_consumer(
&self,
stream: String,
durable_consumer_name: String,
split_id: String,
start_sequence: NatsOffset,
mut config: jetstream::consumer::pull::Config,
Expand All @@ -666,6 +667,7 @@ impl NatsCommon {
NatsOffset::Earliest => DeliverPolicy::All,
NatsOffset::Latest => DeliverPolicy::New,
NatsOffset::SequenceNumber(v) => {
// for compatibility, we do not write to any state table now
let parsed = v
.parse::<u64>()
.context("failed to parse nats offset as sequence number")?;
Expand All @@ -680,12 +682,19 @@ impl NatsCommon {
NatsOffset::None => DeliverPolicy::All,
};

let consumer = stream
.get_or_create_consumer(&name, {
config.deliver_policy = deliver_policy;
config
})
.await?;
let consumer = if let Ok(consumer) = stream.get_consumer(&name).await {
consumer
} else {
stream
.get_or_create_consumer(&name, {
config.deliver_policy = deliver_policy;
config.durable_name = Some(durable_consumer_name);
config.filter_subjects =
self.subject.split(',').map(|s| s.to_string()).collect();
config
})
.await?
};
Ok(consumer)
}

Expand All @@ -695,8 +704,17 @@ impl NatsCommon {
stream: String,
) -> ConnectorResult<jetstream::stream::Stream> {
let subjects: Vec<String> = self.subject.split(',').map(|s| s.to_string()).collect();
if let Ok(mut stream_instance) = jetstream.get_stream(&stream).await {
tracing::info!(
"load existing nats stream ({:?}) with config {:?}",
stream,
stream_instance.info().await?
);
return Ok(stream_instance);
}

let mut config = jetstream::stream::Config {
name: stream,
name: stream.clone(),
max_bytes: 1000000,
subjects,
..Default::default()
Expand All @@ -716,6 +734,11 @@ impl NatsCommon {
if let Some(v) = self.max_message_size {
config.max_message_size = v;
}
tracing::info!(
"create nats stream ({:?}) with config {:?}",
&stream,
config
);
let stream = jetstream.get_or_create_stream(config).await?;
Ok(stream)
}
Expand Down
4 changes: 4 additions & 0 deletions src/connector/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::parser::AccessError;
use crate::schema::schema_registry::{ConcurrentRequestError, WireFormatError};
use crate::schema::InvalidOptionError;
use crate::sink::SinkError;
use crate::source::nats::NatsJetStreamError;

def_anyhow_newtype! {
pub ConnectorError,
Expand Down Expand Up @@ -61,6 +62,9 @@ def_anyhow_newtype! {
async_nats::jetstream::consumer::pull::MessagesError => "Nats error",
async_nats::jetstream::context::CreateStreamError => "Nats error",
async_nats::jetstream::stream::ConsumerError => "Nats error",
async_nats::error::Error<async_nats::jetstream::context::RequestErrorKind> => "Nats error",
NatsJetStreamError => "Nats error",

icelake::Error => "Iceberg error",
iceberg::Error => "IcebergV2 error",
redis::RedisError => "Redis error",
Expand Down
48 changes: 48 additions & 0 deletions src/connector/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ pub mod nats;
pub mod nexmark;
pub mod pulsar;

use std::future::IntoFuture;

pub use base::{UPSTREAM_SOURCE_KEY, *};
pub(crate) use common::*;
use google_cloud_pubsub::subscription::Subscription;
Expand All @@ -40,6 +42,8 @@ mod manager;
pub mod reader;
pub mod test_source;

use async_nats::jetstream::consumer::AckPolicy as JetStreamAckPolicy;
use async_nats::jetstream::context::Context as JetStreamContext;
pub use manager::{SourceColumnDesc, SourceColumnType};
use risingwave_common::array::{Array, ArrayRef};
use thiserror_ext::AsReport;
Expand Down Expand Up @@ -77,6 +81,7 @@ pub fn should_copy_to_format_encode_options(key: &str, connector: &str) -> bool
pub enum WaitCheckpointTask {
CommitCdcOffset(Option<(SplitId, String)>),
AckPubsubMessage(Subscription, Vec<ArrayRef>),
AckNatsJetStream(JetStreamContext, Vec<ArrayRef>, JetStreamAckPolicy),
}

impl WaitCheckpointTask {
Expand Down Expand Up @@ -123,6 +128,49 @@ impl WaitCheckpointTask {
}
ack(&subscription, ack_ids).await;
}
WaitCheckpointTask::AckNatsJetStream(
ref context,
reply_subjects_arrs,
ref ack_policy,
) => {
async fn ack(context: &JetStreamContext, reply_subject: String) {
match context.publish(reply_subject.clone(), "".into()).await {
Err(e) => {
tracing::error!(error = %e.as_report(), subject = ?reply_subject, "failed to ack NATS JetStream message");
}
Ok(ack_future) => {
if let Err(e) = ack_future.into_future().await {
tracing::error!(error = %e.as_report(), subject = ?reply_subject, "failed to ack NATS JetStream message");
}
}
}
}

let reply_subjects = reply_subjects_arrs
.iter()
.flat_map(|arr| {
arr.as_utf8()
.iter()
.flatten()
.map(|s| s.to_string())
.collect::<Vec<String>>()
})
.collect::<Vec<String>>();

match ack_policy {
JetStreamAckPolicy::None => (),
JetStreamAckPolicy::Explicit => {
for reply_subject in reply_subjects {
ack(context, reply_subject).await;
}
}
JetStreamAckPolicy::All => {
if let Some(reply_subject) = reply_subjects.last() {
ack(context, reply_subject.clone()).await;
}
}
}
}
}
}
}
37 changes: 33 additions & 4 deletions src/connector/src/source/nats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,45 +17,63 @@ pub mod source;
pub mod split;

use std::collections::HashMap;
use std::fmt::Display;
use std::time::Duration;

use async_nats::jetstream::consumer::pull::Config;
use async_nats::jetstream::consumer::{AckPolicy, ReplayPolicy};
use serde::Deserialize;
use serde_with::{serde_as, DisplayFromStr};
use thiserror::Error;
use with_options::WithOptions;

use crate::connector_common::NatsCommon;
use crate::error::{ConnectorError, ConnectorResult};
use crate::source::nats::enumerator::NatsSplitEnumerator;
use crate::source::nats::source::{NatsSplit, NatsSplitReader};
use crate::source::SourceProperties;
use crate::{
deserialize_optional_string_seq_from_string, deserialize_optional_u64_seq_from_string,
};

#[derive(Debug, Clone, Error)]
pub struct NatsJetStreamError(String);

impl Display for NatsJetStreamError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}

pub const NATS_CONNECTOR: &str = "nats";

pub struct AckPolicyWrapper;

impl AckPolicyWrapper {
pub fn parse_str(s: &str) -> Result<AckPolicy, String> {
pub fn parse_str(s: &str) -> Result<AckPolicy, NatsJetStreamError> {
match s {
"none" => Ok(AckPolicy::None),
"all" => Ok(AckPolicy::All),
"explicit" => Ok(AckPolicy::Explicit),
_ => Err(format!("Invalid AckPolicy '{}'", s)),
_ => Err(NatsJetStreamError(format!(
"Invalid AckPolicy '{}', expect `none`, `all`, and `explicit`",
s
))),
}
}
}

pub struct ReplayPolicyWrapper;

impl ReplayPolicyWrapper {
pub fn parse_str(s: &str) -> Result<ReplayPolicy, String> {
pub fn parse_str(s: &str) -> Result<ReplayPolicy, NatsJetStreamError> {
match s {
"instant" => Ok(ReplayPolicy::Instant),
"original" => Ok(ReplayPolicy::Original),
_ => Err(format!("Invalid ReplayPolicy '{}'", s)),
_ => Err(NatsJetStreamError(format!(
"Invalid ReplayPolicy '{}', expect `instant` and `original`",
s
))),
}
}
}
Expand All @@ -82,6 +100,9 @@ pub struct NatsProperties {
#[serde(rename = "stream")]
pub stream: String,

#[serde(rename = "durable_consumer_name")]
pub durable_consumer_name: String,

#[serde(flatten)]
pub unknown_fields: HashMap<String, String>,
}
Expand Down Expand Up @@ -257,6 +278,13 @@ impl NatsPropertiesConsumer {
c.backoff = v.iter().map(|&x| Duration::from_secs(x)).collect()
}
}

pub fn get_ack_policy(&self) -> ConnectorResult<AckPolicy> {
match &self.ack_policy {
Some(policy) => Ok(AckPolicyWrapper::parse_str(policy).map_err(ConnectorError::from)?),
None => Ok(AckPolicy::None),
}
}
}

impl SourceProperties for NatsProperties {
Expand Down Expand Up @@ -314,6 +342,7 @@ mod test {
"consumer.num_replicas".to_string() => "3".to_string(),
"consumer.memory_storage".to_string() => "true".to_string(),
"consumer.backoff.sec".to_string() => "2,10,15".to_string(),
"durable_consumer_name".to_string() => "test_durable_consumer".to_string(),

};

Expand Down
10 changes: 9 additions & 1 deletion src/connector/src/source/nats/source/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub struct NatsMessage {
pub split_id: SplitId,
pub sequence_number: String,
pub payload: Vec<u8>,
pub reply_subject: Option<String>,
}

impl From<NatsMessage> for SourceMessage {
Expand All @@ -30,7 +31,10 @@ impl From<NatsMessage> for SourceMessage {
key: None,
payload: Some(message.payload),
// For nats jetstream, use sequence id as offset
offset: message.sequence_number,
//
// DEPRECATED: no longer use sequence id as offset, let nats broker handle failover
// use reply_subject as offset for ack use, we just check the persisted state for whether this is the first run
offset: message.reply_subject.unwrap_or_default(),
split_id: message.split_id,
meta: SourceMeta::Empty,
}
Expand All @@ -43,6 +47,10 @@ impl NatsMessage {
split_id,
sequence_number: message.info().unwrap().stream_sequence.to_string(),
payload: message.message.payload.to_vec(),
reply_subject: message
.message
.reply
.map(|subject| subject.as_str().to_string()),
}
}
}
1 change: 1 addition & 0 deletions src/connector/src/source/nats/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
mod message;
mod reader;

pub use message::*;
pub use reader::*;

pub use crate::source::nats::split::*;
8 changes: 6 additions & 2 deletions src/connector/src/source/nats/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ impl SplitReader for NatsSplitReader {
source_ctx: SourceContextRef,
_columns: Option<Vec<Column>>,
) -> Result<Self> {
// TODO: to simplify the logic, return 1 split for first version
assert!(splits.len() == 1);
// We guarantee the split num always align with parallelism
assert_eq!(splits.len(), 1);
let split = splits.into_iter().next().unwrap();
let split_id = split.split_id;
let start_position = match &split.start_sequence {
Expand All @@ -73,6 +73,9 @@ impl SplitReader for NatsSplitReader {
}
},
},
// We have record on this Nats Split, contains the last seen offset (seq id) or reply subject
// We do not use the seq id as start position anymore,
// but just let the reader load from durable consumer on broker.
start_position => start_position.to_owned(),
};

Expand All @@ -85,6 +88,7 @@ impl SplitReader for NatsSplitReader {
.common
.build_consumer(
properties.stream.clone(),
properties.durable_consumer_name.clone(),
split_id.to_string(),
start_position.clone(),
config,
Expand Down
9 changes: 2 additions & 7 deletions src/connector/src/source/nats/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,8 @@ impl SplitMetaData for NatsSplit {
serde_json::to_value(self.clone()).unwrap().into()
}

fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
let start_sequence = if last_seen_offset.is_empty() {
NatsOffset::Earliest
} else {
NatsOffset::SequenceNumber(last_seen_offset)
};
self.start_sequence = start_sequence;
fn update_offset(&mut self, _last_seen_offset: String) -> ConnectorResult<()> {
// we do not require to update the offset for nats, let durable consumer handle it
Ok(())
}
}
Expand Down
13 changes: 13 additions & 0 deletions src/connector/src/source/reader/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::collections::HashMap;
use std::sync::Arc;

use anyhow::Context;
use async_nats::jetstream::consumer::AckPolicy;
use futures::future::try_join_all;
use futures::stream::pending;
use futures::StreamExt;
Expand Down Expand Up @@ -127,6 +128,18 @@ impl SourceReader {
prop.subscription_client().await?,
vec![],
)),
ConnectorProperties::Nats(prop) => {
match prop.nats_properties_consumer.get_ack_policy()? {
a @ AckPolicy::Explicit | a @ AckPolicy::All => {
Some(WaitCheckpointTask::AckNatsJetStream(
prop.common.build_context().await?,
vec![],
a,
))
}
AckPolicy::None => None,
}
}
_ => None,
})
}
Expand Down
3 changes: 3 additions & 0 deletions src/connector/with_options_source.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,9 @@ NatsProperties:
- name: stream
field_type: String
required: true
- name: durable_consumer_name
field_type: String
required: true
NexmarkProperties:
fields:
- name: nexmark.split.num
Expand Down
Loading

0 comments on commit d997ebe

Please sign in to comment.