-
Notifications
You must be signed in to change notification settings - Fork 591
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor: impl ack and migrate to durable consumer for Nats #18873
Changes from all commits
3812714
a332ff7
b70f296
a6ffcd6
c3d8176
02f28b8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,6 +26,8 @@ pub mod nats; | |
pub mod nexmark; | ||
pub mod pulsar; | ||
|
||
use std::future::IntoFuture; | ||
|
||
pub use base::{UPSTREAM_SOURCE_KEY, *}; | ||
pub(crate) use common::*; | ||
use google_cloud_pubsub::subscription::Subscription; | ||
|
@@ -40,6 +42,8 @@ mod manager; | |
pub mod reader; | ||
pub mod test_source; | ||
|
||
use async_nats::jetstream::consumer::AckPolicy as JetStreamAckPolicy; | ||
use async_nats::jetstream::context::Context as JetStreamContext; | ||
pub use manager::{SourceColumnDesc, SourceColumnType}; | ||
use risingwave_common::array::{Array, ArrayRef}; | ||
use thiserror_ext::AsReport; | ||
|
@@ -77,6 +81,7 @@ pub fn should_copy_to_format_encode_options(key: &str, connector: &str) -> bool | |
pub enum WaitCheckpointTask { | ||
CommitCdcOffset(Option<(SplitId, String)>), | ||
AckPubsubMessage(Subscription, Vec<ArrayRef>), | ||
AckNatsJetStream(JetStreamContext, Vec<ArrayRef>, JetStreamAckPolicy), | ||
} | ||
|
||
impl WaitCheckpointTask { | ||
|
@@ -123,6 +128,49 @@ impl WaitCheckpointTask { | |
} | ||
ack(&subscription, ack_ids).await; | ||
} | ||
WaitCheckpointTask::AckNatsJetStream( | ||
ref context, | ||
reply_subjects_arrs, | ||
ref ack_policy, | ||
) => { | ||
async fn ack(context: &JetStreamContext, reply_subject: String) { | ||
match context.publish(reply_subject.clone(), "".into()).await { | ||
Err(e) => { | ||
tracing::error!(error = %e.as_report(), subject = ?reply_subject, "failed to ack NATS JetStream message"); | ||
} | ||
Ok(ack_future) => { | ||
if let Err(e) = ack_future.into_future().await { | ||
tracing::error!(error = %e.as_report(), subject = ?reply_subject, "failed to ack NATS JetStream message"); | ||
} | ||
} | ||
} | ||
} | ||
|
||
let reply_subjects = reply_subjects_arrs | ||
.iter() | ||
.flat_map(|arr| { | ||
arr.as_utf8() | ||
.iter() | ||
.flatten() | ||
.map(|s| s.to_string()) | ||
.collect::<Vec<String>>() | ||
}) | ||
.collect::<Vec<String>>(); | ||
|
||
match ack_policy { | ||
JetStreamAckPolicy::None => (), | ||
JetStreamAckPolicy::Explicit => { | ||
for reply_subject in reply_subjects { | ||
ack(context, reply_subject).await; | ||
} | ||
} | ||
JetStreamAckPolicy::All => { | ||
if let Some(reply_subject) = reply_subjects.last() { | ||
ack(context, reply_subject.clone()).await; | ||
} | ||
} | ||
Comment on lines
+160
to
+171
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just curious, the subjects like Want to make the logic correct in this situation: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
yes
If ack_policy is ack_all (batch ack), then at ckpt_1 we ack only msg2, which acknowledges both msg1 and msg2. Msg3 and msg4 are not acked. |
||
} | ||
} | ||
} | ||
} | ||
} |
tabVersion marked this conversation as resolved.
Show resolved
Hide resolved
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,7 @@ pub struct NatsMessage { | |
pub split_id: SplitId, | ||
pub sequence_number: String, | ||
pub payload: Vec<u8>, | ||
pub reply_subject: Option<String>, | ||
} | ||
|
||
impl From<NatsMessage> for SourceMessage { | ||
|
@@ -30,7 +31,10 @@ impl From<NatsMessage> for SourceMessage { | |
key: None, | ||
payload: Some(message.payload), | ||
// For nats jetstream, use sequence id as offset | ||
offset: message.sequence_number, | ||
// | ||
// DEPRECATED: the sequence id is no longer used as the offset; the NATS broker handles failover. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
// Use reply_subject as the offset so it can be used for acking; the persisted state is only checked to determine whether this is the first run. | ||
offset: message.reply_subject.unwrap_or_default(), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @yufansong Unfortunately, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Get it. SGTM |
||
split_id: message.split_id, | ||
meta: SourceMeta::Empty, | ||
} | ||
|
@@ -43,6 +47,10 @@ impl NatsMessage { | |
split_id, | ||
sequence_number: message.info().unwrap().stream_sequence.to_string(), | ||
payload: message.message.payload.to_vec(), | ||
reply_subject: message | ||
.message | ||
.reply | ||
.map(|subject| subject.as_str().to_string()), | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,7 @@ | |
mod message; | ||
mod reader; | ||
|
||
pub use message::*; | ||
pub use reader::*; | ||
|
||
pub use crate::source::nats::split::*; |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -51,8 +51,8 @@ impl SplitReader for NatsSplitReader { | |
source_ctx: SourceContextRef, | ||
_columns: Option<Vec<Column>>, | ||
) -> Result<Self> { | ||
// TODO: to simplify the logic, return 1 split for first version | ||
assert!(splits.len() == 1); | ||
// We guarantee that the number of splits always aligns with the parallelism | ||
assert_eq!(splits.len(), 1); | ||
let split = splits.into_iter().next().unwrap(); | ||
let split_id = split.split_id; | ||
let start_position = match &split.start_sequence { | ||
|
@@ -73,6 +73,9 @@ impl SplitReader for NatsSplitReader { | |
} | ||
}, | ||
}, | ||
// We have a record for this NATS split, containing the last seen offset (seq id) or reply subject. | ||
// We no longer use the seq id as the start position; | ||
// instead we just let the reader load from the durable consumer on the broker. | ||
Comment on lines
+76
to
+78
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I understand these comments, but you didn't make any change to the implementation code (L58 ~ L72). Why and how does it work? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We use the API following the changes in #18895 |
||
start_position => start_position.to_owned(), | ||
}; | ||
|
||
|
@@ -85,6 +88,7 @@ impl SplitReader for NatsSplitReader { | |
.common | ||
.build_consumer( | ||
properties.stream.clone(), | ||
properties.durable_consumer_name.clone(), | ||
split_id.to_string(), | ||
start_position.clone(), | ||
config, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All 5 of these errors are reported as "Nats error". Consider including more context?