Skip to content

Commit

Permalink
fix: correctly handle scan.startup.timestamp.millis as Option<i64> (
Browse files Browse the repository at this point in the history
  • Loading branch information
tabVersion authored Nov 27, 2023
1 parent 56424ad commit 45d2df5
Showing 1 changed file with 26 additions and 0 deletions.
26 changes: 26 additions & 0 deletions src/connector/src/source/kinesis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub mod source;
pub mod split;

use serde::Deserialize;
use serde_with::{serde_as, DisplayFromStr};
use with_options::WithOptions;

use crate::common::KinesisCommon;
Expand All @@ -27,13 +28,15 @@ use crate::source::SourceProperties;

pub const KINESIS_CONNECTOR: &str = "kinesis";

#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct KinesisProperties {
#[serde(rename = "scan.startup.mode", alias = "kinesis.scan.startup.mode")]
// accepted values: "latest", "earliest", "timestamp"
pub scan_startup_mode: Option<String>,

#[serde(rename = "scan.startup.timestamp.millis")]
#[serde_as(as = "Option<DisplayFromStr>")]
pub timestamp_offset: Option<i64>,

#[serde(flatten)]
Expand All @@ -47,3 +50,26 @@ impl SourceProperties for KinesisProperties {

const SOURCE_NAME: &'static str = KINESIS_CONNECTOR;
}

#[cfg(test)]
mod test {
use std::collections::HashMap;

use maplit::hashmap;

use super::*;

#[test]
fn test_parse_kinesis_timestamp_offset() {
let props: HashMap<String, String> = hashmap! {
"stream".to_string() => "sample_stream".to_string(),
"aws.region".to_string() => "us-east-1".to_string(),
"scan_startup_mode".to_string() => "timestamp".to_string(),
"scan.startup.timestamp.millis".to_string() => "123456789".to_string(),
};

let kinesis_props: KinesisProperties =
serde_json::from_value(serde_json::to_value(props).unwrap()).unwrap();
assert_eq!(kinesis_props.timestamp_offset, Some(123456789));
}
}

0 comments on commit 45d2df5

Please sign in to comment.