diff --git a/src/connector/src/source/kinesis/mod.rs b/src/connector/src/source/kinesis/mod.rs index 6ad250fc9301..993e28379d6f 100644 --- a/src/connector/src/source/kinesis/mod.rs +++ b/src/connector/src/source/kinesis/mod.rs @@ -29,13 +29,11 @@ pub const KINESIS_CONNECTOR: &str = "kinesis"; #[derive(Clone, Debug, Deserialize)] pub struct KinesisProperties { #[serde(rename = "scan.startup.mode", alias = "kinesis.scan.startup.mode")] - // accepted values: "latest", "earliest", "sequence_number" + // accepted values: "latest", "earliest", "timestamp" pub scan_startup_mode: Option, - #[serde( - rename = "scan.startup.sequence_number", - alias = "kinesis.scan.startup.sequence_number" - )] - pub seq_offset: Option, + + #[serde(rename = "scan.startup.timestamp.millis")] + pub timestamp_offset: Option, #[serde(flatten)] pub common: KinesisCommon, diff --git a/src/connector/src/source/kinesis/source/reader.rs b/src/connector/src/source/kinesis/source/reader.rs index fed48c5162a4..c321a4c63fd7 100644 --- a/src/connector/src/source/kinesis/source/reader.rs +++ b/src/connector/src/source/kinesis/source/reader.rs @@ -18,6 +18,7 @@ use anyhow::{anyhow, Result}; use async_trait::async_trait; use aws_sdk_kinesis::error::{DisplayErrorContext, SdkError}; use aws_sdk_kinesis::operation::get_records::{GetRecordsError, GetRecordsOutput}; +use aws_sdk_kinesis::primitives::DateTime; use aws_sdk_kinesis::types::ShardIteratorType; use aws_sdk_kinesis::Client as KinesisClient; use futures_async_stream::try_stream; @@ -69,16 +70,16 @@ impl SplitReader for KinesisSplitReader { Some(mode) => match mode.as_str() { "earliest" => KinesisOffset::Earliest, "latest" => KinesisOffset::Latest, - "sequence_number" => { - if let Some(seq) = &properties.seq_offset { - KinesisOffset::SequenceNumber(seq.clone()) + "timestamp" => { + if let Some(ts) = &properties.timestamp_offset { + KinesisOffset::Timestamp(*ts) } else { - return Err(anyhow!("scan_startup_sequence_number is required")); + return Err(anyhow!("scan.startup.timestamp.millis is required")); } } _ => { return Err(anyhow!( - "invalid scan_startup_mode, accept earliest/latest/sequence_number" + "invalid scan_startup_mode, accept earliest/latest/timestamp" )) } }, @@ -86,11 +87,11 @@ impl SplitReader for KinesisSplitReader { start_position => start_position.to_owned(), }; - if !matches!(start_position, KinesisOffset::SequenceNumber(_)) - && properties.seq_offset.is_some() + if !matches!(start_position, KinesisOffset::Timestamp(_)) + && properties.timestamp_offset.is_some() { return Err( - anyhow!("scan.startup.mode need to be set to 'sequence_number' if you want to start with a specific sequence number") + anyhow!("scan.startup.mode need to be set to 'timestamp' if you want to start with a specific timestamp") ); } @@ -208,27 +209,37 @@ impl CommonSplitReader for KinesisSplitReader { } impl KinesisSplitReader { async fn new_shard_iter(&mut self) -> Result<()> { - let (starting_seq_num, iter_type) = if self.latest_offset.is_some() { + let (starting_seq_num, start_timestamp, iter_type) = if self.latest_offset.is_some() { ( self.latest_offset.clone(), + None, ShardIteratorType::AfterSequenceNumber, ) } else { match &self.start_position { - KinesisOffset::Earliest => (None, ShardIteratorType::TrimHorizon), - KinesisOffset::SequenceNumber(seq) => { - (Some(seq.clone()), ShardIteratorType::AfterSequenceNumber) - } - KinesisOffset::Latest => (None, ShardIteratorType::Latest), + KinesisOffset::Earliest => (None, None, ShardIteratorType::TrimHorizon), + KinesisOffset::SequenceNumber(seq) => ( + Some(seq.clone()), + None, + ShardIteratorType::AfterSequenceNumber, + ), + KinesisOffset::Latest => (None, None, ShardIteratorType::Latest), + KinesisOffset::Timestamp(ts) => ( + None, + Some(DateTime::from_millis(*ts)), + ShardIteratorType::AtTimestamp, + ), _ => unreachable!(), } }; + // `starting_seq_num` and `starting_timestamp` will not be both set async fn get_shard_iter_inner( client: &KinesisClient, stream_name: &str, shard_id: &str, starting_seq_num: Option, + starting_timestamp: Option, iter_type: ShardIteratorType, ) -> Result { let resp = client @@ -237,6 +248,7 @@ impl KinesisSplitReader { .shard_id(shard_id) .shard_iterator_type(iter_type) .set_starting_sequence_number(starting_seq_num) + .set_timestamp(starting_timestamp) .send() .await?; @@ -256,6 +268,7 @@ impl KinesisSplitReader { &self.stream_name, &self.shard_id, starting_seq_num.clone(), + start_timestamp, iter_type.clone(), ) }, @@ -307,10 +320,7 @@ mod tests { }, scan_startup_mode: None, - seq_offset: Some( - // redundant seq number - "49629139817504901062972448413535783695568426186596941842".to_string(), - ), + timestamp_offset: Some(123456789098765432), }; let client = KinesisSplitReader::new( properties, @@ -343,7 +353,7 @@ mod tests { }, scan_startup_mode: None, - seq_offset: None, + timestamp_offset: None, }; let trim_horizen_reader = KinesisSplitReader::new(