diff --git a/src/connector/src/connector_common/common.rs b/src/connector/src/connector_common/common.rs index 21ee98f93b2f8..7a5e3ba5f8b2b 100644 --- a/src/connector/src/connector_common/common.rs +++ b/src/connector/src/connector_common/common.rs @@ -20,7 +20,6 @@ use std::time::Duration; use anyhow::{anyhow, Context}; use async_nats::jetstream::consumer::DeliverPolicy; use async_nats::jetstream::{self}; -use aws_sdk_dynamodb::client::Client as DynamoDBClient; use aws_sdk_kinesis::Client as KinesisClient; use pulsar::authentication::oauth2::{OAuth2Authentication, OAuth2Params}; use pulsar::{Authentication, Pulsar, TokioExecutor}; @@ -705,48 +704,6 @@ impl NatsCommon { } } -#[derive(Deserialize, Debug, Clone, WithOptions)] -pub struct DynamoDbCommon { - #[serde(rename = "table", alias = "dynamodb.table")] - pub table: String, - // #[serde(rename = "primary_key", alias = "dynamodb.primary_key")] - // pub primary_key: String, - // #[serde(rename = "sort_key", alias = "dynamodb.sort_key")] - // pub sort_key: Option, - #[serde(rename = "aws.region")] - pub stream_region: String, - #[serde(rename = "aws.endpoint")] - pub endpoint: Option, - #[serde(rename = "aws.credentials.access_key_id")] - pub credentials_access_key: Option, - #[serde(rename = "aws.credentials.secret_access_key")] - pub credentials_secret_access_key: Option, - #[serde(rename = "aws.credentials.session_token")] - pub session_token: Option, - #[serde(rename = "aws.credentials.role.arn")] - pub assume_role_arn: Option, - #[serde(rename = "aws.credentials.role.external_id")] - pub assume_role_external_id: Option, -} - -impl DynamoDbCommon { - pub(crate) async fn build_client(&self) -> ConnectorResult { - let config = AwsAuthProps { - region: Some(self.stream_region.clone()), - endpoint: self.endpoint.clone(), - access_key: self.credentials_access_key.clone(), - secret_key: self.credentials_secret_access_key.clone(), - session_token: self.session_token.clone(), - arn: self.assume_role_arn.clone(), - external_id: self.assume_role_external_id.clone(), - profile: Default::default(), - }; - let aws_config = config.build_config().await?; - - Ok(DynamoDBClient::new(&aws_config)) - } -} - pub(crate) fn load_certs( certificates: &str, ) -> ConnectorResult>> { diff --git a/src/connector/src/sink/dynamodb.rs b/src/connector/src/sink/dynamodb.rs index e8321a40443ea..6e65ed0ac1d84 100644 --- a/src/connector/src/sink/dynamodb.rs +++ b/src/connector/src/sink/dynamodb.rs @@ -23,7 +23,6 @@ use dynamodb::types::{ AttributeValue, DeleteRequest, PutRequest, ReturnConsumedCapacity, ReturnItemCollectionMetrics, TableStatus, WriteRequest, }; -use itertools::Itertools; use maplit::hashmap; use risingwave_common::array::{Op, RowRef, StreamChunk}; use risingwave_common::catalog::{Field, Schema}; @@ -31,7 +30,6 @@ use risingwave_common::row::Row as _; use risingwave_common::types::{DataType, ScalarRefImpl, ToText}; use risingwave_common::util::iter_util::ZipEqDebug; use serde_derive::Deserialize; -use serde_with::serde_as; use with_options::WithOptions; use super::log_store::DeliveryFutureManagerAddFuture; @@ -39,18 +37,28 @@ use super::writer::{ AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, }; use super::{DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SinkWriterParam}; -use crate::connector_common::common::DynamoDbCommon; +use crate::connector_common::AwsAuthProps; +use crate::error::ConnectorResult; pub const DYNAMO_DB_SINK: &str = "dynamodb"; -#[serde_as] -#[derive(Clone, Debug, Deserialize, WithOptions)] +#[derive(Deserialize, Debug, Clone, WithOptions)] pub struct DynamoDbConfig { + #[serde(rename = "table", alias = "dynamodb.table")] + pub table: String, + #[serde(flatten)] - pub common: DynamoDbCommon, + pub aws_auth_props: AwsAuthProps, } impl DynamoDbConfig { + pub async fn build_client(&self) -> ConnectorResult { + let config = &self.aws_auth_props; + let aws_config = config.build_config().await?; + + Ok(Client::new(&aws_config)) + } + fn from_hashmap(values: HashMap) -> Result { serde_json::from_value::(serde_json::to_value(values).unwrap()) .map_err(|e| SinkError::Config(anyhow!(e))) @@ -71,11 +79,11 @@ impl Sink for DynamoDbSink { const SINK_NAME: &'static str = DYNAMO_DB_SINK; async fn validate(&self) -> Result<()> { - let client = (self.config.common.build_client().await) + let client = (self.config.build_client().await) .context("validate DynamoDB sink error") .map_err(SinkError::DynamoDb)?; - let table_name = &self.config.common.table; + let table_name = &self.config.table; let output = client .describe_table() .table_name(table_name) @@ -156,7 +164,7 @@ impl DynamoDbRequest { _ => return None, }; let vs = key - .into_iter() + .iter() .filter(|(k, _)| self.key_items.contains(k)) .map(|(_, v)| v.clone()) .collect(); @@ -240,8 +248,8 @@ pub struct DynamoDbSinkWriter { impl DynamoDbSinkWriter { pub async fn new(config: DynamoDbConfig, schema: Schema) -> Result { - let client = config.common.build_client().await?; - let table_name = &config.common.table; + let client = config.build_client().await?; + let table_name = &config.table; let output = client .describe_table() .table_name(table_name) @@ -259,12 +267,12 @@ impl DynamoDbSinkWriter { .unwrap_or_default() .into_iter() .map(|k| k.attribute_name) - .collect_vec(); + .collect(); let payload_writer = DynamoDbPayloadWriter { request_items: Vec::new(), client, - table: config.common.table, + table: config.table, dynamodb_keys, };