feat: allow kinesis source start with timestamp #12241

Merged
merged 3 commits into from
Sep 13, 2023
10 changes: 4 additions & 6 deletions src/connector/src/source/kinesis/mod.rs
@@ -29,13 +29,11 @@ pub const KINESIS_CONNECTOR: &str = "kinesis";
#[derive(Clone, Debug, Deserialize)]
pub struct KinesisProperties {
#[serde(rename = "scan.startup.mode", alias = "kinesis.scan.startup.mode")]
// accepted values: "latest", "earliest", "sequence_number"
// accepted values: "latest", "earliest", "timestamp"
pub scan_startup_mode: Option<String>,
#[serde(
rename = "scan.startup.sequence_number",
alias = "kinesis.scan.startup.sequence_number"
)]
pub seq_offset: Option<String>,

#[serde(rename = "scan.startup.timestamp.millis")]
pub timestamp_offset: Option<i64>,

#[serde(flatten)]
pub common: KinesisCommon,
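
To make the option names in this hunk concrete, here is a minimal sketch of reading them from a plain string map. It is an illustration only: the PR itself relies on the serde attributes shown above, `read_startup_options` and `lookup` are hypothetical helpers, and the assumption that option values arrive as strings is mine, not something the diff states.

```rust
use std::collections::HashMap;

/// Return the value of the first key in `keys` that is present in `opts`.
/// Hypothetical helper; it stands in for serde's `rename`/`alias` handling.
fn lookup<'a>(opts: &'a HashMap<String, String>, keys: &[&str]) -> Option<&'a str> {
    keys.iter().find_map(|k| opts.get(*k).map(String::as_str))
}

/// Read the startup mode and the optional timestamp (in milliseconds).
/// The key names are taken from the hunk above; everything else is assumed.
fn read_startup_options(
    opts: &HashMap<String, String>,
) -> Result<(Option<String>, Option<i64>), String> {
    // `scan.startup.mode` has an alias in the diff, so both spellings are tried.
    let mode = lookup(opts, &["scan.startup.mode", "kinesis.scan.startup.mode"])
        .map(str::to_owned);

    // `scan.startup.timestamp.millis` has no alias in the diff.
    let timestamp_millis = match lookup(opts, &["scan.startup.timestamp.millis"]) {
        Some(raw) => Some(raw.parse::<i64>().map_err(|e| {
            format!("invalid scan.startup.timestamp.millis '{}': {}", raw, e)
        })?),
        None => None,
    };

    Ok((mode, timestamp_millis))
}
```

Note that this sketch only reads the two values. Whether the combination is valid (for example, a timestamp without `scan.startup.mode` set to `timestamp`) is decided later, in the reader.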
48 changes: 29 additions & 19 deletions src/connector/src/source/kinesis/source/reader.rs
@@ -18,6 +18,7 @@ use anyhow::{anyhow, Result};
use async_trait::async_trait;
use aws_sdk_kinesis::error::{DisplayErrorContext, SdkError};
use aws_sdk_kinesis::operation::get_records::{GetRecordsError, GetRecordsOutput};
use aws_sdk_kinesis::primitives::DateTime;
use aws_sdk_kinesis::types::ShardIteratorType;
use aws_sdk_kinesis::Client as KinesisClient;
use futures_async_stream::try_stream;
@@ -69,28 +70,28 @@ impl SplitReader for KinesisSplitReader {
Some(mode) => match mode.as_str() {
"earliest" => KinesisOffset::Earliest,
"latest" => KinesisOffset::Latest,
"sequence_number" => {
if let Some(seq) = &properties.seq_offset {
KinesisOffset::SequenceNumber(seq.clone())
"timestamp" => {
if let Some(ts) = &properties.timestamp_offset {
KinesisOffset::Timestamp(*ts)
} else {
return Err(anyhow!("scan_startup_sequence_number is required"));
return Err(anyhow!("scan.startup.timestamp.millis is required"));
}
}
_ => {
return Err(anyhow!(
"invalid scan_startup_mode, accept earliest/latest/sequence_number"
"invalid scan_startup_mode, accept earliest/latest/timestamp"
))
}
},
},
start_position => start_position.to_owned(),
};

if !matches!(start_position, KinesisOffset::SequenceNumber(_))
&& properties.seq_offset.is_some()
if !matches!(start_position, KinesisOffset::Timestamp(_))
&& properties.timestamp_offset.is_some()
{
return Err(
anyhow!("scan.startup.mode need to be set to 'sequence_number' if you want to start with a specific sequence number")
anyhow!("scan.startup.mode need to be set to 'timestamp' if you want to start with a specific timestamp")
);
}
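
The validation in this hunk can be summarised with a small, dependency-free sketch. `StartOffset` and `resolve_start_offset` are hypothetical stand-ins for `KinesisOffset` and the inline `match` in the reader, and the fallback to `earliest` when no mode is given is an assumption, since that arm is not visible in the diff.

```rust
/// Stand-in for the connector's `KinesisOffset`, limited to the variants
/// that the startup-mode string can select after this change.
#[derive(Debug, Clone, PartialEq)]
enum StartOffset {
    Earliest,
    Latest,
    /// Milliseconds since the Unix epoch.
    Timestamp(i64),
}

/// Hypothetical helper mirroring the two checks in the hunk above.
fn resolve_start_offset(
    mode: Option<&str>,
    timestamp_millis: Option<i64>,
) -> Result<StartOffset, String> {
    let offset = match mode {
        // Assumption: an unset mode falls back to "earliest".
        None => StartOffset::Earliest,
        Some("earliest") => StartOffset::Earliest,
        Some("latest") => StartOffset::Latest,
        Some("timestamp") => match timestamp_millis {
            Some(ts) => StartOffset::Timestamp(ts),
            None => return Err("scan.startup.timestamp.millis is required".to_string()),
        },
        Some(other) => {
            return Err(format!(
                "invalid scan.startup.mode '{}', accept earliest/latest/timestamp",
                other
            ))
        }
    };

    // A timestamp supplied with any other mode is rejected instead of being
    // silently ignored.
    if !matches!(offset, StartOffset::Timestamp(_)) && timestamp_millis.is_some() {
        return Err(
            "scan.startup.mode must be 'timestamp' when a timestamp is given".to_string(),
        );
    }

    Ok(offset)
}
```

Under these assumptions, `resolve_start_offset(Some("timestamp"), Some(ts))` yields `StartOffset::Timestamp(ts)`, while a timestamp with no mode, and the old `sequence_number` mode, both return an error.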

@@ -208,27 +209,37 @@ impl CommonSplitReader for KinesisSplitReader {
}
impl KinesisSplitReader {
async fn new_shard_iter(&mut self) -> Result<()> {
let (starting_seq_num, iter_type) = if self.latest_offset.is_some() {
let (starting_seq_num, start_timestamp, iter_type) = if self.latest_offset.is_some() {
(
self.latest_offset.clone(),
None,
ShardIteratorType::AfterSequenceNumber,
)
} else {
match &self.start_position {
KinesisOffset::Earliest => (None, ShardIteratorType::TrimHorizon),
KinesisOffset::SequenceNumber(seq) => {
(Some(seq.clone()), ShardIteratorType::AfterSequenceNumber)
}
KinesisOffset::Latest => (None, ShardIteratorType::Latest),
KinesisOffset::Earliest => (None, None, ShardIteratorType::TrimHorizon),
KinesisOffset::SequenceNumber(seq) => (
Contributor

I think it would be better to give users a warning, something like:
"Sequence numbers are only unique within a single shard, so the semantics users expect may deviate from the actual behavior."

A warning, rather than disabling the option completely, lets us keep backward compatibility.

Member @fuyufjh, Sep 13, 2023

I think it's a completely wrong design. I recommend removing it. (Yes, I know it's a breaking change...)

> Each data record has a sequence number that is unique per partition-key within its shard

Contributor Author

The warning here cannot propagate to the frontend. Let's make some breaking changes here.

Member @fuyufjh, Sep 13, 2023

> The warning here cannot propagate to the frontend. Let's make some breaking changes here.

If possible, I suggest keeping the option so that existing configurations do not panic (perhaps logging a warning), but ignoring its value and removing all of the related implementation code.

Contributor @curiosityyy, Sep 13, 2023

I think there should be no sequence_number option unless the shards are fixed and the user passes a sequence number for each shard to rw. Otherwise, this feature doesn't make sense.

Some(seq.clone()),
None,
ShardIteratorType::AfterSequenceNumber,
),
KinesisOffset::Latest => (None, None, ShardIteratorType::Latest),
KinesisOffset::Timestamp(ts) => (
None,
Some(DateTime::from_millis(*ts)),
ShardIteratorType::AtTimestamp,
),
_ => unreachable!(),
}
};

// `starting_seq_num` and `starting_timestamp` will not be both set
async fn get_shard_iter_inner(
client: &KinesisClient,
stream_name: &str,
shard_id: &str,
starting_seq_num: Option<String>,
starting_timestamp: Option<DateTime>,
iter_type: ShardIteratorType,
) -> Result<String> {
let resp = client
@@ -237,6 +248,7 @@ impl KinesisSplitReader {
.shard_id(shard_id)
.shard_iterator_type(iter_type)
.set_starting_sequence_number(starting_seq_num)
.set_timestamp(starting_timestamp)
.send()
.await?;
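
The selection logic in `new_shard_iter` can be sketched without the AWS SDK. In the sketch below, `IterType` is a local stand-in for `aws_sdk_kinesis::types::ShardIteratorType`, and `StartOffset`, `IterRequest`, and `plan_shard_iter` are hypothetical names. The timestamp is kept as raw milliseconds, whereas the PR converts it with `DateTime::from_millis` before calling the client.

```rust
/// Stand-in for the connector's `KinesisOffset` variants handled in the diff.
#[derive(Debug, Clone, PartialEq)]
enum StartOffset {
    Earliest,
    Latest,
    SequenceNumber(String),
    /// Milliseconds since the Unix epoch.
    Timestamp(i64),
}

/// Local stand-in for `ShardIteratorType`; only the variants used in the diff.
#[derive(Debug, Clone, Copy, PartialEq)]
enum IterType {
    TrimHorizon,
    Latest,
    AfterSequenceNumber,
    AtTimestamp,
}

/// The three values the diff passes on to the shard-iterator request.
#[derive(Debug, PartialEq)]
struct IterRequest {
    starting_seq_num: Option<String>,
    timestamp_millis: Option<i64>,
    iter_type: IterType,
}

/// Hypothetical helper: decide how to open a shard iterator.
fn plan_shard_iter(latest_offset: Option<&str>, start: &StartOffset) -> IterRequest {
    // A recorded offset takes priority: resume right after the last record read.
    if let Some(seq) = latest_offset {
        return IterRequest {
            starting_seq_num: Some(seq.to_string()),
            timestamp_millis: None,
            iter_type: IterType::AfterSequenceNumber,
        };
    }

    // Otherwise use the configured start position. At most one of the
    // sequence number and the timestamp is ever set.
    let (starting_seq_num, timestamp_millis, iter_type) = match start {
        StartOffset::Earliest => (None, None, IterType::TrimHorizon),
        StartOffset::Latest => (None, None, IterType::Latest),
        StartOffset::SequenceNumber(seq) => {
            (Some(seq.clone()), None, IterType::AfterSequenceNumber)
        }
        StartOffset::Timestamp(ts) => (None, Some(*ts), IterType::AtTimestamp),
    };

    IterRequest { starting_seq_num, timestamp_millis, iter_type }
}
```

The sketch keeps the `SequenceNumber` arm because the diff still contains it. Whether that arm should remain is exactly what the review thread above debates.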

@@ -256,6 +268,7 @@ impl KinesisSplitReader {
&self.stream_name,
&self.shard_id,
starting_seq_num.clone(),
start_timestamp,
iter_type.clone(),
)
},
@@ -307,10 +320,7 @@ mod tests {
},

scan_startup_mode: None,
seq_offset: Some(
// redundant seq number
"49629139817504901062972448413535783695568426186596941842".to_string(),
),
timestamp_offset: Some(123456789098765432),
};
let client = KinesisSplitReader::new(
properties,
@@ -343,7 +353,7 @@ mod tests {
},

scan_startup_mode: None,
seq_offset: None,
timestamp_offset: None,
};

let trim_horizen_reader = KinesisSplitReader::new(