Skip to content

Commit

Permalink
fix: early terminate when reach the end of inactive shard (#17957)
Browse files Browse the repository at this point in the history
Signed-off-by: tabVersion <[email protected]>
  • Loading branch information
tabVersion authored Aug 8, 2024
1 parent 006a8db commit a4d890e
Showing 1 changed file with 20 additions and 0 deletions.
20 changes: 20 additions & 0 deletions src/connector/src/source/kinesis/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ impl KinesisSplitReader {
#[try_stream(ok = Vec < SourceMessage >, error = crate::error::ConnectorError)]
async fn into_data_stream(mut self) {
self.new_shard_iter().await?;
let mut finish_flag = false;
loop {
if self.shard_iter.is_none() {
tracing::warn!(
Expand All @@ -135,6 +136,18 @@ impl KinesisSplitReader {
}
match self.get_records().await {
Ok(resp) => {
if resp.millis_behind_latest.is_none()
&& let Some(shard) = &resp.child_shards
&& !shard.is_empty()
{
// according to the doc https://docs.rs/aws-sdk-kinesis/latest/aws_sdk_kinesis/operation/get_records/struct.GetRecordsOutput.html
//
// > The list of the current shard's child shards, returned in the GetRecords API's response only when the end of the current shard is reached.
//
// It means the current shard is finished, ie. inactive, and we should stop reading it. Checking `millis_behind_latest` is a double check.
// Other executors are going to read the child shards.
finish_flag = true;
}
self.shard_iter = resp.next_shard_iterator().map(String::from);
let chunk = (resp.records().iter())
.map(|r| from_kinesis_record(r, self.split_id.clone()))
Expand All @@ -150,6 +163,13 @@ impl KinesisSplitReader {
self.latest_offset
);
yield chunk;
if finish_flag {
tracing::info!(
"shard {:?} reaches the end and is inactive, stop reading",
self.shard_id
);
break;
}
}
Err(SdkError::ServiceError(e)) if e.err().is_resource_not_found_exception() => {
tracing::warn!("shard {:?} is closed, stop reading", self.shard_id);
Expand Down

0 comments on commit a4d890e

Please sign in to comment.