Skip to content

Commit

Permalink
fix kinesis property
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 committed Nov 27, 2023
1 parent 97eb91c commit bcbe788
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 2 deletions.
1 change: 1 addition & 0 deletions src/connector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#![feature(if_let_guard)]
#![feature(iterator_try_collect)]
#![feature(try_blocks)]
#![feature(let_else)]

use std::time::Duration;

Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/kinesis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub struct KinesisProperties {
pub scan_startup_mode: Option<String>,

#[serde(rename = "scan.startup.timestamp.millis")]
pub timestamp_offset: Option<i64>,
pub timestamp_offset: Option<String>,

#[serde(flatten)]
pub common: KinesisCommon,
Expand Down
7 changes: 6 additions & 1 deletion src/connector/src/source/kinesis/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,12 @@ impl SplitReader for KinesisSplitReader {
"latest" => KinesisOffset::Latest,
"timestamp" => {
if let Some(ts) = &properties.timestamp_offset {
KinesisOffset::Timestamp(*ts)
let Ok(ts_i) = ts.parse() else {
return Err(anyhow!(
format! {"cannot parse timestamp offset {ts} into i64"}
));
};
KinesisOffset::Timestamp(ts_i)
} else {
return Err(anyhow!("scan.startup.timestamp.millis is required"));
}
Expand Down

0 comments on commit bcbe788

Please sign in to comment.