Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Kinesis: NextToken and StreamName cannot be provided together #17687

Merged
merged 5 commits into from
Jul 17, 2024
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 21 additions & 9 deletions src/connector/src/source/kinesis/enumerator/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::Context as _;
use async_trait::async_trait;
use aws_sdk_kinesis::types::Shard;
use aws_sdk_kinesis::Client as kinesis_client;
use risingwave_common::bail;
use thiserror_ext::AsReport;

use crate::error::ConnectorResult as Result;
use crate::source::kinesis::split::KinesisOffset;
Expand Down Expand Up @@ -52,14 +52,26 @@ impl SplitEnumerator for KinesisSplitEnumerator {
let mut shard_collect: Vec<Shard> = Vec::new();

loop {
let list_shard_output = self
.client
.list_shards()
.set_next_token(next_token)
.stream_name(&self.stream_name)
.send()
.await
.context("failed to list kinesis shards")?;
let mut req = self.client.list_shards();
if let Some(token) = next_token.take() {
req = req.next_token(token);
} else {
req = req.stream_name(&self.stream_name);
}

let list_shard_output = match req.send().await {
Ok(output) => output,
Err(e) => {
if let Some(e_inner) = e.as_service_error()
&& e_inner.is_expired_next_token_exception()
{
tracing::info!("Kinesis ListShard token expired, retrying...");
next_token = None;
continue;
}
bail!("Kinesis ListShards service error: {}", e.to_report_string());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kinesis SdkError has one generic param, seems cannot fit into ConnectorError Impl

pub type SdkError<E, R = ::aws_smithy_runtime_api::client::orchestrator::HttpResponse> = ::aws_smithy_runtime_api::client::result::SdkError<E, R>;
def_anyhow_newtype! {
    pub ConnectorError,

    // Common errors
    std::io::Error => transparent,

    // Fine-grained connector errors
    AccessError => transparent,
    WireFormatError => transparent,
    ConcurrentRequestError => transparent,
    InvalidOptionError => transparent,
    SinkError => transparent,
    PbFieldNotFound => transparent,

    // TODO(error-handling): Remove implicit contexts below and specify ad-hoc context for each conversion.

    // Parsing errors
    url::ParseError => "failed to parse url",
    serde_json::Error => "failed to parse json",
    csv::Error => "failed to parse csv",

    uuid::Error => transparent, // believed to be self-explanatory

    // Connector errors
    opendal::Error => transparent, // believed to be self-explanatory
    parquet::errors::ParquetError => transparent,
    ArrayError => "Array error",
    sqlx::Error => transparent, // believed to be self-explanatory
    mysql_async::Error => "MySQL error",
    tokio_postgres::Error => "Postgres error",
    apache_avro::Error => "Avro error",
    rdkafka::error::KafkaError => "Kafka error",
    pulsar::Error => "Pulsar error",
    aws_sdk_kinesis::error => "Kinesis error",

    async_nats::jetstream::consumer::StreamError => "Nats error",
    async_nats::jetstream::consumer::pull::MessagesError => "Nats error",
    async_nats::jetstream::context::CreateStreamError => "Nats error",
    async_nats::jetstream::stream::ConsumerError => "Nats error",
    icelake::Error => "Iceberg error",
    iceberg::Error => "IcebergV2 error",
    redis::RedisError => "Redis error",
    arrow_schema::ArrowError => "Arrow error",
    arrow_schema_iceberg::ArrowError => "Arrow error",
    google_cloud_pubsub::client::google_cloud_auth::error::Error => "Google Cloud error",
    rumqttc::tokio_rustls::rustls::Error => "TLS error",
    rumqttc::v5::ClientError => "MQTT error",
    rumqttc::v5::OptionError => "MQTT error",
    mongodb::error::Error => "Mongodb error",

    openssl::error::ErrorStack => "OpenSSL error",
}

Copy link
Member

@BugenZhao BugenZhao Jul 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you just simply try return Err(anyhow!(e).context("failed to list kinesis shards").into())

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I think it just works.

}
};
match list_shard_output.shards {
Some(shard) => shard_collect.extend(shard),
None => bail!("no shards in stream {}", &self.stream_name),
Expand Down
Loading