From bf5e6fe34dae9bf738ff751a6b77f46977e6fa97 Mon Sep 17 00:00:00 2001 From: Jinser Kafka Date: Fri, 10 May 2024 01:49:23 +0800 Subject: [PATCH 1/9] feat(connector): add DynamoDB sink --- Cargo.lock | 79 ++++-- src/connector/Cargo.toml | 1 + src/connector/src/connector_common/common.rs | 43 +++ src/connector/src/sink/dynamodb.rs | 268 +++++++++++++++++++ src/connector/src/sink/mod.rs | 8 + 5 files changed, 376 insertions(+), 23 deletions(-) create mode 100644 src/connector/src/sink/dynamodb.rs diff --git a/Cargo.lock b/Cargo.lock index ec551ead138da..2a6e15761eba4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1145,9 +1145,9 @@ dependencies = [ [[package]] name = "aws-credential-types" -version = "1.1.3" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d67c6836a1009b23e3f4cd1457c83e0aa49a490d9c3033b53c3f7b8cf2facc0f" +checksum = "e16838e6c9e12125face1c1eff1343c75e3ff540de98ff7ebd61874a89bcfeb9" dependencies = [ "aws-smithy-async", "aws-smithy-runtime-api", @@ -1199,12 +1199,11 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "1.0.1" +version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ed7ef604a15fd0d4d9e43701295161ea6b504b63c44990ead352afea2bc15e9" +checksum = "75588e7ee5e8496eed939adac2035a6dbab9f7eb2acdd9ab2d31856dab6f3955" dependencies = [ "aws-credential-types", - "aws-http", "aws-sigv4", "aws-smithy-async", "aws-smithy-eventstream", @@ -1212,13 +1211,39 @@ dependencies = [ "aws-smithy-runtime-api", "aws-smithy-types", "aws-types", + "bytes", "fastrand 2.0.1", "http 0.2.9", + "http-body 0.4.5", "percent-encoding", + "pin-project-lite", "tracing", "uuid", ] +[[package]] +name = "aws-sdk-dynamodb" +version = "1.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "281c887364ff494a6ce0b492b03da5fa7729da004dbb875e2c81aceb24debe89" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand 2.0.1", + "http 0.2.9", + "once_cell", + "regex-lite", + "tracing", +] + [[package]] name = "aws-sdk-kinesis" version = "1.3.0" @@ -1296,9 +1321,9 @@ dependencies = [ [[package]] name = "aws-sigv4" -version = "1.1.1" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d222297ca90209dc62245f0a490355795f29de362eb5c19caea4f7f55fe69078" +checksum = "58b56f1cbe6fd4d0c2573df72868f20ab1c125ca9c9dbce17927a463433a2e57" dependencies = [ "aws-credential-types", "aws-smithy-eventstream", @@ -1311,6 +1336,7 @@ dependencies = [ "hex", "hmac", "http 0.2.9", + "http 1.0.0", "once_cell", "p256 0.11.1", "percent-encoding", @@ -1324,9 +1350,9 @@ dependencies = [ [[package]] name = "aws-smithy-async" -version = "1.1.3" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2eac0bb78e9e2765699999a02d7bfb4e6ad8f13e0962ebb9f5202b1d8cd76006" +checksum = "62220bc6e97f946ddd51b5f1361f78996e704677afc518a4ff66b7a72ea1378c" dependencies = [ "futures-util", "pin-project-lite", @@ -1356,9 +1382,9 @@ dependencies = [ [[package]] name = "aws-smithy-eventstream" -version = "0.60.3" +version = "0.60.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "682371561562d08ab437766903c6bc28f4f95d7ab2ecfb389bda7849dd98aefe" +checksum = "e6363078f927f612b970edf9d1903ef5cef9a64d1e8423525ebb1f0a1633c858" dependencies = [ "aws-smithy-types", "bytes", @@ -1367,9 +1393,9 @@ dependencies = [ [[package]] name = "aws-smithy-http" -version = "0.60.3" +version = "0.60.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "365ca49744b2bda2f1e2dc03b856da3fa5a28ca5b0a41e41d7ff5305a8fae190" +checksum = "4a7de001a1b9a25601016d8057ea16e31a45fdca3751304c8edf4ad72e706c08" dependencies = [ "aws-smithy-eventstream", "aws-smithy-runtime-api", @@ -1388,9 +1414,9 @@ dependencies = [ [[package]] name = "aws-smithy-json" -version = "0.60.0" +version = "0.60.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a46dd338dc9576d6a6a5b5a19bd678dcad018ececee11cf28ecd7588bd1a55c" +checksum = "4683df9469ef09468dad3473d129960119a0d3593617542b7d52086c8486f2d6" dependencies = [ "aws-smithy-types", ] @@ -1407,9 +1433,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.1.3" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ab9cb6fee50680af8ceaa293ae79eba32095ca117161cb323f9ee30dd87d139" +checksum = "c9ac79e9f3a4d576f3cd4a470a0275b138d9e7b11b1cd514a6858ae0a79dd5bb" dependencies = [ "aws-smithy-async", "aws-smithy-http", @@ -1420,6 +1446,7 @@ dependencies = [ "h2 0.3.26", "http 0.2.9", "http-body 0.4.5", + "http-body 1.0.0", "hyper 0.14.27", "hyper-rustls 0.24.2", "once_cell", @@ -1432,31 +1459,36 @@ dependencies = [ [[package]] name = "aws-smithy-runtime-api" -version = "1.1.3" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02ca2da7619517310bfead6d18abcdde90f1439224d887d608503cfacff46dff" +checksum = "04ec42c2f5c0e7796a2848dde4d9f3bf8ce12ccbb3d5aa40c52fa0cdd61a1c47" dependencies = [ "aws-smithy-async", "aws-smithy-types", "bytes", "http 0.2.9", + "http 1.0.0", "pin-project-lite", "tokio", "tracing", + "zeroize", ] [[package]] name = "aws-smithy-types" -version = "1.1.3" +version = "1.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d4bb944488536cd2fef43212d829bc7e9a8bfc4afa079d21170441e7be8d2d0" +checksum = "baf98d97bba6ddaba180f1b1147e202d8fe04940403a95a3f826c790f931bbd1" dependencies = [ "base64-simd 0.8.0", "bytes", "bytes-utils", "futures-core", "http 0.2.9", + "http 1.0.0", "http-body 0.4.5", + "http-body 1.0.0", + "http-body-util", "hyper 0.14.27", "itoa", "num-integer", @@ -1490,9 +1522,9 @@ dependencies = [ [[package]] name = "aws-types" -version = "1.1.3" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee2739d97d47f47cdf0d27982019a405dcc736df25925d1a75049f1faa79df88" +checksum = "a807d90cd50a969b3d95e4e7ad1491fcae13c6e83948d8728363ecc09d66343a" dependencies = [ "aws-credential-types", "aws-smithy-async", @@ -10485,6 +10517,7 @@ dependencies = [ "auto_impl", "aws-config", "aws-credential-types", + "aws-sdk-dynamodb", "aws-sdk-kinesis", "aws-sdk-s3", "aws-smithy-http", diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index be5242e819163..67ec939b41dc1 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -32,6 +32,7 @@ auto_enums = { workspace = true } auto_impl = "1" aws-config = { workspace = true } aws-credential-types = { workspace = true } +aws-sdk-dynamodb = "1.23.0" aws-sdk-kinesis = { workspace = true } aws-sdk-s3 = { workspace = true } aws-smithy-http = { workspace = true } diff --git a/src/connector/src/connector_common/common.rs b/src/connector/src/connector_common/common.rs index 302b68dd664a1..fd0f8e68cfa89 100644 --- a/src/connector/src/connector_common/common.rs +++ b/src/connector/src/connector_common/common.rs @@ -20,6 +20,7 @@ use std::time::Duration; use anyhow::{anyhow, Context}; use async_nats::jetstream::consumer::DeliverPolicy; use async_nats::jetstream::{self}; +use aws_sdk_dynamodb::client::Client as DynamoDBClient; use aws_sdk_kinesis::Client as KinesisClient; use pulsar::authentication::oauth2::{OAuth2Authentication, OAuth2Params}; use pulsar::{Authentication, Pulsar, TokioExecutor}; @@ -685,6 +686,48 @@ impl NatsCommon { } } +#[derive(Deserialize, Debug, Clone, WithOptions)] +pub struct DynamoDbCommon { + #[serde(rename = "table", alias = "dynamodb.table")] + pub table: String, + // #[serde(rename = "primary_key", alias = "dynamodb.primary_key")] + // pub primary_key: String, + // #[serde(rename = "sort_key", alias = "dynamodb.sort_key")] + // pub sort_key: Option, + #[serde(rename = "aws.region")] + pub stream_region: String, + #[serde(rename = "aws.endpoint")] + pub endpoint: Option, + #[serde(rename = "aws.credentials.access_key_id")] + pub credentials_access_key: Option, + #[serde(rename = "aws.credentials.secret_access_key")] + pub credentials_secret_access_key: Option, + #[serde(rename = "aws.credentials.session_token")] + pub session_token: Option, + #[serde(rename = "aws.credentials.role.arn")] + pub assume_role_arn: Option, + #[serde(rename = "aws.credentials.role.external_id")] + pub assume_role_external_id: Option, +} + +impl DynamoDbCommon { + pub(crate) async fn build_client(&self) -> ConnectorResult { + let config = AwsAuthProps { + region: Some(self.stream_region.clone()), + endpoint: self.endpoint.clone(), + access_key: self.credentials_access_key.clone(), + secret_key: self.credentials_secret_access_key.clone(), + session_token: self.session_token.clone(), + arn: self.assume_role_arn.clone(), + external_id: self.assume_role_external_id.clone(), + profile: Default::default(), + }; + let aws_config = config.build_config().await?; + + Ok(DynamoDBClient::new(&aws_config)) + } +} + pub(crate) fn load_certs( certificates: &str, ) -> ConnectorResult>> { diff --git a/src/connector/src/sink/dynamodb.rs b/src/connector/src/sink/dynamodb.rs new file mode 100644 index 0000000000000..63aec607e92db --- /dev/null +++ b/src/connector/src/sink/dynamodb.rs @@ -0,0 +1,268 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::usize; + +use anyhow::{anyhow, Context}; +use aws_sdk_dynamodb as dynamodb; +use aws_sdk_dynamodb::client::Client; +use aws_smithy_types::Blob; +use dynamodb::types::{AttributeValue, TableStatus}; +use risingwave_common::array::{Op, RowRef, StreamChunk}; +use risingwave_common::catalog::Schema; +use risingwave_common::row::Row as _; +use risingwave_common::types::{ScalarRefImpl, ToText}; +use risingwave_common::util::iter_util::ZipEqDebug; +use serde_derive::Deserialize; +use serde_with::serde_as; +use with_options::WithOptions; + +use super::log_store::DeliveryFutureManagerAddFuture; +use super::writer::{ + AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, +}; +use super::{DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SinkWriterParam}; +use crate::connector_common::common::DynamoDbCommon; + +pub const DYNAMO_DB_SINK: &str = "dynamodb"; + +#[serde_as] +#[derive(Clone, Debug, Deserialize, WithOptions)] +pub struct DynamoDbConfig { + #[serde(flatten)] + pub common: DynamoDbCommon, +} + +impl DynamoDbConfig { + fn from_hashmap(values: HashMap) -> Result { + serde_json::from_value::(serde_json::to_value(values).unwrap()) + .map_err(|e| SinkError::Config(anyhow!(e))) + } +} + +#[derive(Clone, Debug)] +pub struct DynamoDbSink { + pub config: DynamoDbConfig, + schema: Schema, +} + +impl Sink for DynamoDbSink { + type Coordinator = DummySinkCommitCoordinator; + type LogSinker = AsyncTruncateLogSinkerOf; + + const SINK_NAME: &'static str = DYNAMO_DB_SINK; + + async fn validate(&self) -> Result<()> { + let client = (self.config.common.build_client().await) + .context("validate DynamoDB sink error") + .map_err(SinkError::DynamoDb)?; + + let table_name = &self.config.common.table; + let output = client + .describe_table() + .table_name(table_name) + .send() + .await + .map_err(|e| anyhow!(e))?; + let Some(table) = output.table else { + return Err(SinkError::DynamoDb(anyhow!( + "table {} not found", + table_name + ))); + }; + if !matches!(table.table_status(), Some(TableStatus::Active)) { + return Err(SinkError::DynamoDb(anyhow!( + "table {} is not active", + table_name + ))); + } + + // validate all key are in schema + let fields = self.schema.fields(); + let keys = table.key_schema(); + for key in keys { + if !fields.iter().any(|f| f.name == key.attribute_name()) { + return Err(SinkError::DynamoDb(anyhow!( + "DynamoDB table key {} not found in schema", + key.attribute_name() + ))); + } + } + + Ok(()) + } + + async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result { + Ok( + DynamoDbSinkWriter::new(self.config.clone(), self.schema.clone()) + .await? + .into_log_sinker(usize::MAX), + ) + } +} + +impl TryFrom for DynamoDbSink { + type Error = SinkError; + + fn try_from(param: SinkParam) -> std::result::Result { + let schema = param.schema(); + let config = DynamoDbConfig::from_hashmap(param.properties)?; + + Ok(Self { config, schema }) + } +} + +struct DynamoDbPayloadWriter { + insert_items: HashMap, + delete_items: HashMap, + client: Client, + table: String, +} + +impl DynamoDbPayloadWriter { + fn write_one_insert(&mut self, key: String, value: AttributeValue) { + self.insert_items.insert(key, value); + } + + fn write_one_delete(&mut self, key: String, value: AttributeValue) { + self.delete_items.insert(key, value); + } + + async fn write_chunk(&mut self) -> Result<()> { + if !self.insert_items.is_empty() { + let new_items = std::mem::take(&mut self.insert_items); + self.client + .put_item() + .table_name(self.table.clone()) + .set_item(Some(new_items)) + .send() + .await + .map_err(|e| { + SinkError::DynamoDb(anyhow!(e).context("failed to put item to DynamoDB sink")) + }) + .map(|_| ()) + } else if !self.delete_items.is_empty() { + let new_items = std::mem::take(&mut self.delete_items); + self.client + .delete_item() + .table_name(self.table.clone()) + .set_key(Some(new_items)) + .send() + .await + .map_err(|e| { + SinkError::DynamoDb( + anyhow!(e).context("failed to delete item from DynamoDB sink"), + ) + }) + .map(|_| ()) + } else { + Ok(()) + } + } +} + +pub struct DynamoDbSinkWriter { + payload_writer: DynamoDbPayloadWriter, + formatter: DynamoDbFormatter, +} + +impl DynamoDbSinkWriter { + pub async fn new(config: DynamoDbConfig, schema: Schema) -> Result { + let client = config.common.build_client().await?; + + let payload_writer = DynamoDbPayloadWriter { + insert_items: HashMap::new(), + delete_items: HashMap::new(), + client, + table: config.common.table, + }; + + Ok(Self { + payload_writer, + formatter: DynamoDbFormatter { schema }, + }) + } + + async fn write_chunk_inner(&mut self, chunk: StreamChunk) -> Result<()> { + for (op, row) in chunk.rows() { + match op { + Op::Insert | Op::UpdateInsert => { + for (k, v) in self.formatter.format_row(row) { + self.payload_writer.write_one_insert(k, v); + } + } + Op::Delete | Op::UpdateDelete => { + for (k, v) in self.formatter.format_row(row) { + self.payload_writer.write_one_delete(k, v); + } + } + } + } + self.payload_writer.write_chunk().await + } +} + +impl AsyncTruncateSinkWriter for DynamoDbSinkWriter { + async fn write_chunk<'a>( + &'a mut self, + chunk: StreamChunk, + _add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, + ) -> Result<()> { + self.write_chunk_inner(chunk).await + } +} + +struct DynamoDbFormatter { + schema: Schema, +} + +impl DynamoDbFormatter { + fn format_row(&self, row: RowRef<'_>) -> Vec<(String, AttributeValue)> { + let mut items = Vec::new(); + for (scalar, field) in row.iter().zip_eq_debug((self.schema.clone()).into_fields()) { + let attr_value = map_data_type(scalar); + items.push((field.name, attr_value)); + } + + items + } +} + +fn map_data_type(scalar: Option>) -> AttributeValue { + match scalar { + None => AttributeValue::Null(true), + Some(s) => match s { + number @ (ScalarRefImpl::Int16(_) + | ScalarRefImpl::Int32(_) + | ScalarRefImpl::Int64(_) + | ScalarRefImpl::Int256(_) + | ScalarRefImpl::Float32(_) + | ScalarRefImpl::Float64(_) + | ScalarRefImpl::Decimal(_) + | ScalarRefImpl::Serial(_)) => AttributeValue::N(number.to_text()), + string @ (ScalarRefImpl::Utf8(_) + | ScalarRefImpl::Interval(_) + | ScalarRefImpl::Date(_) + | ScalarRefImpl::Time(_) + | ScalarRefImpl::Timestamp(_) + | ScalarRefImpl::Timestamptz(_) + | ScalarRefImpl::Struct(_) + | ScalarRefImpl::Jsonb(_)) => AttributeValue::S(string.to_text()), + ScalarRefImpl::Bool(x) => AttributeValue::Bool(x), + ScalarRefImpl::Bytea(x) => AttributeValue::B(Blob::new(x)), + ScalarRefImpl::List(x) => AttributeValue::L(x.iter().map(map_data_type).collect()), + }, + } +} diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index c430b4303f1e9..331e527e07139 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -20,6 +20,7 @@ pub mod coordinate; pub mod deltalake; pub mod doris; pub mod doris_starrocks_connector; +pub mod dynamodb; pub mod elasticsearch; pub mod encoder; pub mod formatter; @@ -96,6 +97,7 @@ macro_rules! for_all_sinks { { Snowflake, $crate::sink::snowflake::SnowflakeSink }, { DeltaLake, $crate::sink::deltalake::DeltaLakeSink }, { BigQuery, $crate::sink::big_query::BigQuerySink }, + { DynamoDb, $crate::sink::dynamodb::DynamoDbSink }, { Test, $crate::sink::test_sink::TestSink }, { Table, $crate::sink::trivial::TableSink } } @@ -561,6 +563,12 @@ pub enum SinkError { #[backtrace] anyhow::Error, ), + #[error("DynamoDB error: {0}")] + DynamoDb( + #[source] + #[backtrace] + anyhow::Error, + ), #[error(transparent)] Connector( #[from] From a808f9c630365c5b6f4d2066e20b85b610090b55 Mon Sep 17 00:00:00 2001 From: Jinser Kafka Date: Sat, 11 May 2024 01:10:24 +0800 Subject: [PATCH 2/9] feat: add DynamoDB sink key schema validation --- src/connector/src/sink/dynamodb.rs | 48 ++++++++++++++++++++++++------ 1 file changed, 39 insertions(+), 9 deletions(-) diff --git a/src/connector/src/sink/dynamodb.rs b/src/connector/src/sink/dynamodb.rs index 63aec607e92db..d9f5742a39ea2 100644 --- a/src/connector/src/sink/dynamodb.rs +++ b/src/connector/src/sink/dynamodb.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::usize; use anyhow::{anyhow, Context}; @@ -56,6 +56,7 @@ impl DynamoDbConfig { pub struct DynamoDbSink { pub config: DynamoDbConfig, schema: Schema, + pk_indices: Vec, } impl Sink for DynamoDbSink { @@ -89,14 +90,39 @@ impl Sink for DynamoDbSink { ))); } - // validate all key are in schema - let fields = self.schema.fields(); - let keys = table.key_schema(); - for key in keys { - if !fields.iter().any(|f| f.name == key.attribute_name()) { + let all_set: HashSet = self + .schema + .fields() + .iter() + .map(|f| f.name.clone()) + .collect(); + let pk_set: HashSet = self + .schema + .fields() + .iter() + .enumerate() + .filter(|(k, _)| self.pk_indices.contains(k)) + .map(|(_, v)| v.name.clone()) + .collect(); + let key_schema = table.key_schema(); + + // 1. validate all DynamoDb key element are in RisingWave schema and are primary key + for key_element in key_schema.iter().map(|x| x.attribute_name()) { + if !pk_set.iter().any(|x| x == key_element) { return Err(SinkError::DynamoDb(anyhow!( - "DynamoDB table key {} not found in schema", - key.attribute_name() + "table {} key field {} not found in schema or not primary key", + table_name, + key_element + ))); + } + } + // 2. validate RisingWave schema fields are subset of dynamodb key fields + for ref field in all_set { + if !key_schema.iter().any(|x| x.attribute_name() == field) { + return Err(SinkError::DynamoDb(anyhow!( + "table {} field {} not found in dynamodb key", + table_name, + field ))); } } @@ -120,7 +146,11 @@ impl TryFrom for DynamoDbSink { let schema = param.schema(); let config = DynamoDbConfig::from_hashmap(param.properties)?; - Ok(Self { config, schema }) + Ok(Self { + config, + schema, + pk_indices: param.downstream_pk, + }) } } From 75fe60aabb8e1b13109d41f888cf9a45e2adce70 Mon Sep 17 00:00:00 2001 From: Jinser Kafka Date: Mon, 13 May 2024 19:13:38 +0800 Subject: [PATCH 3/9] fix: map_data_type to take data_type as argument --- src/connector/src/sink/dynamodb.rs | 115 ++++++++++++++++++----------- 1 file changed, 73 insertions(+), 42 deletions(-) diff --git a/src/connector/src/sink/dynamodb.rs b/src/connector/src/sink/dynamodb.rs index d9f5742a39ea2..5c5078c584bc1 100644 --- a/src/connector/src/sink/dynamodb.rs +++ b/src/connector/src/sink/dynamodb.rs @@ -21,9 +21,9 @@ use aws_sdk_dynamodb::client::Client; use aws_smithy_types::Blob; use dynamodb::types::{AttributeValue, TableStatus}; use risingwave_common::array::{Op, RowRef, StreamChunk}; -use risingwave_common::catalog::Schema; +use risingwave_common::catalog::{Field, Schema}; use risingwave_common::row::Row as _; -use risingwave_common::types::{ScalarRefImpl, ToText}; +use risingwave_common::types::{DataType, ScalarRefImpl, ToText}; use risingwave_common::util::iter_util::ZipEqDebug; use serde_derive::Deserialize; use serde_with::serde_as; @@ -182,8 +182,9 @@ impl DynamoDbPayloadWriter { .map_err(|e| { SinkError::DynamoDb(anyhow!(e).context("failed to put item to DynamoDB sink")) }) - .map(|_| ()) - } else if !self.delete_items.is_empty() { + .map(|_| ())? + } + if !self.delete_items.is_empty() { let new_items = std::mem::take(&mut self.delete_items); self.client .delete_item() @@ -196,10 +197,10 @@ impl DynamoDbPayloadWriter { anyhow!(e).context("failed to delete item from DynamoDB sink"), ) }) - .map(|_| ()) - } else { - Ok(()) + .map(|_| ())? } + + Ok(()) } } @@ -229,12 +230,12 @@ impl DynamoDbSinkWriter { for (op, row) in chunk.rows() { match op { Op::Insert | Op::UpdateInsert => { - for (k, v) in self.formatter.format_row(row) { + for (k, v) in self.formatter.format_row(row)? { self.payload_writer.write_one_insert(k, v); } } Op::Delete | Op::UpdateDelete => { - for (k, v) in self.formatter.format_row(row) { + for (k, v) in self.formatter.format_row(row)? { self.payload_writer.write_one_delete(k, v); } } @@ -259,40 +260,70 @@ struct DynamoDbFormatter { } impl DynamoDbFormatter { - fn format_row(&self, row: RowRef<'_>) -> Vec<(String, AttributeValue)> { - let mut items = Vec::new(); - for (scalar, field) in row.iter().zip_eq_debug((self.schema.clone()).into_fields()) { - let attr_value = map_data_type(scalar); - items.push((field.name, attr_value)); - } - - items + fn format_row(&self, row: RowRef<'_>) -> Result> { + row.iter() + .zip_eq_debug((self.schema.clone()).into_fields()) + .map(|(scalar, field)| { + map_data_type(scalar, &field.data_type()).map(|attr| (field.name, attr)) + }) + .collect() } } -fn map_data_type(scalar: Option>) -> AttributeValue { - match scalar { - None => AttributeValue::Null(true), - Some(s) => match s { - number @ (ScalarRefImpl::Int16(_) - | ScalarRefImpl::Int32(_) - | ScalarRefImpl::Int64(_) - | ScalarRefImpl::Int256(_) - | ScalarRefImpl::Float32(_) - | ScalarRefImpl::Float64(_) - | ScalarRefImpl::Decimal(_) - | ScalarRefImpl::Serial(_)) => AttributeValue::N(number.to_text()), - string @ (ScalarRefImpl::Utf8(_) - | ScalarRefImpl::Interval(_) - | ScalarRefImpl::Date(_) - | ScalarRefImpl::Time(_) - | ScalarRefImpl::Timestamp(_) - | ScalarRefImpl::Timestamptz(_) - | ScalarRefImpl::Struct(_) - | ScalarRefImpl::Jsonb(_)) => AttributeValue::S(string.to_text()), - ScalarRefImpl::Bool(x) => AttributeValue::Bool(x), - ScalarRefImpl::Bytea(x) => AttributeValue::B(Blob::new(x)), - ScalarRefImpl::List(x) => AttributeValue::L(x.iter().map(map_data_type).collect()), - }, - } +fn map_data_type( + scalar_ref: Option>, + data_type: &DataType, +) -> Result { + let Some(scalar_ref) = scalar_ref else { + return Ok(AttributeValue::Null(true)); + }; + let attr = match (data_type, scalar_ref) { + (DataType::Int16, ScalarRefImpl::Int16(_)) + | (DataType::Int32, ScalarRefImpl::Int32(_)) + | (DataType::Int64, ScalarRefImpl::Int64(_)) + | (DataType::Int256, ScalarRefImpl::Int256(_)) + | (DataType::Float32, ScalarRefImpl::Float32(_)) + | (DataType::Float64, ScalarRefImpl::Float64(_)) + | (DataType::Decimal, ScalarRefImpl::Decimal(_)) + | (DataType::Serial, ScalarRefImpl::Serial(_)) => { + AttributeValue::N(scalar_ref.to_text_with_type(data_type)) + } + // TODO: jsonb as dynamic type (https://github.com/risingwavelabs/risingwave/issues/11699) + (DataType::Varchar, ScalarRefImpl::Utf8(_)) + | (DataType::Interval, ScalarRefImpl::Interval(_)) + | (DataType::Date, ScalarRefImpl::Date(_)) + | (DataType::Time, ScalarRefImpl::Time(_)) + | (DataType::Timestamp, ScalarRefImpl::Timestamp(_)) + | (DataType::Timestamptz, ScalarRefImpl::Timestamptz(_)) + | (DataType::Jsonb, ScalarRefImpl::Jsonb(_)) => { + AttributeValue::S(scalar_ref.to_text_with_type(data_type)) + } + (DataType::Boolean, ScalarRefImpl::Bool(v)) => AttributeValue::Bool(v), + (DataType::Bytea, ScalarRefImpl::Bytea(v)) => AttributeValue::B(Blob::new(v)), + (DataType::List(datatype), ScalarRefImpl::List(list_ref)) => { + let list_attr = list_ref + .iter() + .map(|x| map_data_type(x, datatype)) + .collect::>>()?; + AttributeValue::L(list_attr) + } + (DataType::Struct(st), ScalarRefImpl::Struct(struct_ref)) => { + let mut map = HashMap::with_capacity(st.len()); + for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug( + st.iter() + .map(|(name, dt)| Field::with_name(dt.clone(), name)), + ) { + let attr = map_data_type(sub_datum_ref, &sub_field.data_type())?; + map.insert(sub_field.name.clone(), attr); + } + AttributeValue::M(map) + } + (data_type, scalar_ref) => { + return Err(SinkError::DynamoDb(anyhow!(format!( + "map_data_type: unsupported data type: logical type: {:?}, physical type: {:?}", + data_type, scalar_ref + ),))); + } + }; + Ok(attr) } From 2f7b83911e0377eedeeeec47ca5a48b8a0f02aaa Mon Sep 17 00:00:00 2001 From: Jinser Kafka Date: Thu, 16 May 2024 13:07:51 +0800 Subject: [PATCH 4/9] fix(sink): auto de-dup dynamodb requests before sending refactor dynamodb sink to use batch_write_item --- src/connector/src/sink/dynamodb.rs | 154 +++++++++++++++++++---------- 1 file changed, 100 insertions(+), 54 deletions(-) diff --git a/src/connector/src/sink/dynamodb.rs b/src/connector/src/sink/dynamodb.rs index 5c5078c584bc1..866412055bcd2 100644 --- a/src/connector/src/sink/dynamodb.rs +++ b/src/connector/src/sink/dynamodb.rs @@ -19,7 +19,12 @@ use anyhow::{anyhow, Context}; use aws_sdk_dynamodb as dynamodb; use aws_sdk_dynamodb::client::Client; use aws_smithy_types::Blob; -use dynamodb::types::{AttributeValue, TableStatus}; +use dynamodb::types::{ + AttributeValue, DeleteRequest, PutRequest, ReturnConsumedCapacity, ReturnItemCollectionMetrics, + TableStatus, WriteRequest, +}; +use itertools::Itertools; +use maplit::hashmap; use risingwave_common::array::{Op, RowRef, StreamChunk}; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::row::Row as _; @@ -90,12 +95,6 @@ impl Sink for DynamoDbSink { ))); } - let all_set: HashSet = self - .schema - .fields() - .iter() - .map(|f| f.name.clone()) - .collect(); let pk_set: HashSet = self .schema .fields() @@ -106,7 +105,6 @@ impl Sink for DynamoDbSink { .collect(); let key_schema = table.key_schema(); - // 1. validate all DynamoDb key element are in RisingWave schema and are primary key for key_element in key_schema.iter().map(|x| x.attribute_name()) { if !pk_set.iter().any(|x| x == key_element) { return Err(SinkError::DynamoDb(anyhow!( @@ -116,16 +114,6 @@ impl Sink for DynamoDbSink { ))); } } - // 2. validate RisingWave schema fields are subset of dynamodb key fields - for ref field in all_set { - if !key_schema.iter().any(|x| x.attribute_name() == field) { - return Err(SinkError::DynamoDb(anyhow!( - "table {} field {} not found in dynamodb key", - table_name, - field - ))); - } - } Ok(()) } @@ -154,50 +142,91 @@ impl TryFrom for DynamoDbSink { } } +#[derive(Debug)] +struct DynamoDbRequest { + inner: WriteRequest, + key_items: Vec, +} + +impl DynamoDbRequest { + fn extract_pkey_values(&self) -> Option> { + let key = match (&self.inner.put_request(), &self.inner.delete_request()) { + (Some(put_req), None) => &put_req.item, + (None, Some(del_req)) => &del_req.key, + _ => return None, + }; + let vs = key + .into_iter() + .filter(|(k, _)| self.key_items.contains(k)) + .map(|(_, v)| v.clone()) + .collect(); + Some(vs) + } +} + struct DynamoDbPayloadWriter { - insert_items: HashMap, - delete_items: HashMap, + request_items: Vec, client: Client, table: String, + dynamodb_keys: Vec, } impl DynamoDbPayloadWriter { - fn write_one_insert(&mut self, key: String, value: AttributeValue) { - self.insert_items.insert(key, value); + fn write_one_insert(&mut self, item: HashMap) { + let put_req = PutRequest::builder().set_item(Some(item)).build().unwrap(); + let req = WriteRequest::builder().put_request(put_req).build(); + self.write_one_req(req); } - fn write_one_delete(&mut self, key: String, value: AttributeValue) { - self.delete_items.insert(key, value); + fn write_one_delete(&mut self, key: HashMap) { + let key = key + .into_iter() + .filter(|(k, _)| self.dynamodb_keys.contains(k)) + .collect(); + let del_req = DeleteRequest::builder().set_key(Some(key)).build().unwrap(); + let req = WriteRequest::builder().delete_request(del_req).build(); + self.write_one_req(req); } - async fn write_chunk(&mut self) -> Result<()> { - if !self.insert_items.is_empty() { - let new_items = std::mem::take(&mut self.insert_items); - self.client - .put_item() - .table_name(self.table.clone()) - .set_item(Some(new_items)) - .send() - .await - .map_err(|e| { - SinkError::DynamoDb(anyhow!(e).context("failed to put item to DynamoDB sink")) - }) - .map(|_| ())? + fn write_one_req(&mut self, req: WriteRequest) { + let r_req = DynamoDbRequest { + inner: req, + key_items: self.dynamodb_keys.clone(), + }; + if let Some(v) = r_req.extract_pkey_values() { + self.request_items.retain(|item| { + !item + .extract_pkey_values() + .unwrap_or_default() + .iter() + .all(|x| v.contains(x)) + }); } - if !self.delete_items.is_empty() { - let new_items = std::mem::take(&mut self.delete_items); + self.request_items.push(r_req); + } + + async fn write_chunk(&mut self) -> Result<()> { + if !self.request_items.is_empty() { + let table = self.table.clone(); + let req_items = std::mem::take(&mut self.request_items) + .into_iter() + .map(|r| r.inner) + .collect(); + let reqs = hashmap! { + table => req_items, + }; self.client - .delete_item() - .table_name(self.table.clone()) - .set_key(Some(new_items)) + .batch_write_item() + .set_request_items(Some(reqs)) + .return_consumed_capacity(ReturnConsumedCapacity::None) + .return_item_collection_metrics(ReturnItemCollectionMetrics::None) .send() .await .map_err(|e| { SinkError::DynamoDb( anyhow!(e).context("failed to delete item from DynamoDB sink"), ) - }) - .map(|_| ())? + })?; } Ok(()) @@ -212,12 +241,31 @@ pub struct DynamoDbSinkWriter { impl DynamoDbSinkWriter { pub async fn new(config: DynamoDbConfig, schema: Schema) -> Result { let client = config.common.build_client().await?; + let table_name = &config.common.table; + let output = client + .describe_table() + .table_name(table_name) + .send() + .await + .map_err(|e| anyhow!(e))?; + let Some(table) = output.table else { + return Err(SinkError::DynamoDb(anyhow!( + "table {} not found", + table_name + ))); + }; + let dynamodb_keys = table + .key_schema + .unwrap_or_default() + .into_iter() + .map(|k| k.attribute_name) + .collect_vec(); let payload_writer = DynamoDbPayloadWriter { - insert_items: HashMap::new(), - delete_items: HashMap::new(), + request_items: Vec::new(), client, table: config.common.table, + dynamodb_keys, }; Ok(Self { @@ -228,17 +276,15 @@ impl DynamoDbSinkWriter { async fn write_chunk_inner(&mut self, chunk: StreamChunk) -> Result<()> { for (op, row) in chunk.rows() { + let items = self.formatter.format_row(row)?; match op { Op::Insert | Op::UpdateInsert => { - for (k, v) in self.formatter.format_row(row)? { - self.payload_writer.write_one_insert(k, v); - } + self.payload_writer.write_one_insert(items); } - Op::Delete | Op::UpdateDelete => { - for (k, v) in self.formatter.format_row(row)? { - self.payload_writer.write_one_delete(k, v); - } + Op::Delete => { + self.payload_writer.write_one_delete(items); } + Op::UpdateDelete => {} } } self.payload_writer.write_chunk().await @@ -260,7 +306,7 @@ struct DynamoDbFormatter { } impl DynamoDbFormatter { - fn format_row(&self, row: RowRef<'_>) -> Result> { + fn format_row(&self, row: RowRef<'_>) -> Result> { row.iter() .zip_eq_debug((self.schema.clone()).into_fields()) .map(|(scalar, field)| { From d4686b2d59ce5041a04fb9816448de25f7c15a45 Mon Sep 17 00:00:00 2001 From: Jinser Kafka Date: Thu, 16 May 2024 17:38:07 +0800 Subject: [PATCH 5/9] refactor: simplify match statement --- src/connector/src/sink/dynamodb.rs | 63 +++++++++++++----------------- 1 file changed, 28 insertions(+), 35 deletions(-) diff --git a/src/connector/src/sink/dynamodb.rs b/src/connector/src/sink/dynamodb.rs index 866412055bcd2..e8321a40443ea 100644 --- a/src/connector/src/sink/dynamodb.rs +++ b/src/connector/src/sink/dynamodb.rs @@ -323,53 +323,46 @@ fn map_data_type( let Some(scalar_ref) = scalar_ref else { return Ok(AttributeValue::Null(true)); }; - let attr = match (data_type, scalar_ref) { - (DataType::Int16, ScalarRefImpl::Int16(_)) - | (DataType::Int32, ScalarRefImpl::Int32(_)) - | (DataType::Int64, ScalarRefImpl::Int64(_)) - | (DataType::Int256, ScalarRefImpl::Int256(_)) - | (DataType::Float32, ScalarRefImpl::Float32(_)) - | (DataType::Float64, ScalarRefImpl::Float64(_)) - | (DataType::Decimal, ScalarRefImpl::Decimal(_)) - | (DataType::Serial, ScalarRefImpl::Serial(_)) => { - AttributeValue::N(scalar_ref.to_text_with_type(data_type)) - } + let attr = match data_type { + DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::Int256 + | DataType::Float32 + | DataType::Float64 + | DataType::Decimal + | DataType::Serial => AttributeValue::N(scalar_ref.to_text_with_type(data_type)), // TODO: jsonb as dynamic type (https://github.com/risingwavelabs/risingwave/issues/11699) - (DataType::Varchar, ScalarRefImpl::Utf8(_)) - | (DataType::Interval, ScalarRefImpl::Interval(_)) - | (DataType::Date, ScalarRefImpl::Date(_)) - | (DataType::Time, ScalarRefImpl::Time(_)) - | (DataType::Timestamp, ScalarRefImpl::Timestamp(_)) - | (DataType::Timestamptz, ScalarRefImpl::Timestamptz(_)) - | (DataType::Jsonb, ScalarRefImpl::Jsonb(_)) => { - AttributeValue::S(scalar_ref.to_text_with_type(data_type)) - } - (DataType::Boolean, ScalarRefImpl::Bool(v)) => AttributeValue::Bool(v), - (DataType::Bytea, ScalarRefImpl::Bytea(v)) => AttributeValue::B(Blob::new(v)), - (DataType::List(datatype), ScalarRefImpl::List(list_ref)) => { - let list_attr = list_ref + DataType::Varchar + | DataType::Interval + | DataType::Date + | DataType::Time + | DataType::Timestamp + | DataType::Timestamptz + | DataType::Jsonb => AttributeValue::S(scalar_ref.to_text_with_type(data_type)), + DataType::Boolean => AttributeValue::Bool(scalar_ref.into_bool()), + DataType::Bytea => AttributeValue::B(Blob::new(scalar_ref.into_bytea())), + DataType::List(datatype) => { + let list_attr = scalar_ref + .into_list() .iter() .map(|x| map_data_type(x, datatype)) .collect::>>()?; AttributeValue::L(list_attr) } - (DataType::Struct(st), ScalarRefImpl::Struct(struct_ref)) => { + DataType::Struct(st) => { let mut map = HashMap::with_capacity(st.len()); - for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug( - st.iter() - .map(|(name, dt)| Field::with_name(dt.clone(), name)), - ) { + for (sub_datum_ref, sub_field) in + scalar_ref.into_struct().iter_fields_ref().zip_eq_debug( + st.iter() + .map(|(name, dt)| Field::with_name(dt.clone(), name)), + ) + { let attr = map_data_type(sub_datum_ref, &sub_field.data_type())?; map.insert(sub_field.name.clone(), attr); } AttributeValue::M(map) } - (data_type, scalar_ref) => { - return Err(SinkError::DynamoDb(anyhow!(format!( - "map_data_type: unsupported data type: logical type: {:?}, physical type: {:?}", - data_type, scalar_ref - ),))); - } }; Ok(attr) } From 56bf209e8c507cfbe5d2b89bea68230c3088d4dc Mon Sep 17 00:00:00 2001 From: Jinser Kafka Date: Thu, 16 May 2024 17:38:33 +0800 Subject: [PATCH 6/9] refactor: de-DynamoDbCommon to AwsAuthProps --- src/connector/src/connector_common/common.rs | 43 -------------------- src/connector/src/sink/dynamodb.rs | 34 ++++++++++------ 2 files changed, 21 insertions(+), 56 deletions(-) diff --git a/src/connector/src/connector_common/common.rs b/src/connector/src/connector_common/common.rs index 21ee98f93b2f8..7a5e3ba5f8b2b 100644 --- a/src/connector/src/connector_common/common.rs +++ b/src/connector/src/connector_common/common.rs @@ -20,7 +20,6 @@ use std::time::Duration; use anyhow::{anyhow, Context}; use async_nats::jetstream::consumer::DeliverPolicy; use async_nats::jetstream::{self}; -use aws_sdk_dynamodb::client::Client as DynamoDBClient; use aws_sdk_kinesis::Client as KinesisClient; use pulsar::authentication::oauth2::{OAuth2Authentication, OAuth2Params}; use pulsar::{Authentication, Pulsar, TokioExecutor}; @@ -705,48 +704,6 @@ impl NatsCommon { } } -#[derive(Deserialize, Debug, Clone, WithOptions)] -pub struct DynamoDbCommon { - #[serde(rename = "table", alias = "dynamodb.table")] - pub table: String, - // #[serde(rename = "primary_key", alias = "dynamodb.primary_key")] - // pub primary_key: String, - // #[serde(rename = "sort_key", alias = "dynamodb.sort_key")] - // pub sort_key: Option, - #[serde(rename = "aws.region")] - pub stream_region: String, - #[serde(rename = "aws.endpoint")] - pub endpoint: Option, - #[serde(rename = "aws.credentials.access_key_id")] - pub credentials_access_key: Option, - #[serde(rename = "aws.credentials.secret_access_key")] - pub credentials_secret_access_key: Option, - #[serde(rename = "aws.credentials.session_token")] - pub session_token: Option, - #[serde(rename = "aws.credentials.role.arn")] - pub assume_role_arn: Option, - #[serde(rename = "aws.credentials.role.external_id")] - pub assume_role_external_id: Option, -} - -impl DynamoDbCommon { - pub(crate) async fn build_client(&self) -> ConnectorResult { - let config = AwsAuthProps { - region: Some(self.stream_region.clone()), - endpoint: self.endpoint.clone(), - access_key: self.credentials_access_key.clone(), - secret_key: self.credentials_secret_access_key.clone(), - session_token: self.session_token.clone(), - arn: self.assume_role_arn.clone(), - external_id: self.assume_role_external_id.clone(), - profile: Default::default(), - }; - let aws_config = config.build_config().await?; - - Ok(DynamoDBClient::new(&aws_config)) - } -} - pub(crate) fn load_certs( certificates: &str, ) -> ConnectorResult>> { diff --git a/src/connector/src/sink/dynamodb.rs b/src/connector/src/sink/dynamodb.rs index e8321a40443ea..6e65ed0ac1d84 100644 --- a/src/connector/src/sink/dynamodb.rs +++ b/src/connector/src/sink/dynamodb.rs @@ -23,7 +23,6 @@ use dynamodb::types::{ AttributeValue, DeleteRequest, PutRequest, ReturnConsumedCapacity, ReturnItemCollectionMetrics, TableStatus, WriteRequest, }; -use itertools::Itertools; use maplit::hashmap; use risingwave_common::array::{Op, RowRef, StreamChunk}; use risingwave_common::catalog::{Field, Schema}; @@ -31,7 +30,6 @@ use risingwave_common::row::Row as _; use risingwave_common::types::{DataType, ScalarRefImpl, ToText}; use risingwave_common::util::iter_util::ZipEqDebug; use serde_derive::Deserialize; -use serde_with::serde_as; use with_options::WithOptions; use super::log_store::DeliveryFutureManagerAddFuture; @@ -39,18 +37,28 @@ use super::writer::{ AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, }; use super::{DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SinkWriterParam}; -use crate::connector_common::common::DynamoDbCommon; +use crate::connector_common::AwsAuthProps; +use crate::error::ConnectorResult; pub const DYNAMO_DB_SINK: &str = "dynamodb"; -#[serde_as] -#[derive(Clone, Debug, Deserialize, WithOptions)] +#[derive(Deserialize, Debug, Clone, WithOptions)] pub struct DynamoDbConfig { + #[serde(rename = "table", alias = "dynamodb.table")] + pub table: String, + #[serde(flatten)] - pub common: DynamoDbCommon, + pub aws_auth_props: AwsAuthProps, } impl DynamoDbConfig { + pub async fn build_client(&self) -> ConnectorResult { + let config = &self.aws_auth_props; + let aws_config = config.build_config().await?; + + Ok(Client::new(&aws_config)) + } + fn from_hashmap(values: HashMap) -> Result { serde_json::from_value::(serde_json::to_value(values).unwrap()) .map_err(|e| SinkError::Config(anyhow!(e))) @@ -71,11 +79,11 @@ impl Sink for DynamoDbSink { const SINK_NAME: &'static str = DYNAMO_DB_SINK; async fn validate(&self) -> Result<()> { - let client = (self.config.common.build_client().await) + let client = (self.config.build_client().await) .context("validate DynamoDB sink error") .map_err(SinkError::DynamoDb)?; - let table_name = &self.config.common.table; + let table_name = &self.config.table; let output = client .describe_table() .table_name(table_name) @@ -156,7 +164,7 @@ impl DynamoDbRequest { _ => return None, }; let vs = key - .into_iter() + .iter() .filter(|(k, _)| self.key_items.contains(k)) .map(|(_, v)| v.clone()) .collect(); @@ -240,8 +248,8 @@ pub struct DynamoDbSinkWriter { impl DynamoDbSinkWriter { pub async fn new(config: DynamoDbConfig, schema: Schema) -> Result { - let client = config.common.build_client().await?; - let table_name = &config.common.table; + let client = config.build_client().await?; + let table_name = &config.table; let output = client .describe_table() .table_name(table_name) @@ -259,12 +267,12 @@ impl DynamoDbSinkWriter { .unwrap_or_default() .into_iter() .map(|k| k.attribute_name) - .collect_vec(); + .collect(); let payload_writer = DynamoDbPayloadWriter { request_items: Vec::new(), client, - table: config.common.table, + table: config.table, dynamodb_keys, }; From 4d6b9797cdf90eb81e351ea610a3a48e3dd6c36c Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Wed, 29 May 2024 17:22:15 +0800 Subject: [PATCH 7/9] fix comm --- src/connector/src/sink/dynamodb.rs | 35 +++++++++++++++++++++++++----- 1 file changed, 30 insertions(+), 5 deletions(-) diff --git a/src/connector/src/sink/dynamodb.rs b/src/connector/src/sink/dynamodb.rs index 6e65ed0ac1d84..edf2e7c08cc9f 100644 --- a/src/connector/src/sink/dynamodb.rs +++ b/src/connector/src/sink/dynamodb.rs @@ -30,6 +30,7 @@ use risingwave_common::row::Row as _; use risingwave_common::types::{DataType, ScalarRefImpl, ToText}; use risingwave_common::util::iter_util::ZipEqDebug; use serde_derive::Deserialize; +use serde_with::{serde_as, DisplayFromStr}; use with_options::WithOptions; use super::log_store::DeliveryFutureManagerAddFuture; @@ -42,15 +43,24 @@ use crate::error::ConnectorResult; pub const DYNAMO_DB_SINK: &str = "dynamodb"; +#[serde_as] #[derive(Deserialize, Debug, Clone, WithOptions)] pub struct DynamoDbConfig { #[serde(rename = "table", alias = "dynamodb.table")] pub table: String, + #[serde(rename = "dynamodb.max_batch_rows", default = "default_max_batch_rows")] + #[serde_as(as = "DisplayFromStr")] + pub max_batch_rows: usize, + #[serde(flatten)] pub aws_auth_props: AwsAuthProps, } +fn default_max_batch_rows() -> usize { + 1024 +} + impl DynamoDbConfig { pub async fn build_client(&self) -> ConnectorResult { let config = &self.aws_auth_props; @@ -102,7 +112,6 @@ impl Sink for DynamoDbSink { table_name ))); } - let pk_set: HashSet = self .schema .fields() @@ -114,7 +123,7 @@ impl Sink for DynamoDbSink { let key_schema = table.key_schema(); for key_element in key_schema.iter().map(|x| x.attribute_name()) { - if !pk_set.iter().any(|x| x == key_element) { + if !pk_set.contains(key_element) { return Err(SinkError::DynamoDb(anyhow!( "table {} key field {} not found in schema or not primary key", table_name, @@ -157,7 +166,7 @@ struct DynamoDbRequest { } impl DynamoDbRequest { - fn extract_pkey_values(&self) -> Option> { + fn extract_pk_values(&self) -> Option> { let key = match (&self.inner.put_request(), &self.inner.delete_request()) { (Some(put_req), None) => &put_req.item, (None, Some(del_req)) => &del_req.key, @@ -201,10 +210,10 @@ impl DynamoDbPayloadWriter { inner: req, key_items: self.dynamodb_keys.clone(), }; - if let Some(v) = r_req.extract_pkey_values() { + if let Some(v) = r_req.extract_pk_values() { self.request_items.retain(|item| { !item - .extract_pkey_values() + .extract_pk_values() .unwrap_or_default() .iter() .all(|x| v.contains(x)) @@ -242,6 +251,7 @@ impl DynamoDbPayloadWriter { } pub struct DynamoDbSinkWriter { + max_batch_rows: usize, payload_writer: DynamoDbPayloadWriter, formatter: DynamoDbFormatter, } @@ -277,6 +287,7 @@ impl DynamoDbSinkWriter { }; Ok(Self { + max_batch_rows: config.max_batch_rows, payload_writer, formatter: DynamoDbFormatter { schema }, }) @@ -295,6 +306,13 @@ impl DynamoDbSinkWriter { Op::UpdateDelete => {} } } + if self.payload_writer.request_items.len() >= self.max_batch_rows { + self.payload_writer.write_chunk().await?; + } + Ok(()) + } + + async fn flush(&mut self) -> Result<()> { self.payload_writer.write_chunk().await } } @@ -307,6 +325,13 @@ impl AsyncTruncateSinkWriter for DynamoDbSinkWriter { ) -> Result<()> { self.write_chunk_inner(chunk).await } + + async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> { + if is_checkpoint { + self.flush().await?; + } + Ok(()) + } } struct DynamoDbFormatter { From 5e85b3882994fa990529e9fd1e706e5d1cd9ad0a Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Wed, 29 May 2024 19:23:32 +0800 Subject: [PATCH 8/9] fix ut --- src/connector/with_options_sink.yaml | 37 ++++++++++++++++++++-------- 1 file changed, 27 insertions(+), 10 deletions(-) diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index 87bedf4b03266..eb43d51f1a759 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -139,38 +139,55 @@ DynamoDbConfig: - name: table field_type: String required: true - alias: dynamodb.table + alias: + - dynamodb.table - name: dynamodb.max_batch_rows field_type: usize required: false default: '1024' - - name: region + - name: aws.region field_type: String required: false - - name: endpoint + alias: + - region + - name: aws.endpoint_url field_type: String required: false - alias: endpoint_url - - name: access_key + alias: + - endpoint_url + - endpoint + - name: aws.credentials.access_key_id field_type: String required: false - - name: secret_key + alias: + - access_key + - name: aws.credentials.secret_access_key field_type: String required: false - - name: session_token + alias: + - secret_key + - name: aws.credentials.session_token field_type: String required: false - - name: arn + alias: + - session_token + - name: aws.credentials.role.arn field_type: String comments: IAM role required: false - - name: external_id + alias: + - arn + - name: aws.credentials.role.external_id field_type: String comments: external ID in IAM role trust policy required: false - - name: profile + alias: + - external_id + - name: aws.profile field_type: String required: false + alias: + - profile GooglePubSubConfig: fields: - name: pubsub.project_id From 7e8f23cef980f9a95d500d6002715e15553ec4e6 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Tue, 4 Jun 2024 11:09:40 +0800 Subject: [PATCH 9/9] fix fix --- Cargo.lock | 43 +++++++++++++++++----------------------- src/connector/Cargo.toml | 2 +- 2 files changed, 19 insertions(+), 26 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 75077d125de12..f994e1db27929 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1283,11 +1283,12 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "1.2.2" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75588e7ee5e8496eed939adac2035a6dbab9f7eb2acdd9ab2d31856dab6f3955" +checksum = "1ed7ef604a15fd0d4d9e43701295161ea6b504b63c44990ead352afea2bc15e9" dependencies = [ "aws-credential-types", + "aws-http", "aws-sigv4", "aws-smithy-async", "aws-smithy-eventstream", @@ -1295,23 +1296,21 @@ dependencies = [ "aws-smithy-runtime-api", "aws-smithy-types", "aws-types", - "bytes", "fastrand 2.0.1", "http 0.2.9", - "http-body 0.4.5", "percent-encoding", - "pin-project-lite", "tracing", "uuid", ] [[package]] name = "aws-sdk-dynamodb" -version = "1.27.0" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "281c887364ff494a6ce0b492b03da5fa7729da004dbb875e2c81aceb24debe89" +checksum = "23c4ed3708df2778c0c49b16e8235e52eb8f2133ae6752c40eea1376e2563fec" dependencies = [ "aws-credential-types", + "aws-http", "aws-runtime", "aws-smithy-async", "aws-smithy-http", @@ -1323,8 +1322,7 @@ dependencies = [ "bytes", "fastrand 2.0.1", "http 0.2.9", - "once_cell", - "regex-lite", + "regex", "tracing", ] @@ -1405,9 +1403,9 @@ dependencies = [ [[package]] name = "aws-sigv4" -version = "1.2.1" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58b56f1cbe6fd4d0c2573df72868f20ab1c125ca9c9dbce17927a463433a2e57" +checksum = "d222297ca90209dc62245f0a490355795f29de362eb5c19caea4f7f55fe69078" dependencies = [ "aws-credential-types", "aws-smithy-eventstream", @@ -1420,7 +1418,6 @@ dependencies = [ "hex", "hmac", "http 0.2.9", - "http 1.0.0", "once_cell", "p256 0.11.1", "percent-encoding", @@ -1466,9 +1463,9 @@ dependencies = [ [[package]] name = "aws-smithy-eventstream" -version = "0.60.4" +version = "0.60.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6363078f927f612b970edf9d1903ef5cef9a64d1e8423525ebb1f0a1633c858" +checksum = "682371561562d08ab437766903c6bc28f4f95d7ab2ecfb389bda7849dd98aefe" dependencies = [ "aws-smithy-types", "bytes", @@ -1477,9 +1474,9 @@ dependencies = [ [[package]] name = "aws-smithy-http" -version = "0.60.8" +version = "0.60.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a7de001a1b9a25601016d8057ea16e31a45fdca3751304c8edf4ad72e706c08" +checksum = "365ca49744b2bda2f1e2dc03b856da3fa5a28ca5b0a41e41d7ff5305a8fae190" dependencies = [ "aws-smithy-eventstream", "aws-smithy-runtime-api", @@ -1498,9 +1495,9 @@ dependencies = [ [[package]] name = "aws-smithy-json" -version = "0.60.7" +version = "0.60.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4683df9469ef09468dad3473d129960119a0d3593617542b7d52086c8486f2d6" +checksum = "6a46dd338dc9576d6a6a5b5a19bd678dcad018ececee11cf28ecd7588bd1a55c" dependencies = [ "aws-smithy-types", ] @@ -1517,9 +1514,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.5.0" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9ac79e9f3a4d576f3cd4a470a0275b138d9e7b11b1cd514a6858ae0a79dd5bb" +checksum = "6ab9cb6fee50680af8ceaa293ae79eba32095ca117161cb323f9ee30dd87d139" dependencies = [ "aws-smithy-async", "aws-smithy-http", @@ -1530,7 +1527,6 @@ dependencies = [ "h2 0.3.26", "http 0.2.9", "http-body 0.4.5", - "http-body 1.0.0", "hyper 0.14.27", "hyper-rustls 0.24.2", "once_cell", @@ -1569,10 +1565,7 @@ dependencies = [ "bytes-utils", "futures-core", "http 0.2.9", - "http 1.0.0", "http-body 0.4.5", - "http-body 1.0.0", - "http-body-util", "hyper 0.14.27", "itoa", "num-integer", @@ -9476,7 +9469,7 @@ dependencies = [ "indoc", "libc", "memoffset", - "parking_lot 0.11.2", + "parking_lot 0.12.1", "portable-atomic", "pyo3-build-config", "pyo3-ffi", diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 12d430d23895c..211f2d608c33e 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -34,7 +34,7 @@ await-tree = { workspace = true } aws-config = { workspace = true } aws-credential-types = { workspace = true } aws-msk-iam-sasl-signer = "1.0.0" -aws-sdk-dynamodb = "1.23.0" +aws-sdk-dynamodb = "1" aws-sdk-kinesis = { workspace = true } aws-sdk-s3 = { workspace = true } aws-smithy-http = { workspace = true }