Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: impl ack and migrate to durable consumer for Nats #18873

Merged
merged 6 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/connector/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::parser::AccessError;
use crate::schema::schema_registry::{ConcurrentRequestError, WireFormatError};
use crate::schema::InvalidOptionError;
use crate::sink::SinkError;
use crate::source::nats::NatsJetStreamError;

def_anyhow_newtype! {
pub ConnectorError,
Expand Down Expand Up @@ -61,6 +62,8 @@ def_anyhow_newtype! {
async_nats::jetstream::consumer::pull::MessagesError => "Nats error",
async_nats::jetstream::context::CreateStreamError => "Nats error",
async_nats::jetstream::stream::ConsumerError => "Nats error",
NatsJetStreamError => "Nats error",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All these 5 errors are "Nats error". Consider including more context?


icelake::Error => "Iceberg error",
iceberg::Error => "IcebergV2 error",
redis::RedisError => "Redis error",
Expand Down
2 changes: 2 additions & 0 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use super::google_pubsub::GooglePubsubMeta;
use super::kafka::KafkaMeta;
use super::kinesis::KinesisMeta;
use super::monitor::SourceMetrics;
use super::nats::NatsJetStreamMeta;
use super::nexmark::source::message::NexmarkMeta;
use super::{AZBLOB_CONNECTOR, GCS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR};
use crate::error::ConnectorResult as Result;
Expand Down Expand Up @@ -630,6 +631,7 @@ pub enum SourceMeta {
GooglePubsub(GooglePubsubMeta),
Datagen(DatagenMeta),
DebeziumCdc(DebeziumCdcMeta),
NatsJetStream(NatsJetStreamMeta),
// For the source that doesn't have meta data.
Empty,
}
Expand Down
48 changes: 48 additions & 0 deletions src/connector/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ pub mod nats;
pub mod nexmark;
pub mod pulsar;

use std::future::IntoFuture;

pub use base::{UPSTREAM_SOURCE_KEY, *};
pub(crate) use common::*;
use google_cloud_pubsub::subscription::Subscription;
Expand All @@ -40,6 +42,8 @@ mod manager;
pub mod reader;
pub mod test_source;

use async_nats::jetstream::consumer::AckPolicy as JetStreamAckPolicy;
use async_nats::jetstream::context::Context as JetStreamContext;
pub use manager::{SourceColumnDesc, SourceColumnType};
use risingwave_common::array::{Array, ArrayRef};
use thiserror_ext::AsReport;
Expand Down Expand Up @@ -77,6 +81,7 @@ pub fn should_copy_to_format_encode_options(key: &str, connector: &str) -> bool
pub enum WaitCheckpointTask {
CommitCdcOffset(Option<(SplitId, String)>),
AckPubsubMessage(Subscription, Vec<ArrayRef>),
AckNatsJetStream(JetStreamContext, Vec<ArrayRef>, JetStreamAckPolicy),
}

impl WaitCheckpointTask {
Expand Down Expand Up @@ -123,6 +128,49 @@ impl WaitCheckpointTask {
}
ack(&subscription, ack_ids).await;
}
WaitCheckpointTask::AckNatsJetStream(
ref context,
reply_subjects_arrs,
ref ack_policy,
) => {
async fn ack(context: &JetStreamContext, reply_subject: String) {
match context.publish(reply_subject.clone(), "".into()).await {
Err(e) => {
tracing::error!(error = %e.as_report(), subject = ?reply_subject, "failed to ack NATS JetStream message");
}
Ok(ack_future) => {
if let Err(e) = ack_future.into_future().await {
tracing::error!(error = %e.as_report(), subject = ?reply_subject, "failed to ack NATS JetStream message");
}
}
}
}

let reply_subjects = reply_subjects_arrs
.iter()
.flat_map(|arr| {
arr.as_utf8()
.iter()
.flatten()
.map(|s| s.to_string())
.collect::<Vec<String>>()
})
.collect::<Vec<String>>();

match ack_policy {
JetStreamAckPolicy::None => (),
JetStreamAckPolicy::Explicit => {
for reply_subject in reply_subjects {
ack(context, reply_subject).await;
}
}
JetStreamAckPolicy::All => {
if let Some(reply_subject) = reply_subjects.last() {
ack(context, reply_subject.clone()).await;
}
}
Comment on lines +160 to +171
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious, the subjects like "$JS.ACK.test_stream_1.l2vxD20k.1.3.4.1728547619594368340.0" already contain the offset information, right?

Want to make the logic correct in this situation:
Nats send message: m1,m2,m3,m4
RW get message m1, m2, (checkpoint 1), m3, m4
When we do checkpoint 1 and send batch ack, which means ack m2 subjects. In nats part, it will only ack the m1, m2, and m3,m4 not ack, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious, the subjects like "$JS.ACK.test_stream_1.l2vxD20k.1.3.4.1728547619594368340.0" already contain the offset information, right?

yes

Want to make the logic correct in this situation:
Nats send message: m1,m2,m3,m4
RW get message m1, m2, (checkpoint 1), m3, m4
When we do checkpoint 1 and send batch ack, which means ack m2 subjects. In nats part, it will only ack the m1, m2, and m3,m4 not ack, right?

If ack_policy is ack_all (batch ack), at ckpt_1, we just ack msg2, which ack both msg1 and msg2. Msg3 and msg4 are not ack-ed.

}
}
}
}
}
31 changes: 27 additions & 4 deletions src/connector/src/source/nats/mod.rs
tabVersion marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -23,39 +23,55 @@ use async_nats::jetstream::consumer::pull::Config;
use async_nats::jetstream::consumer::{AckPolicy, ReplayPolicy};
use serde::Deserialize;
use serde_with::{serde_as, DisplayFromStr};
use strum_macros::Display;
use thiserror::Error;
use with_options::WithOptions;

