Skip to content

Commit

Permalink
fix: fix kinesis early exit (#18183)
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan authored Aug 22, 2024
1 parent 9c4984d commit 71732b1
Showing 1 changed file with 26 additions and 17 deletions.
43 changes: 26 additions & 17 deletions src/connector/src/source/kinesis/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ impl KinesisSplitReader {
#[try_stream(ok = Vec < SourceMessage >, error = crate::error::ConnectorError)]
async fn into_data_stream(mut self) {
self.new_shard_iter().await?;
let mut finish_flag = false;
loop {
if self.shard_iter.is_none() {
tracing::warn!(
Expand All @@ -139,22 +138,39 @@ impl KinesisSplitReader {
}
match self.get_records().await {
Ok(resp) => {
if resp.millis_behind_latest.is_none()
&& let Some(shard) = &resp.child_shards
tracing::trace!(?self.shard_id, ?resp);
self.shard_iter = resp.next_shard_iterator().map(String::from);
let chunk = (resp.records().iter())
.map(|r| from_kinesis_record(r, self.split_id.clone()))
.collect::<Vec<SourceMessage>>();
if let Some(shard) = &resp.child_shards
&& !shard.is_empty()
{
// according to the doc https://docs.rs/aws-sdk-kinesis/latest/aws_sdk_kinesis/operation/get_records/struct.GetRecordsOutput.html
//
// > The list of the current shard's child shards, returned in the GetRecords API's response only when the end of the current shard is reached.
//
// It means the current shard is finished, i.e., inactive, and we should stop reading it. Checking `millis_behind_latest` is a double check.

// The response will be like:
// {
// "Records": [], // empty
// "MillisBehindLatest": 2665000, // non-zero
// "ChildShards": [...] // non-empty
// // no NextShardIterator
// }

// Other executors are going to read the child shards.
finish_flag = true;

if !chunk.is_empty() {
// This should not happen. But be extra safe here.
yield chunk;
}

tracing::info!(
"shard {:?} reaches the end and is inactive, stop reading",
self.shard_id
);
break;
}
self.shard_iter = resp.next_shard_iterator().map(String::from);
let chunk = (resp.records().iter())
.map(|r| from_kinesis_record(r, self.split_id.clone()))
.collect::<Vec<SourceMessage>>();
if chunk.is_empty() {
tokio::time::sleep(Duration::from_millis(200)).await;
continue;
Expand All @@ -166,13 +182,6 @@ impl KinesisSplitReader {
self.latest_offset
);
yield chunk;
if finish_flag {
tracing::info!(
"shard {:?} reaches the end and is inactive, stop reading",
self.shard_id
);
break;
}
}
Err(SdkError::ServiceError(e)) if e.err().is_resource_not_found_exception() => {
tracing::warn!("shard {:?} is closed, stop reading", self.shard_id);
Expand Down

0 comments on commit 71732b1

Please sign in to comment.