diff --git a/src/connector/src/source/kinesis/source/reader.rs b/src/connector/src/source/kinesis/source/reader.rs index 123dc3a763ba8..1089ed114dca2 100644 --- a/src/connector/src/source/kinesis/source/reader.rs +++ b/src/connector/src/source/kinesis/source/reader.rs @@ -126,7 +126,6 @@ impl KinesisSplitReader { #[try_stream(ok = Vec < SourceMessage >, error = crate::error::ConnectorError)] async fn into_data_stream(mut self) { self.new_shard_iter().await?; - let mut finish_flag = false; loop { if self.shard_iter.is_none() { tracing::warn!( @@ -139,22 +138,39 @@ impl KinesisSplitReader { } match self.get_records().await { Ok(resp) => { - if resp.millis_behind_latest.is_none() - && let Some(shard) = &resp.child_shards + tracing::trace!(?self.shard_id, ?resp); + self.shard_iter = resp.next_shard_iterator().map(String::from); + let chunk = (resp.records().iter()) + .map(|r| from_kinesis_record(r, self.split_id.clone())) + .collect::>(); + if let Some(shard) = &resp.child_shards && !shard.is_empty() { // according to the doc https://docs.rs/aws-sdk-kinesis/latest/aws_sdk_kinesis/operation/get_records/struct.GetRecordsOutput.html // // > The list of the current shard's child shards, returned in the GetRecords API's response only when the end of the current shard is reached. - // - // It means the current shard is finished, ie. inactive, and we should stop reading it. Checking `millis_behind_latest` is a double check. + + // The response will be like: + // { + // "Records": [], // empty + // "MillisBehindLatest": 2665000, // non-zero + // "ChildShards": [...] // non-empty + // // no NextShardIterator + // } + // Other executors are going to read the child shards. - finish_flag = true; + + if !chunk.is_empty() { + // This should not happen. But be extra safe here. + yield chunk; + } + + tracing::info!( + "shard {:?} reaches the end and is inactive, stop reading", + self.shard_id + ); + break; } - self.shard_iter = resp.next_shard_iterator().map(String::from); - let chunk = (resp.records().iter()) - .map(|r| from_kinesis_record(r, self.split_id.clone())) - .collect::>(); if chunk.is_empty() { tokio::time::sleep(Duration::from_millis(200)).await; continue; @@ -166,13 +182,6 @@ impl KinesisSplitReader { self.latest_offset ); yield chunk; - if finish_flag { - tracing::info!( - "shard {:?} reaches the end and is inactive, stop reading", - self.shard_id - ); - break; - } } Err(SdkError::ServiceError(e)) if e.err().is_resource_not_found_exception() => { tracing::warn!("shard {:?} is closed, stop reading", self.shard_id);