use crate::connector_common::NatsCommon;
use crate::error::{ConnectorError, ConnectorResult};
use crate::source::nats::enumerator::NatsSplitEnumerator;
pub use crate::source::nats::source::NatsJetStreamMeta;
use crate::source::nats::source::{NatsSplit, NatsSplitReader};
use crate::source::SourceProperties;
use crate::{
deserialize_optional_string_seq_from_string, deserialize_optional_u64_seq_from_string,
};

#[derive(Debug, Clone, Error, Display)]
pub enum NatsJetStreamError {
InvalidAckPolicy(String),
InvalidReplayPolicy(String),
}
tabVersion marked this conversation as resolved.
Show resolved Hide resolved

pub const NATS_CONNECTOR: &str = "nats";

pub struct AckPolicyWrapper;

impl AckPolicyWrapper {
pub fn parse_str(s: &str) -> Result<AckPolicy, String> {
pub fn parse_str(s: &str) -> Result<AckPolicy, NatsJetStreamError> {
match s {
"none" => Ok(AckPolicy::None),
"all" => Ok(AckPolicy::All),
"explicit" => Ok(AckPolicy::Explicit),
_ => Err(format!("Invalid AckPolicy '{}'", s)),
_ => Err(NatsJetStreamError::InvalidAckPolicy(format!(
"Invalid AckPolicy '{}', expect `none`, `all`, and `explicit`",
s
))),
}
}
}

pub struct ReplayPolicyWrapper;

impl ReplayPolicyWrapper {
pub fn parse_str(s: &str) -> Result<ReplayPolicy, String> {
pub fn parse_str(s: &str) -> Result<ReplayPolicy, NatsJetStreamError> {
match s {
"instant" => Ok(ReplayPolicy::Instant),
"original" => Ok(ReplayPolicy::Original),
_ => Err(format!("Invalid ReplayPolicy '{}'", s)),
_ => Err(NatsJetStreamError::InvalidReplayPolicy(format!(
"Invalid ReplayPolicy '{}', expect `instant` and `original`",
s
))),
}
}
}
Expand Down Expand Up @@ -257,6 +273,13 @@ impl NatsPropertiesConsumer {
c.backoff = v.iter().map(|&x| Duration::from_secs(x)).collect()
}
}

pub fn get_ack_policy(&self) -> ConnectorResult<AckPolicy> {
match &self.ack_policy {
Some(policy) => Ok(AckPolicyWrapper::parse_str(policy).map_err(ConnectorError::from)?),
None => Ok(AckPolicy::None),
}
}
}

impl SourceProperties for NatsProperties {
Expand Down
15 changes: 14 additions & 1 deletion src/connector/src/source/nats/source/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ pub struct NatsMessage {
pub split_id: SplitId,
pub sequence_number: String,
pub payload: Vec<u8>,
pub reply_subject: Option<String>,
}

#[derive(Clone, Debug)]
pub struct NatsJetStreamMeta {
pub reply_subject: Option<String>,
}

impl From<NatsMessage> for SourceMessage {
Expand All @@ -30,9 +36,12 @@ impl From<NatsMessage> for SourceMessage {
key: None,
payload: Some(message.payload),
// For nats jetstream, use sequence id as offset
// DEPRECATED: no longer use sequence id as offset, let nats broker handle failover
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

offset: message.sequence_number,
split_id: message.split_id,
meta: SourceMeta::Empty,
meta: SourceMeta::NatsJetStream(NatsJetStreamMeta {
reply_subject: message.reply_subject,
}),
}
}
}
Expand All @@ -43,6 +52,10 @@ impl NatsMessage {
split_id,
sequence_number: message.info().unwrap().stream_sequence.to_string(),
payload: message.message.payload.to_vec(),
reply_subject: message
.message
.reply
.map(|subject| subject.as_str().to_string()),
}
}
}
1 change: 1 addition & 0 deletions src/connector/src/source/nats/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
mod message;
mod reader;

pub use message::*;
pub use reader::*;

pub use crate::source::nats::split::*;
5 changes: 5 additions & 0 deletions src/connector/src/source/reader/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ impl SourceReader {
prop.subscription_client().await?,
vec![],
)),
ConnectorProperties::Nats(prop) => Some(WaitCheckpointTask::AckNatsJetStream(
prop.common.build_context().await?,
vec![],
prop.nats_properties_consumer.get_ack_policy()?,
)),
tabVersion marked this conversation as resolved.
Show resolved Hide resolved
_ => None,
})
}
Expand Down
6 changes: 4 additions & 2 deletions src/stream/src/executor/source/source_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -752,12 +752,14 @@ struct WaitCheckpointTaskBuilder {

impl WaitCheckpointTaskBuilder {
fn update_task_on_chunk(&mut self, offset_col: ArrayRef) {
#[expect(clippy::single_match)]
match &mut self.building_task {
WaitCheckpointTask::AckPubsubMessage(_, arrays) => {
arrays.push(offset_col);
}
_ => {}
WaitCheckpointTask::AckNatsJetStream(_, arrays, _) => {
arrays.push(offset_col);
}
tabVersion marked this conversation as resolved.
Show resolved Hide resolved
WaitCheckpointTask::CommitCdcOffset(_) => {}
}
}

Expand Down
Loading