Skip to content

Commit

Permalink
feat(stream): support set nats consumer deliver policy as latest, ear…
Browse files Browse the repository at this point in the history
…liest, by timestamp (#12176)
  • Loading branch information
yufansong authored Sep 14, 2023
1 parent 8cc5120 commit ddbf1f3
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 46 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ serde_with = { version = "3", features = ["json"] }
simd-json = "0.10.6"
tempfile = "3"
thiserror = "1"
time = "0.3.28"
tokio = { version = "0.2", package = "madsim-tokio", features = [
"rt",
"rt-multi-thread",
Expand All @@ -110,6 +111,7 @@ tonic = { workspace = true }
tracing = "0.1"
url = "2"
urlencoding = "2"

[target.'cfg(not(madsim))'.dependencies]
workspace-hack = { path = "../workspace-hack" }

Expand Down
44 changes: 25 additions & 19 deletions src/connector/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ use risingwave_common::error::anyhow_error;
use serde_derive::{Deserialize, Serialize};
use serde_with::json::JsonString;
use serde_with::{serde_as, DisplayFromStr};
use time::OffsetDateTime;

use crate::aws_auth::AwsAuthProps;
use crate::deserialize_duration_from_string;
use crate::sink::SinkError;

use crate::source::nats::source::NatsOffset;
// The file describes the common abstractions for each connector and can be used in both source and
// sink.

Expand Down Expand Up @@ -425,8 +426,8 @@ impl NatsCommon {

pub(crate) async fn build_consumer(
&self,
split_id: i32,
start_sequence: Option<u64>,
split_id: String,
start_sequence: NatsOffset,
) -> anyhow::Result<
async_nats::jetstream::consumer::Consumer<async_nats::jetstream::consumer::pull::Config>,
> {
Expand All @@ -437,23 +438,28 @@ impl NatsCommon {
ack_policy: jetstream::consumer::AckPolicy::None,
..Default::default()
};
match start_sequence {
Some(v) => {
let consumer = stream
.get_or_create_consumer(&name, {
config.deliver_policy = DeliverPolicy::ByStartSequence {
start_sequence: v + 1,
};
config
})
.await?;
Ok(consumer)
}
None => {
let consumer = stream.get_or_create_consumer(&name, config).await?;
Ok(consumer)

let deliver_policy = match start_sequence {
NatsOffset::Earliest => DeliverPolicy::All,
NatsOffset::Latest => DeliverPolicy::Last,
NatsOffset::SequenceNumber(v) => {
let parsed = v.parse::<u64>()?;
DeliverPolicy::ByStartSequence {
start_sequence: 1 + parsed,
}
}
}
NatsOffset::Timestamp(v) => DeliverPolicy::ByStartTime {
start_time: OffsetDateTime::from_unix_timestamp_nanos(v * 1_000_000)?,
},
NatsOffset::None => DeliverPolicy::All,
};
let consumer = stream
.get_or_create_consumer(&name, {
config.deliver_policy = deliver_policy;
config
})
.await?;
Ok(consumer)
}

pub(crate) async fn build_or_get_stream(
Expand Down
14 changes: 8 additions & 6 deletions src/connector/src/source/nats/enumerator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use anyhow;
use async_trait::async_trait;

use super::source::NatsSplit;
use super::source::{NatsOffset, NatsSplit};
use super::NatsProperties;
use crate::source::{SourceEnumeratorContextRef, SplitEnumerator};
use crate::source::{SourceEnumeratorContextRef, SplitEnumerator, SplitId};

#[derive(Debug, Clone, Eq, PartialEq)]
pub struct NatsSplitEnumerator {
subject: String,
split_num: i32,
split_id: SplitId,
}

#[async_trait]
Expand All @@ -36,16 +38,16 @@ impl SplitEnumerator for NatsSplitEnumerator {
) -> anyhow::Result<NatsSplitEnumerator> {
Ok(Self {
subject: properties.common.subject,
split_num: 0,
split_id: Arc::from("0"),
})
}

async fn list_splits(&mut self) -> anyhow::Result<Vec<NatsSplit>> {
// TODO: to simplify the logic, return 1 split for first version
let nats_split = NatsSplit {
subject: self.subject.clone(),
split_num: 0, // be the same as `from_nats_jetstream_message`
start_sequence: None,
split_id: Arc::from("0"), // be the same as `from_nats_jetstream_message`
start_sequence: NatsOffset::None,
};

Ok(vec![nats_split])
Expand Down
9 changes: 9 additions & 0 deletions src/connector/src/source/nats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@ pub const NATS_CONNECTOR: &str = "nats";
pub struct NatsProperties {
#[serde(flatten)]
pub common: NatsCommon,

#[serde(rename = "scan.startup.mode", alias = "nats.scan.startup.mode")]
pub scan_startup_mode: Option<String>,

#[serde(
rename = "scan.startup.timestamp_millis",
alias = "nats.scan.startup.timestamp_millis"
)]
pub start_time: Option<String>,
}

impl NatsProperties {}
30 changes: 24 additions & 6 deletions src/connector/src/source/nats/source/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,37 @@
// limitations under the License.

use async_nats;
use async_nats::jetstream::Message;

use crate::source::base::SourceMessage;
use crate::source::SourceMeta;
use crate::source::{SourceMeta, SplitId};

impl SourceMessage {
pub fn from_nats_jetstream_message(message: async_nats::jetstream::message::Message) -> Self {
#[derive(Clone, Debug)]
pub struct NatsMessage {
pub split_id: SplitId,
pub sequence_number: String,
pub payload: Vec<u8>,
}

impl From<NatsMessage> for SourceMessage {
fn from(message: NatsMessage) -> Self {
SourceMessage {
key: None,
payload: Some(message.message.payload.to_vec()),
payload: Some(message.payload),
// For nats jetstream, use sequence id as offset
offset: message.info().unwrap().stream_sequence.to_string(),
split_id: "0".into(),
offset: message.sequence_number,
split_id: message.split_id,
meta: SourceMeta::Empty,
}
}
}

impl NatsMessage {
pub fn new(split_id: SplitId, message: Message) -> Self {
NatsMessage {
split_id,
sequence_number: message.info().unwrap().stream_sequence.to_string(),
payload: message.message.payload.to_vec(),
}
}
}
48 changes: 39 additions & 9 deletions src/connector/src/source/nats/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,29 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::Result;
use anyhow::{anyhow, Result};
use async_nats::jetstream::consumer;
use async_trait::async_trait;
use futures::StreamExt;
use futures_async_stream::try_stream;

use super::message::NatsMessage;
use super::NatsOffset;
use crate::parser::ParserConfig;
use crate::source::common::{into_chunk_stream, CommonSplitReader};
use crate::source::nats::split::NatsSplit;
use crate::source::nats::NatsProperties;
use crate::source::{
BoxSourceWithStateStream, Column, SourceContextRef, SourceMessage, SplitImpl, SplitReader,
BoxSourceWithStateStream, Column, SourceContextRef, SourceMessage, SplitId, SplitImpl,
SplitReader,
};

pub struct NatsSplitReader {
consumer: consumer::Consumer<consumer::pull::Config>,
properties: NatsProperties,
parser_config: ParserConfig,
source_ctx: SourceContextRef,
start_position: NatsOffset,
split_id: SplitId,
}

#[async_trait]
Expand All @@ -46,19 +50,42 @@ impl SplitReader for NatsSplitReader {
) -> Result<Self> {
// TODO: to simplify the logic, return 1 split for first version
assert!(splits.len() == 1);
let splits = splits
.into_iter()
.map(|split| split.into_nats().unwrap())
.collect::<Vec<NatsSplit>>();
let split = splits.into_iter().next().unwrap().into_nats().unwrap();
let split_id = split.split_id;
let start_position = match &split.start_sequence {
NatsOffset::None => match &properties.scan_startup_mode {
None => NatsOffset::Earliest,
Some(mode) => match mode.as_str() {
"latest" => NatsOffset::Latest,
"earliest" => NatsOffset::Earliest,
"timestamp_millis" => {
if let Some(time) = &properties.start_time {
NatsOffset::Timestamp(time.parse()?)
} else {
return Err(anyhow!("scan_startup_timestamp_millis is required"));
}
}
_ => {
return Err(anyhow!(
"invalid scan_startup_mode, accept earliest/latest/timestamp_millis"
))
}
},
},
start_position => start_position.to_owned(),
};

let consumer = properties
.common
.build_consumer(0, splits[0].start_sequence)
.build_consumer(split_id.to_string(), start_position.clone())
.await?;
Ok(Self {
consumer,
properties,
parser_config,
source_ctx,
start_position,
split_id,
})
}

Expand All @@ -78,7 +105,10 @@ impl CommonSplitReader for NatsSplitReader {
for msgs in messages.ready_chunks(capacity) {
let mut msg_vec = Vec::with_capacity(capacity);
for msg in msgs {
msg_vec.push(SourceMessage::from_nats_jetstream_message(msg?));
msg_vec.push(SourceMessage::from(NatsMessage::new(
self.split_id.clone(),
msg?,
)));
}
yield msg_vec;
}
Expand Down
26 changes: 20 additions & 6 deletions src/connector/src/source/nats/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,29 @@ use serde::{Deserialize, Serialize};

use crate::source::{SplitId, SplitMetaData};

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub enum NatsOffset {
Earliest,
Latest,
SequenceNumber(String),
Timestamp(i128),
None,
}

/// The states of a NATS split, which will be persisted to checkpoint.
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
pub struct NatsSplit {
pub(crate) subject: String,
// TODO: to simplify the logic, return 1 split for first version. May use parallelism in
// future.
pub(crate) split_num: i32,
pub(crate) start_sequence: Option<u64>,
pub(crate) split_id: SplitId,
pub(crate) start_sequence: NatsOffset,
}

impl SplitMetaData for NatsSplit {
fn id(&self) -> SplitId {
// TODO: should avoid constructing a string every time
format!("{}", self.split_num).into()
format!("{}", self.split_id).into()
}

fn restore_from_json(value: JsonbVal) -> anyhow::Result<Self> {
Expand All @@ -44,16 +53,21 @@ impl SplitMetaData for NatsSplit {
}

impl NatsSplit {
pub fn new(subject: String, split_num: i32, start_sequence: Option<u64>) -> Self {
pub fn new(subject: String, split_id: SplitId, start_sequence: NatsOffset) -> Self {
Self {
subject,
split_num,
split_id,
start_sequence,
}
}

pub fn update_with_offset(&mut self, start_sequence: String) -> anyhow::Result<()> {
self.start_sequence = Some(start_sequence.as_str().parse::<u64>().unwrap());
let start_sequence = if start_sequence.is_empty() {
NatsOffset::Earliest
} else {
NatsOffset::SequenceNumber(start_sequence)
};
self.start_sequence = start_sequence;
Ok(())
}
}

0 comments on commit ddbf1f3

Please sign in to comment.