From d6f8ca22d9884d2a8367f4eaecda3171b4933c3a Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Fri, 27 Sep 2024 12:35:33 +0800 Subject: [PATCH] fix(nats): align offset and property fields between NATS and others for better ux and dx (#18732) Signed-off-by: Richard Chien --- src/connector/src/connector_common/common.rs | 2 +- src/connector/src/source/kinesis/mod.rs | 4 ++-- src/connector/src/source/kinesis/source/reader.rs | 8 ++++---- src/connector/src/source/nats/mod.rs | 4 +++- src/connector/src/source/nats/source/reader.rs | 13 +++++-------- src/connector/src/source/nats/split.rs | 2 +- src/connector/with_options_source.yaml | 2 +- 7 files changed, 17 insertions(+), 18 deletions(-) diff --git a/src/connector/src/connector_common/common.rs b/src/connector/src/connector_common/common.rs index a492c2f7d2dcc..59575b35d51ea 100644 --- a/src/connector/src/connector_common/common.rs +++ b/src/connector/src/connector_common/common.rs @@ -674,7 +674,7 @@ impl NatsCommon { } } NatsOffset::Timestamp(v) => DeliverPolicy::ByStartTime { - start_time: OffsetDateTime::from_unix_timestamp_nanos(v * 1_000_000) + start_time: OffsetDateTime::from_unix_timestamp_nanos(v as i128 * 1_000_000) .context("invalid timestamp for nats offset")?, }, NatsOffset::None => DeliverPolicy::All, diff --git a/src/connector/src/source/kinesis/mod.rs b/src/connector/src/source/kinesis/mod.rs index de393a5121961..1e64b06716e56 100644 --- a/src/connector/src/source/kinesis/mod.rs +++ b/src/connector/src/source/kinesis/mod.rs @@ -40,7 +40,7 @@ pub struct KinesisProperties { #[serde(rename = "scan.startup.timestamp.millis")] #[serde_as(as = "Option")] - pub timestamp_offset: Option, + pub start_timestamp_millis: Option, #[serde(flatten)] pub common: KinesisCommon, @@ -80,6 +80,6 @@ mod test { let kinesis_props: KinesisProperties = serde_json::from_value(serde_json::to_value(props).unwrap()).unwrap(); - assert_eq!(kinesis_props.timestamp_offset, Some(123456789)); + assert_eq!(kinesis_props.start_timestamp_millis, Some(123456789)); } } diff --git a/src/connector/src/source/kinesis/source/reader.rs b/src/connector/src/source/kinesis/source/reader.rs index a1f8db4578054..78a382968adac 100644 --- a/src/connector/src/source/kinesis/source/reader.rs +++ b/src/connector/src/source/kinesis/source/reader.rs @@ -74,14 +74,14 @@ impl SplitReader for KinesisSplitReader { "earliest" => KinesisOffset::Earliest, "latest" => KinesisOffset::Latest, "timestamp" => { - if let Some(ts) = &properties.timestamp_offset { + if let Some(ts) = &properties.start_timestamp_millis { KinesisOffset::Timestamp(*ts) } else { bail!("scan.startup.timestamp.millis is required"); } } _ => { - bail!("invalid scan_startup_mode, accept earliest/latest/timestamp") + bail!("invalid scan.startup.mode, accept earliest/latest/timestamp") } }, }, @@ -89,7 +89,7 @@ impl SplitReader for KinesisSplitReader { }; if !matches!(next_offset, KinesisOffset::Timestamp(_)) - && properties.timestamp_offset.is_some() + && properties.start_timestamp_millis.is_some() { // cannot bail! here because all new split readers will fail to start if user set 'scan.startup.mode' to 'timestamp' tracing::warn!("scan.startup.mode needs to be set to 'timestamp' if you want to start with a specific timestamp, starting shard {} from the beginning", @@ -356,7 +356,7 @@ mod tests { }, scan_startup_mode: None, - timestamp_offset: None, + start_timestamp_millis: None, unknown_fields: Default::default(), }; diff --git a/src/connector/src/source/nats/mod.rs b/src/connector/src/source/nats/mod.rs index e012dedfac55a..baf18be703146 100644 --- a/src/connector/src/source/nats/mod.rs +++ b/src/connector/src/source/nats/mod.rs @@ -60,6 +60,7 @@ impl ReplayPolicyWrapper { } } +#[serde_as] #[derive(Clone, Debug, Deserialize, WithOptions)] pub struct NatsProperties { #[serde(flatten)] @@ -75,7 +76,8 @@ pub struct NatsProperties { rename = "scan.startup.timestamp.millis", alias = "scan.startup.timestamp_millis" )] - pub start_time: Option, + #[serde_as(as = "Option")] + pub start_timestamp_millis: Option, #[serde(rename = "stream")] pub stream: String, diff --git a/src/connector/src/source/nats/source/reader.rs b/src/connector/src/source/nats/source/reader.rs index 45d13017e0ada..937962effa4be 100644 --- a/src/connector/src/source/nats/source/reader.rs +++ b/src/connector/src/source/nats/source/reader.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use anyhow::Context as _; use async_nats::jetstream::consumer; use async_trait::async_trait; use futures::StreamExt; @@ -62,17 +61,15 @@ impl SplitReader for NatsSplitReader { Some(mode) => match mode.as_str() { "latest" => NatsOffset::Latest, "earliest" => NatsOffset::Earliest, - "timestamp_millis" => { - if let Some(time) = &properties.start_time { - NatsOffset::Timestamp(time.parse().context( - "failed to parse the start time as nats offset timestamp", - )?) + "timestamp" | "timestamp_millis" /* backward-compat */ => { + if let Some(ts) = &properties.start_timestamp_millis { + NatsOffset::Timestamp(*ts) } else { - bail!("scan_startup_timestamp_millis is required"); + bail!("scan.startup.timestamp.millis is required"); } } _ => { - bail!("invalid scan_startup_mode, accept earliest/latest/timestamp_millis") + bail!("invalid scan.startup.mode, accept earliest/latest/timestamp") } }, }, diff --git a/src/connector/src/source/nats/split.rs b/src/connector/src/source/nats/split.rs index 7a07c481f9bc3..76284566f3d2c 100644 --- a/src/connector/src/source/nats/split.rs +++ b/src/connector/src/source/nats/split.rs @@ -23,7 +23,7 @@ pub enum NatsOffset { Earliest, Latest, SequenceNumber(String), - Timestamp(i128), + Timestamp(i64), None, } diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml index 50222280e8b1b..91eaf59b1c7b6 100644 --- a/src/connector/with_options_source.yaml +++ b/src/connector/with_options_source.yaml @@ -636,7 +636,7 @@ NatsProperties: field_type: String required: false - name: scan.startup.timestamp.millis - field_type: String + field_type: i64 required: false alias: - scan.startup.timestamp_millis