From 85fd30e12d76e3d0e3e6fff06f7ab01b4297f159 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Mon, 15 Jul 2024 20:00:04 +0800 Subject: [PATCH 1/5] fix failed to list kinesis shards: service error: InvalidArgumentException: NextToken and StreamName cannot be provided together." Signed-off-by: tabVersion --- .../src/source/kinesis/enumerator/client.rs | 30 +++++++++++++------ 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/src/connector/src/source/kinesis/enumerator/client.rs b/src/connector/src/source/kinesis/enumerator/client.rs index 840def08f685..70eda69a92c4 100644 --- a/src/connector/src/source/kinesis/enumerator/client.rs +++ b/src/connector/src/source/kinesis/enumerator/client.rs @@ -12,11 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use anyhow::Context as _; use async_trait::async_trait; use aws_sdk_kinesis::types::Shard; use aws_sdk_kinesis::Client as kinesis_client; use risingwave_common::bail; +use thiserror_ext::AsReport; use crate::error::ConnectorResult as Result; use crate::source::kinesis::split::KinesisOffset; @@ -52,14 +52,26 @@ impl SplitEnumerator for KinesisSplitEnumerator { let mut shard_collect: Vec = Vec::new(); loop { - let list_shard_output = self - .client - .list_shards() - .set_next_token(next_token) - .stream_name(&self.stream_name) - .send() - .await - .context("failed to list kinesis shards")?; + let mut req = self.client.list_shards(); + if let Some(token) = next_token.take() { + req = req.next_token(token); + } else { + req = req.stream_name(&self.stream_name); + } + + let list_shard_output = match req.send().await { + Ok(output) => output, + Err(e) => { + if let Some(e_inner) = e.as_service_error() + && e_inner.is_expired_next_token_exception() + { + tracing::info!("Kinesis ListShard token expired, retrying..."); + next_token = None; + continue; + } + bail!("Kinesis ListShards service error: {}", e.as_report()) + } + }; match list_shard_output.shards { Some(shard) => shard_collect.extend(shard), None => bail!("no shards in stream {}", &self.stream_name), From 9148cd6ba8e8a12f33d30b88c13a1d347cf4a3be Mon Sep 17 00:00:00 2001 From: tabVersion Date: Mon, 15 Jul 2024 20:13:14 +0800 Subject: [PATCH 2/5] fix Signed-off-by: tabVersion --- src/connector/src/source/kinesis/enumerator/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connector/src/source/kinesis/enumerator/client.rs b/src/connector/src/source/kinesis/enumerator/client.rs index 70eda69a92c4..aa42015b8d8f 100644 --- a/src/connector/src/source/kinesis/enumerator/client.rs +++ b/src/connector/src/source/kinesis/enumerator/client.rs @@ -69,7 +69,7 @@ impl SplitEnumerator for KinesisSplitEnumerator { next_token = None; continue; } - bail!("Kinesis ListShards service error: {}", e.as_report()) + bail!("Kinesis ListShards service error: {:?}", e.as_report()) } }; match list_shard_output.shards { From bffee42c41dc6c2e00218a4fbf9f14b5f30547fa Mon Sep 17 00:00:00 2001 From: tabVersion Date: Mon, 15 Jul 2024 20:39:56 +0800 Subject: [PATCH 3/5] fix Signed-off-by: tabVersion --- src/connector/src/source/kinesis/enumerator/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connector/src/source/kinesis/enumerator/client.rs b/src/connector/src/source/kinesis/enumerator/client.rs index aa42015b8d8f..ec5e06620d2e 100644 --- a/src/connector/src/source/kinesis/enumerator/client.rs +++ b/src/connector/src/source/kinesis/enumerator/client.rs @@ -69,7 +69,7 @@ impl SplitEnumerator for KinesisSplitEnumerator { next_token = None; continue; } - bail!("Kinesis ListShards service error: {:?}", e.as_report()) + bail!("Kinesis ListShards service error: {}", e.to_report_string()); } }; match list_shard_output.shards { From 0e85bf04eedaa11e5f9946592d453afd7cad6ad0 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Wed, 17 Jul 2024 11:44:59 +0800 Subject: [PATCH 4/5] fix Signed-off-by: tabVersion --- src/connector/src/source/kinesis/enumerator/client.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/connector/src/source/kinesis/enumerator/client.rs b/src/connector/src/source/kinesis/enumerator/client.rs index ec5e06620d2e..85ff051737cb 100644 --- a/src/connector/src/source/kinesis/enumerator/client.rs +++ b/src/connector/src/source/kinesis/enumerator/client.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use anyhow::anyhow; use async_trait::async_trait; use aws_sdk_kinesis::types::Shard; use aws_sdk_kinesis::Client as kinesis_client; @@ -69,7 +70,7 @@ impl SplitEnumerator for KinesisSplitEnumerator { next_token = None; continue; } - bail!("Kinesis ListShards service error: {}", e.to_report_string()); + return Err(anyhow!(e).context("failed to list kinesis shards").into()); } }; match list_shard_output.shards { From d947f906a0d8440e1aa0791ec71ddc5d8ebfde43 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Wed, 17 Jul 2024 12:05:22 +0800 Subject: [PATCH 5/5] fix Signed-off-by: tabVersion --- src/connector/src/source/kinesis/enumerator/client.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/connector/src/source/kinesis/enumerator/client.rs b/src/connector/src/source/kinesis/enumerator/client.rs index 85ff051737cb..423516fa5bd4 100644 --- a/src/connector/src/source/kinesis/enumerator/client.rs +++ b/src/connector/src/source/kinesis/enumerator/client.rs @@ -17,7 +17,6 @@ use async_trait::async_trait; use aws_sdk_kinesis::types::Shard; use aws_sdk_kinesis::Client as kinesis_client; use risingwave_common::bail; -use thiserror_ext::AsReport; use crate::error::ConnectorResult as Result; use crate::source::kinesis::split::KinesisOffset;