Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: impl ack and migrate to durable consumer for Nats #18873

Merged
merged 6 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/connector/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::parser::AccessError;
use crate::schema::schema_registry::{ConcurrentRequestError, WireFormatError};
use crate::schema::InvalidOptionError;
use crate::sink::SinkError;
use crate::source::nats::NatsJetStreamError;

def_anyhow_newtype! {
pub ConnectorError,
Expand Down Expand Up @@ -61,6 +62,8 @@ def_anyhow_newtype! {
async_nats::jetstream::consumer::pull::MessagesError => "Nats error",
async_nats::jetstream::context::CreateStreamError => "Nats error",
async_nats::jetstream::stream::ConsumerError => "Nats error",
NatsJetStreamError => "Nats error",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All these 5 errors are "Nats error". Consider including more context?


icelake::Error => "Iceberg error",
iceberg::Error => "IcebergV2 error",
redis::RedisError => "Redis error",
Expand Down
48 changes: 48 additions & 0 deletions src/connector/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ pub mod nats;
pub mod nexmark;
pub mod pulsar;

use std::future::IntoFuture;

pub use base::{UPSTREAM_SOURCE_KEY, *};
pub(crate) use common::*;
use google_cloud_pubsub::subscription::Subscription;
Expand All @@ -40,6 +42,8 @@ mod manager;
pub mod reader;
pub mod test_source;

use async_nats::jetstream::consumer::AckPolicy as JetStreamAckPolicy;
use async_nats::jetstream::context::Context as JetStreamContext;
pub use manager::{SourceColumnDesc, SourceColumnType};
use risingwave_common::array::{Array, ArrayRef};
use thiserror_ext::AsReport;
Expand Down Expand Up @@ -77,6 +81,7 @@ pub fn should_copy_to_format_encode_options(key: &str, connector: &str) -> bool
pub enum WaitCheckpointTask {
CommitCdcOffset(Option<(SplitId, String)>),
AckPubsubMessage(Subscription, Vec<ArrayRef>),
AckNatsJetStream(JetStreamContext, Vec<ArrayRef>, JetStreamAckPolicy),
}

impl WaitCheckpointTask {
Expand Down Expand Up @@ -123,6 +128,49 @@ impl WaitCheckpointTask {
}
ack(&subscription, ack_ids).await;
}
WaitCheckpointTask::AckNatsJetStream(
ref context,
reply_subjects_arrs,
ref ack_policy,
) => {
async fn ack(context: &JetStreamContext, reply_subject: String) {
match context.publish(reply_subject.clone(), "".into()).await {
Err(e) => {
tracing::error!(error = %e.as_report(), subject = ?reply_subject, "failed to ack NATS JetStream message");
}
Ok(ack_future) => {
if let Err(e) = ack_future.into_future().await {
tracing::error!(error = %e.as_report(), subject = ?reply_subject, "failed to ack NATS JetStream message");
}
}
}
}

let reply_subjects = reply_subjects_arrs
.iter()
.flat_map(|arr| {
arr.as_utf8()
.iter()
.flatten()
.map(|s| s.to_string())
.collect::<Vec<String>>()
})
.collect::<Vec<String>>();

match ack_policy {
JetStreamAckPolicy::None => (),
JetStreamAckPolicy::Explicit => {
for reply_subject in reply_subjects {
ack(context, reply_subject).await;
}
}
JetStreamAckPolicy::All => {
if let Some(reply_subject) = reply_subjects.last() {
ack(context, reply_subject.clone()).await;
}
}
Comment on lines +160 to +171
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious, the subjects like "$JS.ACK.test_stream_1.l2vxD20k.1.3.4.1728547619594368340.0" already contain the offset information, right?

Want to make the logic correct in this situation:
Nats send message: m1,m2,m3,m4
RW get message m1, m2, (checkpoint 1), m3, m4
When we do checkpoint 1 and send batch ack, which means ack m2 subjects. In nats part, it will only ack the m1, m2, and m3,m4 not ack, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious, the subjects like "$JS.ACK.test_stream_1.l2vxD20k.1.3.4.1728547619594368340.0" already contain the offset information, right?

yes

Want to make the logic correct in this situation:
Nats send message: m1,m2,m3,m4
RW get message m1, m2, (checkpoint 1), m3, m4
When we do checkpoint 1 and send batch ack, which means ack m2 subjects. In nats part, it will only ack the m1, m2, and m3,m4 not ack, right?

If ack_policy is ack_all (batch ack), at ckpt_1, we just ack msg2, which ack both msg1 and msg2. Msg3 and msg4 are not ack-ed.

}
}
}
}
}
33 changes: 29 additions & 4 deletions src/connector/src/source/nats/mod.rs
tabVersion marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -17,45 +17,63 @@ pub mod source;
pub mod split;

