Skip to content

Commit

Permalink
feat: allow kinesis source start with timestamp (#12241)
Browse files Browse the repository at this point in the history
Signed-off-by: tabVersion <[email protected]>
  • Loading branch information
tabVersion authored and Li0k committed Sep 15, 2023
1 parent 0e801b2 commit 78da15d
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 25 deletions.
10 changes: 4 additions & 6 deletions src/connector/src/source/kinesis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,11 @@ pub const KINESIS_CONNECTOR: &str = "kinesis";
#[derive(Clone, Debug, Deserialize)]
pub struct KinesisProperties {
#[serde(rename = "scan.startup.mode", alias = "kinesis.scan.startup.mode")]
// accepted values: "latest", "earliest", "sequence_number"
// accepted values: "latest", "earliest", "timestamp"
pub scan_startup_mode: Option<String>,
#[serde(
rename = "scan.startup.sequence_number",
alias = "kinesis.scan.startup.sequence_number"
)]
pub seq_offset: Option<String>,

#[serde(rename = "scan.startup.timestamp.millis")]
pub timestamp_offset: Option<i64>,

#[serde(flatten)]
pub common: KinesisCommon,
Expand Down
48 changes: 29 additions & 19 deletions src/connector/src/source/kinesis/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use anyhow::{anyhow, Result};
use async_trait::async_trait;
use aws_sdk_kinesis::error::{DisplayErrorContext, SdkError};
use aws_sdk_kinesis::operation::get_records::{GetRecordsError, GetRecordsOutput};
use aws_sdk_kinesis::primitives::DateTime;
use aws_sdk_kinesis::types::ShardIteratorType;
use aws_sdk_kinesis::Client as KinesisClient;
use futures_async_stream::try_stream;
Expand Down Expand Up @@ -69,28 +70,28 @@ impl SplitReader for KinesisSplitReader {
Some(mode) => match mode.as_str() {
"earliest" => KinesisOffset::Earliest,
"latest" => KinesisOffset::Latest,
"sequence_number" => {
if let Some(seq) = &properties.seq_offset {
KinesisOffset::SequenceNumber(seq.clone())
"timestamp" => {
if let Some(ts) = &properties.timestamp_offset {
KinesisOffset::Timestamp(*ts)
} else {
return Err(anyhow!("scan_startup_sequence_number is required"));
return Err(anyhow!("scan.startup.timestamp.millis is required"));
}
}
_ => {
return Err(anyhow!(
"invalid scan_startup_mode, accept earliest/latest/sequence_number"
"invalid scan_startup_mode, accept earliest/latest/timestamp"
))
}
},
},
start_position => start_position.to_owned(),
};

if !matches!(start_position, KinesisOffset::SequenceNumber(_))
&& properties.seq_offset.is_some()
if !matches!(start_position, KinesisOffset::Timestamp(_))
&& properties.timestamp_offset.is_some()
{
return Err(
anyhow!("scan.startup.mode need to be set to 'sequence_number' if you want to start with a specific sequence number")
anyhow!("scan.startup.mode need to be set to 'timestamp' if you want to start with a specific timestamp")
);
}

Expand Down Expand Up @@ -208,27 +209,37 @@ impl CommonSplitReader for KinesisSplitReader {
}
impl KinesisSplitReader {
async fn new_shard_iter(&mut self) -> Result<()> {
let (starting_seq_num, iter_type) = if self.latest_offset.is_some() {
let (starting_seq_num, start_timestamp, iter_type) = if self.latest_offset.is_some() {
(
self.latest_offset.clone(),
None,
ShardIteratorType::AfterSequenceNumber,
)
} else {
match &self.start_position {
KinesisOffset::Earliest => (None, ShardIteratorType::TrimHorizon),
KinesisOffset::SequenceNumber(seq) => {
(Some(seq.clone()), ShardIteratorType::AfterSequenceNumber)
}
KinesisOffset::Latest => (None, ShardIteratorType::Latest),
KinesisOffset::Earliest => (None, None, ShardIteratorType::TrimHorizon),
KinesisOffset::SequenceNumber(seq) => (
Some(seq.clone()),
None,
ShardIteratorType::AfterSequenceNumber,
),
KinesisOffset::Latest => (None, None, ShardIteratorType::Latest),
KinesisOffset::Timestamp(ts) => (
None,
Some(DateTime::from_millis(*ts)),
ShardIteratorType::AtTimestamp,
),
_ => unreachable!(),
}
};

// `starting_seq_num` and `starting_timestamp` will not be both set
async fn get_shard_iter_inner(
client: &KinesisClient,
stream_name: &str,
shard_id: &str,
starting_seq_num: Option<String>,
starting_timestamp: Option<DateTime>,
iter_type: ShardIteratorType,
) -> Result<String> {
let resp = client
Expand All @@ -237,6 +248,7 @@ impl KinesisSplitReader {
.shard_id(shard_id)
.shard_iterator_type(iter_type)
.set_starting_sequence_number(starting_seq_num)
.set_timestamp(starting_timestamp)
.send()
.await?;

Expand All @@ -256,6 +268,7 @@ impl KinesisSplitReader {
&self.stream_name,
&self.shard_id,
starting_seq_num.clone(),
start_timestamp,
iter_type.clone(),
)
},
Expand Down Expand Up @@ -307,10 +320,7 @@ mod tests {
},

scan_startup_mode: None,
seq_offset: Some(
// redundant seq number
"49629139817504901062972448413535783695568426186596941842".to_string(),
),
timestamp_offset: Some(123456789098765432),
};
let client = KinesisSplitReader::new(
properties,
Expand Down Expand Up @@ -343,7 +353,7 @@ mod tests {
},

scan_startup_mode: None,
seq_offset: None,
timestamp_offset: None,
};

let trim_horizen_reader = KinesisSplitReader::new(
Expand Down

0 comments on commit 78da15d

Please sign in to comment.