Skip to content

Commit

Permalink
fix: Kinesis: NextToken and StreamName cannot be provided together (#…
Browse files Browse the repository at this point in the history
…17687)

Signed-off-by: tabVersion <[email protected]>
Signed-off-by: tabVersion <[email protected]>
Co-authored-by: tabVersion <[email protected]>
  • Loading branch information
2 people authored and tabVersion committed Jul 31, 2024
1 parent eb12ef0 commit 0bb5979
Showing 1 changed file with 21 additions and 9 deletions.
30 changes: 21 additions & 9 deletions src/connector/src/source/kinesis/enumerator/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::Context as _;
use anyhow::anyhow;
use async_trait::async_trait;
use aws_sdk_kinesis::types::Shard;
use aws_sdk_kinesis::Client as kinesis_client;
Expand Down Expand Up @@ -52,14 +52,26 @@ impl SplitEnumerator for KinesisSplitEnumerator {
let mut shard_collect: Vec<Shard> = Vec::new();

loop {
let list_shard_output = self
.client
.list_shards()
.set_next_token(next_token)
.stream_name(&self.stream_name)
.send()
.await
.context("failed to list kinesis shards")?;
let mut req = self.client.list_shards();
if let Some(token) = next_token.take() {
req = req.next_token(token);
} else {
req = req.stream_name(&self.stream_name);
}

let list_shard_output = match req.send().await {
Ok(output) => output,
Err(e) => {
if let Some(e_inner) = e.as_service_error()
&& e_inner.is_expired_next_token_exception()
{
tracing::info!("Kinesis ListShard token expired, retrying...");
next_token = None;
continue;
}
return Err(anyhow!(e).context("failed to list kinesis shards").into());
}
};
match list_shard_output.shards {
Some(shard) => shard_collect.extend(shard),
None => bail!("no shards in stream {}", &self.stream_name),
Expand Down

0 comments on commit 0bb5979

Please sign in to comment.