use std::collections::HashMap;
use std::fmt::Display;
use std::time::Duration;

use async_nats::jetstream::consumer::pull::Config;
use async_nats::jetstream::consumer::{AckPolicy, ReplayPolicy};
use serde::Deserialize;
use serde_with::{serde_as, DisplayFromStr};
use thiserror::Error;
use with_options::WithOptions;

use crate::connector_common::NatsCommon;
use crate::error::{ConnectorError, ConnectorResult};
use crate::source::nats::enumerator::NatsSplitEnumerator;
use crate::source::nats::source::{NatsSplit, NatsSplitReader};
use crate::source::SourceProperties;
use crate::{
deserialize_optional_string_seq_from_string, deserialize_optional_u64_seq_from_string,
};

#[derive(Debug, Clone, Error)]
pub struct NatsJetStreamError(String);

impl Display for NatsJetStreamError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}

pub const NATS_CONNECTOR: &str = "nats";

pub struct AckPolicyWrapper;

impl AckPolicyWrapper {
pub fn parse_str(s: &str) -> Result<AckPolicy, String> {
pub fn parse_str(s: &str) -> Result<AckPolicy, NatsJetStreamError> {
match s {
"none" => Ok(AckPolicy::None),
"all" => Ok(AckPolicy::All),
"explicit" => Ok(AckPolicy::Explicit),
_ => Err(format!("Invalid AckPolicy '{}'", s)),
_ => Err(NatsJetStreamError(format!(
"Invalid AckPolicy '{}', expect `none`, `all`, and `explicit`",
s
))),
}
}
}

pub struct ReplayPolicyWrapper;

impl ReplayPolicyWrapper {
pub fn parse_str(s: &str) -> Result<ReplayPolicy, String> {
pub fn parse_str(s: &str) -> Result<ReplayPolicy, NatsJetStreamError> {
match s {
"instant" => Ok(ReplayPolicy::Instant),
"original" => Ok(ReplayPolicy::Original),
_ => Err(format!("Invalid ReplayPolicy '{}'", s)),
_ => Err(NatsJetStreamError(format!(
"Invalid ReplayPolicy '{}', expect `instant` and `original`",
s
))),
}
}
}
Expand Down Expand Up @@ -257,6 +275,13 @@ impl NatsPropertiesConsumer {
c.backoff = v.iter().map(|&x| Duration::from_secs(x)).collect()
}
}

pub fn get_ack_policy(&self) -> ConnectorResult<AckPolicy> {
match &self.ack_policy {
Some(policy) => Ok(AckPolicyWrapper::parse_str(policy).map_err(ConnectorError::from)?),
None => Ok(AckPolicy::None),
}
}
}

