Skip to content

Commit

Permalink
refactor: de-DynamoDbCommon to AwsAuthProps
Browse files Browse the repository at this point in the history
  • Loading branch information
jetjinser committed May 16, 2024
1 parent d4686b2 commit 56bf209
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 56 deletions.
43 changes: 0 additions & 43 deletions src/connector/src/connector_common/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use std::time::Duration;
use anyhow::{anyhow, Context};
use async_nats::jetstream::consumer::DeliverPolicy;
use async_nats::jetstream::{self};
use aws_sdk_dynamodb::client::Client as DynamoDBClient;
use aws_sdk_kinesis::Client as KinesisClient;
use pulsar::authentication::oauth2::{OAuth2Authentication, OAuth2Params};
use pulsar::{Authentication, Pulsar, TokioExecutor};
Expand Down Expand Up @@ -705,48 +704,6 @@ impl NatsCommon {
}
}

#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct DynamoDbCommon {
#[serde(rename = "table", alias = "dynamodb.table")]
pub table: String,
// #[serde(rename = "primary_key", alias = "dynamodb.primary_key")]
// pub primary_key: String,
// #[serde(rename = "sort_key", alias = "dynamodb.sort_key")]
// pub sort_key: Option<String>,
#[serde(rename = "aws.region")]
pub stream_region: String,
#[serde(rename = "aws.endpoint")]
pub endpoint: Option<String>,
#[serde(rename = "aws.credentials.access_key_id")]
pub credentials_access_key: Option<String>,
#[serde(rename = "aws.credentials.secret_access_key")]
pub credentials_secret_access_key: Option<String>,
#[serde(rename = "aws.credentials.session_token")]
pub session_token: Option<String>,
#[serde(rename = "aws.credentials.role.arn")]
pub assume_role_arn: Option<String>,
#[serde(rename = "aws.credentials.role.external_id")]
pub assume_role_external_id: Option<String>,
}

impl DynamoDbCommon {
pub(crate) async fn build_client(&self) -> ConnectorResult<DynamoDBClient> {
let config = AwsAuthProps {
region: Some(self.stream_region.clone()),
endpoint: self.endpoint.clone(),
access_key: self.credentials_access_key.clone(),
secret_key: self.credentials_secret_access_key.clone(),
session_token: self.session_token.clone(),
arn: self.assume_role_arn.clone(),
external_id: self.assume_role_external_id.clone(),
profile: Default::default(),
};
let aws_config = config.build_config().await?;

Ok(DynamoDBClient::new(&aws_config))
}
}

pub(crate) fn load_certs(
certificates: &str,
) -> ConnectorResult<Vec<rustls_pki_types::CertificateDer<'static>>> {
Expand Down
34 changes: 21 additions & 13 deletions src/connector/src/sink/dynamodb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,34 +23,42 @@ use dynamodb::types::{
AttributeValue, DeleteRequest, PutRequest, ReturnConsumedCapacity, ReturnItemCollectionMetrics,
TableStatus, WriteRequest,
};
use itertools::Itertools;
use maplit::hashmap;
use risingwave_common::array::{Op, RowRef, StreamChunk};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::row::Row as _;
use risingwave_common::types::{DataType, ScalarRefImpl, ToText};
use risingwave_common::util::iter_util::ZipEqDebug;
use serde_derive::Deserialize;
use serde_with::serde_as;
use with_options::WithOptions;

use super::log_store::DeliveryFutureManagerAddFuture;
use super::writer::{
AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt,
};
use super::{DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SinkWriterParam};
use crate::connector_common::common::DynamoDbCommon;
use crate::connector_common::AwsAuthProps;
use crate::error::ConnectorResult;

pub const DYNAMO_DB_SINK: &str = "dynamodb";

#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct DynamoDbConfig {
#[serde(rename = "table", alias = "dynamodb.table")]
pub table: String,

#[serde(flatten)]
pub common: DynamoDbCommon,
pub aws_auth_props: AwsAuthProps,
}

impl DynamoDbConfig {
pub async fn build_client(&self) -> ConnectorResult<Client> {
let config = &self.aws_auth_props;
let aws_config = config.build_config().await?;

Ok(Client::new(&aws_config))
}

fn from_hashmap(values: HashMap<String, String>) -> Result<Self> {
serde_json::from_value::<DynamoDbConfig>(serde_json::to_value(values).unwrap())
.map_err(|e| SinkError::Config(anyhow!(e)))
Expand All @@ -71,11 +79,11 @@ impl Sink for DynamoDbSink {
const SINK_NAME: &'static str = DYNAMO_DB_SINK;

async fn validate(&self) -> Result<()> {
let client = (self.config.common.build_client().await)
let client = (self.config.build_client().await)
.context("validate DynamoDB sink error")
.map_err(SinkError::DynamoDb)?;

let table_name = &self.config.common.table;
let table_name = &self.config.table;
let output = client
.describe_table()
.table_name(table_name)
Expand Down Expand Up @@ -156,7 +164,7 @@ impl DynamoDbRequest {
_ => return None,
};
let vs = key
.into_iter()
.iter()
.filter(|(k, _)| self.key_items.contains(k))
.map(|(_, v)| v.clone())
.collect();
Expand Down Expand Up @@ -240,8 +248,8 @@ pub struct DynamoDbSinkWriter {

impl DynamoDbSinkWriter {
pub async fn new(config: DynamoDbConfig, schema: Schema) -> Result<Self> {
let client = config.common.build_client().await?;
let table_name = &config.common.table;
let client = config.build_client().await?;
let table_name = &config.table;
let output = client
.describe_table()
.table_name(table_name)
Expand All @@ -259,12 +267,12 @@ impl DynamoDbSinkWriter {
.unwrap_or_default()
.into_iter()
.map(|k| k.attribute_name)
.collect_vec();
.collect();

let payload_writer = DynamoDbPayloadWriter {
request_items: Vec::new(),
client,
table: config.common.table,
table: config.table,
dynamodb_keys,
};

Expand Down

0 comments on commit 56bf209

Please sign in to comment.