From 73659b62b1c61b78023f2f6c5f7f7cf13939746f Mon Sep 17 00:00:00 2001 From: tabVersion Date: Tue, 12 Sep 2023 23:48:32 +0800 Subject: [PATCH 1/2] fix --- src/connector/src/source/kinesis/mod.rs | 5 ++- .../src/source/kinesis/source/reader.rs | 36 +++++++++++++++---- 2 files changed, 33 insertions(+), 8 deletions(-) diff --git a/src/connector/src/source/kinesis/mod.rs b/src/connector/src/source/kinesis/mod.rs index fc786f8f1b10d..a3faa36943810 100644 --- a/src/connector/src/source/kinesis/mod.rs +++ b/src/connector/src/source/kinesis/mod.rs @@ -25,7 +25,7 @@ pub const KINESIS_CONNECTOR: &str = "kinesis"; #[derive(Clone, Debug, Deserialize)] pub struct KinesisProperties { #[serde(rename = "scan.startup.mode", alias = "kinesis.scan.startup.mode")] - // accepted values: "latest", "earliest", "sequence_number" + // accepted values: "latest", "earliest", "sequence_number", "timestamp" pub scan_startup_mode: Option, #[serde( rename = "scan.startup.sequence_number", @@ -33,6 +33,9 @@ pub struct KinesisProperties { )] pub seq_offset: Option, + #[serde(rename = "scan.startup.timestamp.millis")] + pub timestamp_offset: Option, + #[serde(flatten)] pub common: KinesisCommon, } diff --git a/src/connector/src/source/kinesis/source/reader.rs b/src/connector/src/source/kinesis/source/reader.rs index f4aad6ad80587..a0dedcf1d81ee 100644 --- a/src/connector/src/source/kinesis/source/reader.rs +++ b/src/connector/src/source/kinesis/source/reader.rs @@ -18,6 +18,7 @@ use anyhow::{anyhow, Result}; use async_trait::async_trait; use aws_sdk_kinesis::error::{DisplayErrorContext, SdkError}; use aws_sdk_kinesis::operation::get_records::{GetRecordsError, GetRecordsOutput}; +use aws_sdk_kinesis::primitives::DateTime; use aws_sdk_kinesis::types::ShardIteratorType; use aws_sdk_kinesis::Client as KinesisClient; use futures_async_stream::try_stream; @@ -76,9 +77,16 @@ impl SplitReader for KinesisSplitReader { return Err(anyhow!("scan_startup_sequence_number is required")); } } + "timestamp" => { + if let Some(ts) = &properties.timestamp_offset { + KinesisOffset::Timestamp(*ts) + } else { + return Err(anyhow!("scan.startup.timestamp.millis is required")); + } + } _ => { return Err(anyhow!( - "invalid scan_startup_mode, accept earliest/latest/sequence_number" + "invalid scan_startup_mode, accept earliest/latest/sequence_number/timestamp" )) } }, @@ -208,27 +216,37 @@ impl CommonSplitReader for KinesisSplitReader { } impl KinesisSplitReader { async fn new_shard_iter(&mut self) -> Result<()> { - let (starting_seq_num, iter_type) = if self.latest_offset.is_some() { + let (starting_seq_num, start_timestamp, iter_type) = if self.latest_offset.is_some() { ( self.latest_offset.clone(), + None, ShardIteratorType::AfterSequenceNumber, ) } else { match &self.start_position { - KinesisOffset::Earliest => (None, ShardIteratorType::TrimHorizon), - KinesisOffset::SequenceNumber(seq) => { - (Some(seq.clone()), ShardIteratorType::AfterSequenceNumber) - } - KinesisOffset::Latest => (None, ShardIteratorType::Latest), + KinesisOffset::Earliest => (None, None, ShardIteratorType::TrimHorizon), + KinesisOffset::SequenceNumber(seq) => ( + Some(seq.clone()), + None, + ShardIteratorType::AfterSequenceNumber, + ), + KinesisOffset::Latest => (None, None, ShardIteratorType::Latest), + KinesisOffset::Timestamp(ts) => ( + None, + Some(DateTime::from_millis(*ts)), + ShardIteratorType::AtTimestamp, + ), _ => unreachable!(), } }; + // `starting_seq_num` and `starting_timestamp` will not be both set async fn get_shard_iter_inner( client: &KinesisClient, stream_name: &str, shard_id: &str, starting_seq_num: Option, + starting_timestamp: Option, iter_type: ShardIteratorType, ) -> Result { let resp = client @@ -237,6 +255,7 @@ impl KinesisSplitReader { .shard_id(shard_id) .shard_iterator_type(iter_type) .set_starting_sequence_number(starting_seq_num) + .set_timestamp(starting_timestamp) .send() .await?; @@ -256,6 +275,7 @@ impl KinesisSplitReader { &self.stream_name, &self.shard_id, starting_seq_num.clone(), + start_timestamp, iter_type.clone(), ) }, @@ -311,6 +331,7 @@ mod tests { // redundant seq number "49629139817504901062972448413535783695568426186596941842".to_string(), ), + timestamp_offset: None, }; let client = KinesisSplitReader::new( properties, @@ -344,6 +365,7 @@ mod tests { scan_startup_mode: None, seq_offset: None, + timestamp_offset: None, }; let trim_horizen_reader = KinesisSplitReader::new( From 4d538dc3f47333cba1194869659a150a2ec2d609 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Wed, 13 Sep 2023 15:19:27 +0800 Subject: [PATCH 2/2] feat: Refactor Kinesis source properties for better offset management - Modify `start_position` variable to use `KinesisOffset::Timestamp` - Remove `seq_offset` field from `KinesisProperties` - Add `timestamp_offset` field with value `123456789098765432` to `KinesisProperties` - Remove `test_reject_redundant_seq_props` test function - Remove `seq_offset` field from `KinesisProperties` struct in `test_single_thread_kinesis_reader` test function - Modify `scan_startup_mode` field in `KinesisProperties` struct - Change accepted values for `scan_startup_mode` to include "timestamp" - Add `timestamp_offset` field to `KinesisProperties` struct Signed-off-by: tabVersion --- src/connector/src/source/kinesis/mod.rs | 7 +----- .../src/source/kinesis/source/reader.rs | 22 +++++-------------- 2 files changed, 6 insertions(+), 23 deletions(-) diff --git a/src/connector/src/source/kinesis/mod.rs b/src/connector/src/source/kinesis/mod.rs index 6cf91644cf7a9..993e28379d6ff 100644 --- a/src/connector/src/source/kinesis/mod.rs +++ b/src/connector/src/source/kinesis/mod.rs @@ -29,13 +29,8 @@ pub const KINESIS_CONNECTOR: &str = "kinesis"; #[derive(Clone, Debug, Deserialize)] pub struct KinesisProperties { #[serde(rename = "scan.startup.mode", alias = "kinesis.scan.startup.mode")] - // accepted values: "latest", "earliest", "sequence_number", "timestamp" + // accepted values: "latest", "earliest", "timestamp" pub scan_startup_mode: Option, - #[serde( - rename = "scan.startup.sequence_number", - alias = "kinesis.scan.startup.sequence_number" - )] - pub seq_offset: Option, #[serde(rename = "scan.startup.timestamp.millis")] pub timestamp_offset: Option, diff --git a/src/connector/src/source/kinesis/source/reader.rs b/src/connector/src/source/kinesis/source/reader.rs index f8b027dcc1ba0..c321a4c63fd7d 100644 --- a/src/connector/src/source/kinesis/source/reader.rs +++ b/src/connector/src/source/kinesis/source/reader.rs @@ -70,13 +70,6 @@ impl SplitReader for KinesisSplitReader { Some(mode) => match mode.as_str() { "earliest" => KinesisOffset::Earliest, "latest" => KinesisOffset::Latest, - "sequence_number" => { - if let Some(seq) = &properties.seq_offset { - KinesisOffset::SequenceNumber(seq.clone()) - } else { - return Err(anyhow!("scan_startup_sequence_number is required")); - } - } "timestamp" => { if let Some(ts) = &properties.timestamp_offset { KinesisOffset::Timestamp(*ts) @@ -86,7 +79,7 @@ impl SplitReader for KinesisSplitReader { } _ => { return Err(anyhow!( - "invalid scan_startup_mode, accept earliest/latest/sequence_number/timestamp" + "invalid scan_startup_mode, accept earliest/latest/timestamp" )) } }, @@ -94,11 +87,11 @@ impl SplitReader for KinesisSplitReader { start_position => start_position.to_owned(), }; - if !matches!(start_position, KinesisOffset::SequenceNumber(_)) - && properties.seq_offset.is_some() + if !matches!(start_position, KinesisOffset::Timestamp(_)) + && properties.timestamp_offset.is_some() { return Err( - anyhow!("scan.startup.mode need to be set to 'sequence_number' if you want to start with a specific sequence number") + anyhow!("scan.startup.mode need to be set to 'timestamp' if you want to start with a specific timestamp") ); } @@ -327,11 +320,7 @@ mod tests { }, scan_startup_mode: None, - seq_offset: Some( - // redundant seq number - "49629139817504901062972448413535783695568426186596941842".to_string(), - ), - timestamp_offset: None, + timestamp_offset: Some(123456789098765432), }; let client = KinesisSplitReader::new( properties, @@ -364,7 +353,6 @@ mod tests { }, scan_startup_mode: None, - seq_offset: None, timestamp_offset: None, };