Skip to content

Commit

Permalink
fix(nats): align offset and property fields between NATS and others f…
Browse files Browse the repository at this point in the history
…or better ux and dx (#18732)

Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc authored Sep 27, 2024
1 parent 4548a99 commit d6f8ca2
Show file tree
Hide file tree
Showing 7 changed files with 17 additions and 18 deletions.
2 changes: 1 addition & 1 deletion src/connector/src/connector_common/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,7 @@ impl NatsCommon {
}
}
NatsOffset::Timestamp(v) => DeliverPolicy::ByStartTime {
start_time: OffsetDateTime::from_unix_timestamp_nanos(v * 1_000_000)
start_time: OffsetDateTime::from_unix_timestamp_nanos(v as i128 * 1_000_000)
.context("invalid timestamp for nats offset")?,
},
NatsOffset::None => DeliverPolicy::All,
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/source/kinesis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub struct KinesisProperties {

#[serde(rename = "scan.startup.timestamp.millis")]
#[serde_as(as = "Option<DisplayFromStr>")]
pub timestamp_offset: Option<i64>,
pub start_timestamp_millis: Option<i64>,

#[serde(flatten)]
pub common: KinesisCommon,
Expand Down Expand Up @@ -80,6 +80,6 @@ mod test {

let kinesis_props: KinesisProperties =
serde_json::from_value(serde_json::to_value(props).unwrap()).unwrap();
assert_eq!(kinesis_props.timestamp_offset, Some(123456789));
assert_eq!(kinesis_props.start_timestamp_millis, Some(123456789));
}
}
8 changes: 4 additions & 4 deletions src/connector/src/source/kinesis/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,22 +74,22 @@ impl SplitReader for KinesisSplitReader {
"earliest" => KinesisOffset::Earliest,
"latest" => KinesisOffset::Latest,
"timestamp" => {
if let Some(ts) = &properties.timestamp_offset {
if let Some(ts) = &properties.start_timestamp_millis {
KinesisOffset::Timestamp(*ts)
} else {
bail!("scan.startup.timestamp.millis is required");
}
}
_ => {
bail!("invalid scan_startup_mode, accept earliest/latest/timestamp")
bail!("invalid scan.startup.mode, accept earliest/latest/timestamp")
}
},
},
next_offset => next_offset.to_owned(),
};

if !matches!(next_offset, KinesisOffset::Timestamp(_))
&& properties.timestamp_offset.is_some()
&& properties.start_timestamp_millis.is_some()
{
// cannot bail! here because all new split readers will fail to start if user set 'scan.startup.mode' to 'timestamp'
tracing::warn!("scan.startup.mode needs to be set to 'timestamp' if you want to start with a specific timestamp, starting shard {} from the beginning",
Expand Down Expand Up @@ -356,7 +356,7 @@ mod tests {
},

scan_startup_mode: None,
timestamp_offset: None,
start_timestamp_millis: None,

unknown_fields: Default::default(),
};
Expand Down
4 changes: 3 additions & 1 deletion src/connector/src/source/nats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ impl ReplayPolicyWrapper {
}
}

#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct NatsProperties {
#[serde(flatten)]
Expand All @@ -75,7 +76,8 @@ pub struct NatsProperties {
rename = "scan.startup.timestamp.millis",
alias = "scan.startup.timestamp_millis"
)]
pub start_time: Option<String>,
#[serde_as(as = "Option<DisplayFromStr>")]
pub start_timestamp_millis: Option<i64>,

#[serde(rename = "stream")]
pub stream: String,
Expand Down
13 changes: 5 additions & 8 deletions src/connector/src/source/nats/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::Context as _;
use async_nats::jetstream::consumer;
use async_trait::async_trait;
use futures::StreamExt;
Expand Down Expand Up @@ -62,17 +61,15 @@ impl SplitReader for NatsSplitReader {
Some(mode) => match mode.as_str() {
"latest" => NatsOffset::Latest,
"earliest" => NatsOffset::Earliest,
"timestamp_millis" => {
if let Some(time) = &properties.start_time {
NatsOffset::Timestamp(time.parse().context(
"failed to parse the start time as nats offset timestamp",
)?)
"timestamp" | "timestamp_millis" /* backward-compat */ => {
if let Some(ts) = &properties.start_timestamp_millis {
NatsOffset::Timestamp(*ts)
} else {
bail!("scan_startup_timestamp_millis is required");
bail!("scan.startup.timestamp.millis is required");
}
}
_ => {
bail!("invalid scan_startup_mode, accept earliest/latest/timestamp_millis")
bail!("invalid scan.startup.mode, accept earliest/latest/timestamp")
}
},
},
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/nats/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub enum NatsOffset {
Earliest,
Latest,
SequenceNumber(String),
Timestamp(i128),
Timestamp(i64),
None,
}

Expand Down
2 changes: 1 addition & 1 deletion src/connector/with_options_source.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ NatsProperties:
field_type: String
required: false
- name: scan.startup.timestamp.millis
field_type: String
field_type: i64
required: false
alias:
- scan.startup.timestamp_millis
Expand Down

0 comments on commit d6f8ca2

Please sign in to comment.