diff --git a/src/connector/src/source/kinesis/enumerator/client.rs b/src/connector/src/source/kinesis/enumerator/client.rs index 840def08f6855..423516fa5bd47 100644 --- a/src/connector/src/source/kinesis/enumerator/client.rs +++ b/src/connector/src/source/kinesis/enumerator/client.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use anyhow::Context as _; +use anyhow::anyhow; use async_trait::async_trait; use aws_sdk_kinesis::types::Shard; use aws_sdk_kinesis::Client as kinesis_client; @@ -52,14 +52,26 @@ impl SplitEnumerator for KinesisSplitEnumerator { let mut shard_collect: Vec = Vec::new(); loop { - let list_shard_output = self - .client - .list_shards() - .set_next_token(next_token) - .stream_name(&self.stream_name) - .send() - .await - .context("failed to list kinesis shards")?; + let mut req = self.client.list_shards(); + if let Some(token) = next_token.take() { + req = req.next_token(token); + } else { + req = req.stream_name(&self.stream_name); + } + + let list_shard_output = match req.send().await { + Ok(output) => output, + Err(e) => { + if let Some(e_inner) = e.as_service_error() + && e_inner.is_expired_next_token_exception() + { + tracing::info!("Kinesis ListShard token expired, retrying..."); + next_token = None; + continue; + } + return Err(anyhow!(e).context("failed to list kinesis shards").into()); + } + }; match list_shard_output.shards { Some(shard) => shard_collect.extend(shard), None => bail!("no shards in stream {}", &self.stream_name),