impl SourceProperties for NatsProperties {
Expand Down
10 changes: 9 additions & 1 deletion src/connector/src/source/nats/source/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub struct NatsMessage {
pub split_id: SplitId,
pub sequence_number: String,
pub payload: Vec<u8>,
pub reply_subject: Option<String>,
}

impl From<NatsMessage> for SourceMessage {
Expand All @@ -30,7 +31,10 @@ impl From<NatsMessage> for SourceMessage {
key: None,
payload: Some(message.payload),
// For nats jetstream, use sequence id as offset
offset: message.sequence_number,
//
// DEPRECATED: no longer use sequence id as offset, let nats broker handle failover
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

// use reply_subject as offset for ack use, we just check the persisted state for whether this is the first run
offset: message.reply_subject.unwrap_or_default(),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yufansong Unfortunately, offset column should be kept as additional column and I think it is not worth making an exception for Nats. We use offset here to store the reply subject.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Get it. SGTM

split_id: message.split_id,
meta: SourceMeta::Empty,
}
Expand All @@ -43,6 +47,10 @@ impl NatsMessage {
split_id,
sequence_number: message.info().unwrap().stream_sequence.to_string(),
payload: message.message.payload.to_vec(),
reply_subject: message
.message
.reply
.map(|subject| subject.as_str().to_string()),
}
}
}
1 change: 1 addition & 0 deletions src/connector/src/source/nats/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
mod message;
mod reader;

pub use message::*;
pub use reader::*;

pub use crate::source::nats::split::*;
7 changes: 5 additions & 2 deletions src/connector/src/source/nats/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ impl SplitReader for NatsSplitReader {
source_ctx: SourceContextRef,
_columns: Option<Vec<Column>>,
) -> Result<Self> {
// TODO: to simplify the logic, return 1 split for first version
assert!(splits.len() == 1);
// We guarantee the split num always align with parallelism
assert_eq!(splits.len(), 1);
let split = splits.into_iter().next().unwrap();
let split_id = split.split_id;
let start_position = match &split.start_sequence {
Expand All @@ -73,6 +73,9 @@ impl SplitReader for NatsSplitReader {
}
},
},
// We have record on this Nats Split, contains the last seen offset (seq id) or reply subject
// We do not use the seq id as start position anymore,
// but just let the reader load from durable consumer on broker.
Comment on lines +76 to +78
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand these comments, but you didn't make any change to the implementation code (L58 ~ L72). Why & how it works?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use api get_or_create_consumer when building stream consumer. api ref
If we utilize an existing durable consumer, the provided config should be the same as the consumer created. So the config here should never change, always align to the one conducted from with clause.

following changes in #18895

start_position => start_position.to_owned(),
};

Expand Down
13 changes: 13 additions & 0 deletions src/connector/src/source/reader/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::collections::HashMap;
use std::sync::Arc;

use anyhow::Context;
use async_nats::jetstream::consumer::AckPolicy;
use futures::future::try_join_all;
use futures::stream::pending;
use futures::StreamExt;
Expand Down Expand Up @@ -127,6 +128,18 @@ impl SourceReader {
prop.subscription_client().await?,
vec![],
)),
ConnectorProperties::Nats(prop) => {
match prop.nats_properties_consumer.get_ack_policy()? {
a @ AckPolicy::Explicit | a @ AckPolicy::All => {
Some(WaitCheckpointTask::AckNatsJetStream(
prop.common.build_context().await?,
vec![],
a,
))
}
AckPolicy::None => None,
}
}
_ => None,
})
}
Expand Down
6 changes: 4 additions & 2 deletions src/stream/src/executor/source/source_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -752,12 +752,14 @@ struct WaitCheckpointTaskBuilder {

impl WaitCheckpointTaskBuilder {
fn update_task_on_chunk(&mut self, offset_col: ArrayRef) {
#[expect(clippy::single_match)]
match &mut self.building_task {
WaitCheckpointTask::AckPubsubMessage(_, arrays) => {
arrays.push(offset_col);
}
_ => {}
WaitCheckpointTask::AckNatsJetStream(_, arrays, _) => {
arrays.push(offset_col);
}
tabVersion marked this conversation as resolved.
Show resolved Hide resolved
WaitCheckpointTask::CommitCdcOffset(_) => {}
}
}

Expand Down
Loading