From f40d29e0d017f0616c4daaecba8051d6958e0041 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Thu, 4 Jul 2024 11:40:01 +0800 Subject: [PATCH 1/9] support mongodb async --- src/connector/src/sink/mongodb.rs | 165 ++++++++++++------------------ 1 file changed, 67 insertions(+), 98 deletions(-) diff --git a/src/connector/src/sink/mongodb.rs b/src/connector/src/sink/mongodb.rs index 8840c72176960..2edc807a15118 100644 --- a/src/connector/src/sink/mongodb.rs +++ b/src/connector/src/sink/mongodb.rs @@ -17,9 +17,13 @@ use std::ops::Deref; use std::sync::LazyLock; use anyhow::anyhow; +use futures::future::try_join_all; +use futures::prelude::future::FutureExt; +use futures::prelude::TryFuture; +use futures::TryFutureExt; use itertools::Itertools; use mongodb::bson::{bson, doc, Array, Bson, Document}; -use mongodb::{Client, Namespace}; +use mongodb::{Client, Database, Namespace}; use risingwave_common::array::{Op, RowRef, StreamChunk}; use risingwave_common::catalog::Schema; use risingwave_common::log::LogSuppresser; @@ -28,36 +32,31 @@ use risingwave_common::row::Row; use risingwave_common::session_config::sink_decouple::SinkDecouple; use risingwave_common::types::ScalarRefImpl; use serde_derive::Deserialize; -use serde_with::{serde_as, DisplayFromStr}; +use serde_with::serde_as; use thiserror_ext::AsReport; -use tonic::async_trait; use with_options::WithOptions; use super::catalog::desc::SinkDesc; use super::encoder::BsonEncoder; +use super::log_store::DeliveryFutureManagerAddFuture; +use super::writer::{ + AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, +}; use crate::connector_common::MongodbCommon; use crate::deserialize_bool_from_string; use crate::sink::encoder::RowEncoder; -use crate::sink::writer::{LogSinkerOf, SinkWriter, SinkWriterExt}; use crate::sink::{ DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, }; pub const MONGODB_SINK: &str = "mongodb"; +const MONGODB_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 256; -// 65536 seems like a reasonable limit, but we may consider setting this limit to 100,000, -// which is the actual limit imposed by the server. -// see https://www.mongodb.com/docs/v4.2/reference/command/hello/#hello.maxWriteBatchSize for more details -pub const MONGODB_BULK_WRITE_SIZE_LIMIT: usize = 65536; pub const MONGODB_PK_NAME: &str = "_id"; static LOG_SUPPERSSER: LazyLock = LazyLock::new(LogSuppresser::default); -const fn _default_bulk_write_max_entries() -> usize { - 1024 -} - #[serde_as] #[derive(Clone, Debug, Deserialize, WithOptions)] pub struct MongodbConfig { @@ -81,14 +80,6 @@ pub struct MongodbConfig { rename = "collection.name.field.drop" )] pub drop_collection_name_field: bool, - - /// The maximum entries will accumulate before performing the bulk write, defaults to 1024. 
- #[serde( - rename = "mongodb.bulk_write.max_entries", - default = "_default_bulk_write_max_entries" - )] - #[serde_as(as = "DisplayFromStr")] - pub bulk_write_max_entries: usize, } impl MongodbConfig { @@ -181,7 +172,7 @@ impl TryFrom for MongodbSink { impl Sink for MongodbSink { type Coordinator = DummySinkCommitCoordinator; - type LogSinker = LogSinkerOf; + type LogSinker = AsyncTruncateLogSinkerOf; const SINK_NAME: &'static str = MONGODB_SINK; @@ -234,14 +225,6 @@ impl Sink for MongodbSink { } } - if self.config.bulk_write_max_entries > MONGODB_BULK_WRITE_SIZE_LIMIT { - return Err(SinkError::Config(anyhow!( - "mongodb.bulk_write.max_entries {} exceeds the limit {}", - self.config.bulk_write_max_entries, - MONGODB_BULK_WRITE_SIZE_LIMIT - ))); - } - if let Err(err) = self.config.common.collection_name.parse::() { return Err(SinkError::Config(anyhow!(err).context(format!( "invalid collection.name {}", @@ -312,7 +295,7 @@ impl Sink for MongodbSink { self.is_append_only, ) .await? - .into_log_sinker(writer_param.sink_metrics)) + .into_log_sinker(MONGODB_SEND_FUTURE_BUFFER_MAX_SIZE)) } } @@ -378,7 +361,6 @@ impl MongodbSinkWriter { default_namespace, coll_name_field_index, ClientGuard::new(name, client), - config.bulk_write_max_entries, row_encoder, ); @@ -390,7 +372,7 @@ impl MongodbSinkWriter { }) } - async fn append(&mut self, chunk: StreamChunk) -> Result<()> { + fn append(&mut self, chunk: StreamChunk) -> Result<()> { let insert_builder = must_match!(&mut self.command_builder, CommandBuilder::AppendOnly(builder) => builder); for (op, row) in chunk.rows() { @@ -405,12 +387,12 @@ impl MongodbSinkWriter { } continue; } - self.payload_writer.append(insert_builder, row).await?; + self.payload_writer.append(insert_builder, row)?; } Ok(()) } - async fn upsert(&mut self, chunk: StreamChunk) -> Result<()> { + fn upsert(&mut self, chunk: StreamChunk) -> Result<()> { let upsert_builder = must_match!(&mut self.command_builder, CommandBuilder::Upsert(builder) => builder); for (op, row) in chunk.rows() { @@ -418,36 +400,38 @@ impl MongodbSinkWriter { // we should ignore the `UpdateDelete` in upsert mode continue; } - self.payload_writer.upsert(upsert_builder, op, row).await?; + self.payload_writer.upsert(upsert_builder, op, row)?; } Ok(()) } } -#[async_trait] -impl SinkWriter for MongodbSinkWriter { - async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> { - Ok(()) - } +pub type MongodbSinkDeliveryFuture = impl TryFuture + Unpin + 'static; - async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { +impl AsyncTruncateSinkWriter for MongodbSinkWriter { + type DeliveryFuture = MongodbSinkDeliveryFuture; + + async fn write_chunk<'a>( + &'a mut self, + chunk: StreamChunk, + mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, + ) -> Result<()> { if self.is_append_only { - self.append(chunk).await + self.append(chunk)?; + let insert_builder = must_match!(&mut self.command_builder, CommandBuilder::AppendOnly(builder) => builder); + let futures = self.payload_writer.flush_insert(insert_builder)?; + add_future + .add_future_may_await(try_join_all(futures).map_ok(|_: Vec<()>| ()).boxed()) + .await?; } else { - self.upsert(chunk).await - } - } - - async fn barrier(&mut self, is_checkpoint: bool) -> Result { - if is_checkpoint { - if self.is_append_only { - let insert_builder = must_match!(&mut self.command_builder, CommandBuilder::AppendOnly(builder) => builder); - self.payload_writer.flush_insert(insert_builder).await?; - } else { - let upsert_builder = must_match!(&mut 
self.command_builder, CommandBuilder::Upsert(builder) => builder); - self.payload_writer.flush_upsert(upsert_builder).await?; - } - } + self.upsert(chunk)?; + let upsert_builder = + must_match!(&mut self.command_builder, CommandBuilder::Upsert(builder) => builder); + let futures = self.payload_writer.flush_upsert(upsert_builder)?; + add_future + .add_future_may_await(try_join_all(futures).map_ok(|_: Vec<()>| ()).boxed()) + .await?; + }; Ok(()) } } @@ -458,10 +442,10 @@ struct InsertCommandBuilder { } impl InsertCommandBuilder { - fn new(coll: String, capacity: usize) -> Self { + fn new(coll: String) -> Self { Self { coll, - inserts: Array::with_capacity(capacity), + inserts: Array::new(), } } @@ -485,11 +469,11 @@ struct UpsertCommandBuilder { } impl UpsertCommandBuilder { - fn new(coll: String, capacity: usize) -> Self { + fn new(coll: String) -> Self { Self { coll, - updates: Array::with_capacity(capacity), - deletes: HashMap::with_capacity(capacity), + updates: Array::new(), + deletes: HashMap::new(), } } @@ -566,8 +550,6 @@ struct MongodbPayloadWriter { default_namespace: Namespace, coll_name_field_index: Option, client: ClientGuard, - buffered_entries: usize, - max_entries: usize, row_encoder: BsonEncoder, } @@ -578,7 +560,6 @@ impl MongodbPayloadWriter { default_namespace: Namespace, coll_name_field_index: Option, client: ClientGuard, - max_entries: usize, row_encoder: BsonEncoder, ) -> Self { Self { @@ -587,8 +568,6 @@ impl MongodbPayloadWriter { default_namespace, coll_name_field_index, client, - buffered_entries: 0, - max_entries, row_encoder, } } @@ -630,7 +609,7 @@ impl MongodbPayloadWriter { } } - async fn append( + fn append( &mut self, insert_builder: &mut HashMap, row: RowRef<'_>, @@ -641,17 +620,12 @@ impl MongodbPayloadWriter { insert_builder .entry(ns) - .or_insert_with(|| InsertCommandBuilder::new(coll, self.max_entries)) + .or_insert_with(|| InsertCommandBuilder::new(coll)) .append(document); - - self.buffered_entries += 1; - if self.buffered_entries >= self.max_entries { - self.flush_insert(insert_builder).await?; - } Ok(()) } - async fn upsert( + fn upsert( &mut self, upsert_builder: &mut HashMap, op: Op, @@ -675,66 +649,61 @@ impl MongodbPayloadWriter { match op { Op::Insert | Op::UpdateInsert => upsert_builder .entry(ns) - .or_insert_with(|| UpsertCommandBuilder::new(coll, self.max_entries)) + .or_insert_with(|| UpsertCommandBuilder::new(coll)) .add_upsert(pk, document)?, Op::UpdateDelete => (), Op::Delete => upsert_builder .entry(ns) - .or_insert_with(|| UpsertCommandBuilder::new(coll, self.max_entries)) + .or_insert_with(|| UpsertCommandBuilder::new(coll)) .add_delete(pk)?, } - - self.buffered_entries += 1; - if self.buffered_entries >= self.max_entries { - self.flush_upsert(upsert_builder).await?; - } Ok(()) } - async fn flush_insert( - &mut self, + fn flush_insert( + &self, insert_builder: &mut HashMap, - ) -> Result<()> { + ) -> Result>>> { // TODO try sending bulk-write of each collection concurrently to improve the performance when // `dynamic collection` is enabled. We may need to provide best practice to guide user on setting // the MongoDB driver's connection properties. 
+ let mut futures = Vec::with_capacity(insert_builder.len()); for (ns, builder) in insert_builder.drain() { - self.send_bulk_write_command(&ns.0, builder.build()).await?; + let db = self.client.database(&ns.0); + futures.push(Self::send_bulk_write_command(db, builder.build())); } - self.buffered_entries = 0; - Ok(()) + Ok(futures) } - async fn flush_upsert( - &mut self, + fn flush_upsert( + &self, upsert_builder: &mut HashMap, - ) -> Result<()> { + ) -> Result>>> { // TODO try sending bulk-write of each collection concurrently to improve the performance when // `dynamic collection` is enabled. We may need to provide best practice to guide user on setting // the MongoDB driver's connection properties. + let mut futures = Vec::with_capacity(upsert_builder.len()); for (ns, builder) in upsert_builder.drain() { let (upsert, delete) = builder.build(); // we are sending the bulk upsert first because, under same pk, the `Insert` and `UpdateInsert` // should always appear before `Delete`. we have already ignored the `UpdateDelete` // which is useless in upsert mode. + let db = self.client.database(&ns.0); if let Some(upsert) = upsert { - self.send_bulk_write_command(&ns.0, upsert).await?; + futures.push(Self::send_bulk_write_command(db.clone(), upsert)); } if let Some(delete) = delete { - self.send_bulk_write_command(&ns.0, delete).await?; + futures.push(Self::send_bulk_write_command(db, delete)); } } - self.buffered_entries = 0; - Ok(()) + Ok(futures) } - async fn send_bulk_write_command(&self, database: &str, command: Document) -> Result<()> { - let db = self.client.database(database); - + async fn send_bulk_write_command(db: Database, command: Document) -> Result<()> { let result = db.run_command(command, None).await.map_err(|err| { SinkError::Mongodb(anyhow!(err).context(format!( "sending bulk write command failed, database: {}", - database + db.name() ))) })?; From 7b4cf7d32528e44571955c3c11ae4bfe5a25c065 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Thu, 4 Jul 2024 15:07:38 +0800 Subject: [PATCH 2/9] support dynamodb --- src/connector/src/sink/dynamodb.rs | 100 ++++++++++++++--------------- src/connector/src/sink/mongodb.rs | 2 +- 2 files changed, 49 insertions(+), 53 deletions(-) diff --git a/src/connector/src/sink/dynamodb.rs b/src/connector/src/sink/dynamodb.rs index 35b48c6e31faf..fcecd58a15929 100644 --- a/src/connector/src/sink/dynamodb.rs +++ b/src/connector/src/sink/dynamodb.rs @@ -12,16 +12,22 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use core::result; use std::collections::{BTreeMap, HashMap, HashSet}; use anyhow::{anyhow, Context}; use aws_sdk_dynamodb as dynamodb; use aws_sdk_dynamodb::client::Client; +use aws_smithy_runtime_api::client::orchestrator::HttpResponse; use aws_smithy_types::Blob; +use dynamodb::error::SdkError; +use dynamodb::operation::batch_write_item::{BatchWriteItemError, BatchWriteItemOutput}; use dynamodb::types::{ AttributeValue, DeleteRequest, PutRequest, ReturnConsumedCapacity, ReturnItemCollectionMetrics, TableStatus, WriteRequest, }; +use futures::prelude::future::{FutureExt, TryFutureExt}; +use futures::prelude::{Future, TryFuture}; use maplit::hashmap; use risingwave_common::array::{Op, RowRef, StreamChunk}; use risingwave_common::catalog::{Field, Schema}; @@ -29,7 +35,7 @@ use risingwave_common::row::Row as _; use risingwave_common::types::{DataType, ScalarRefImpl, ToText}; use risingwave_common::util::iter_util::ZipEqDebug; use serde_derive::Deserialize; -use serde_with::{serde_as, DisplayFromStr}; +use serde_with::{serde_as}; use with_options::WithOptions; use super::log_store::DeliveryFutureManagerAddFuture; @@ -48,18 +54,10 @@ pub struct DynamoDbConfig { #[serde(rename = "table", alias = "dynamodb.table")] pub table: String, - #[serde(rename = "dynamodb.max_batch_rows", default = "default_max_batch_rows")] - #[serde_as(as = "DisplayFromStr")] - pub max_batch_rows: usize, - #[serde(flatten)] pub aws_auth_props: AwsAuthProps, } -fn default_max_batch_rows() -> usize { - 1024 -} - impl DynamoDbConfig { pub async fn build_client(&self) -> ConnectorResult { let config = &self.aws_auth_props; @@ -221,36 +219,32 @@ impl DynamoDbPayloadWriter { self.request_items.push(r_req); } - async fn write_chunk(&mut self) -> Result<()> { - if !self.request_items.is_empty() { - let table = self.table.clone(); - let req_items = std::mem::take(&mut self.request_items) - .into_iter() - .map(|r| r.inner) - .collect(); - let reqs = hashmap! { - table => req_items, - }; - self.client - .batch_write_item() - .set_request_items(Some(reqs)) - .return_consumed_capacity(ReturnConsumedCapacity::None) - .return_item_collection_metrics(ReturnItemCollectionMetrics::None) - .send() - .await - .map_err(|e| { - SinkError::DynamoDb( - anyhow!(e).context("failed to delete item from DynamoDB sink"), - ) - })?; - } - - Ok(()) + async fn write_chunk(&mut self) -> Result>>> { + let table = self.table.clone(); + let req_items = std::mem::take(&mut self.request_items) + .into_iter() + .map(|r| r.inner) + .collect(); + let reqs = hashmap! 
{ + table => req_items, + }; + let future = self.client + .batch_write_item() + .set_request_items(Some(reqs)) + .return_consumed_capacity(ReturnConsumedCapacity::None) + .return_item_collection_metrics(ReturnItemCollectionMetrics::None) + .send(); + // .map_err(|e| { + // SinkError::DynamoDb( + // anyhow!(e).context("failed to delete item from DynamoDB sink"), + // ) + // })?; + + Ok(future) } } pub struct DynamoDbSinkWriter { - max_batch_rows: usize, payload_writer: DynamoDbPayloadWriter, formatter: DynamoDbFormatter, } @@ -286,13 +280,12 @@ impl DynamoDbSinkWriter { }; Ok(Self { - max_batch_rows: config.max_batch_rows, payload_writer, formatter: DynamoDbFormatter { schema }, }) } - async fn write_chunk_inner(&mut self, chunk: StreamChunk) -> Result<()> { + async fn write_chunk_inner(&mut self, chunk: StreamChunk) -> Result>>> { for (op, row) in chunk.rows() { let items = self.formatter.format_row(row)?; match op { @@ -305,30 +298,33 @@ impl DynamoDbSinkWriter { Op::UpdateDelete => {} } } - if self.payload_writer.request_items.len() >= self.max_batch_rows { - self.payload_writer.write_chunk().await?; - } - Ok(()) + Ok(self.payload_writer.write_chunk().await?) } - async fn flush(&mut self) -> Result<()> { - self.payload_writer.write_chunk().await - } } +pub type DynamoDBSinkDeliveryFuture = + impl TryFuture + Unpin + 'static; + impl AsyncTruncateSinkWriter for DynamoDbSinkWriter { + type DeliveryFuture = DynamoDBSinkDeliveryFuture; async fn write_chunk<'a>( &'a mut self, chunk: StreamChunk, - _add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, + mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, ) -> Result<()> { - self.write_chunk_inner(chunk).await - } - - async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> { - if is_checkpoint { - self.flush().await?; - } + let future = self.write_chunk_inner(chunk).await?; + add_future.add_future_may_await( + future.map(|result| { + result + .map_err(|e| { + SinkError::DynamoDb( + anyhow!(e).context("failed to delete item from DynamoDB sink"), + ) + }) + .map(|_| ()) + }).map_ok(|_| ()).boxed() + ).await?; Ok(()) } } diff --git a/src/connector/src/sink/mongodb.rs b/src/connector/src/sink/mongodb.rs index 2edc807a15118..2fccd03f67f35 100644 --- a/src/connector/src/sink/mongodb.rs +++ b/src/connector/src/sink/mongodb.rs @@ -51,7 +51,7 @@ use crate::sink::{ }; pub const MONGODB_SINK: &str = "mongodb"; -const MONGODB_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 256; +const MONGODB_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 4096; pub const MONGODB_PK_NAME: &str = "_id"; From 29d316d8bea372c7602fe5aa87f38f584fb3e823 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Wed, 10 Jul 2024 15:39:31 +0800 Subject: [PATCH 3/9] support --- src/connector/src/sink/dynamodb.rs | 134 ++++++++++++++++++++--------- 1 file changed, 95 insertions(+), 39 deletions(-) diff --git a/src/connector/src/sink/dynamodb.rs b/src/connector/src/sink/dynamodb.rs index fcecd58a15929..dcd72f0204f19 100644 --- a/src/connector/src/sink/dynamodb.rs +++ b/src/connector/src/sink/dynamodb.rs @@ -26,8 +26,9 @@ use dynamodb::types::{ AttributeValue, DeleteRequest, PutRequest, ReturnConsumedCapacity, ReturnItemCollectionMetrics, TableStatus, WriteRequest, }; -use futures::prelude::future::{FutureExt, TryFutureExt}; +use futures::prelude::future::{try_join_all, FutureExt, TryFutureExt}; use futures::prelude::{Future, TryFuture}; +use itertools::Itertools; use maplit::hashmap; use risingwave_common::array::{Op, RowRef, StreamChunk}; use 
risingwave_common::catalog::{Field, Schema}; @@ -35,7 +36,7 @@ use risingwave_common::row::Row as _; use risingwave_common::types::{DataType, ScalarRefImpl, ToText}; use risingwave_common::util::iter_util::ZipEqDebug; use serde_derive::Deserialize; -use serde_with::{serde_as}; +use serde_with::{serde_as, DisplayFromStr}; use with_options::WithOptions; use super::log_store::DeliveryFutureManagerAddFuture; @@ -56,6 +57,28 @@ pub struct DynamoDbConfig { #[serde(flatten)] pub aws_auth_props: AwsAuthProps, + + #[serde( + rename = "dynamodb.max_batch_item_nums", + default = "default_max_batch_item_nums" + )] + #[serde_as(as = "DisplayFromStr")] + pub max_batch_item_nums: usize, + + #[serde( + rename = "dynamodb.max_future_send_nums", + default = "default_max_future_send_nums" + )] + #[serde_as(as = "DisplayFromStr")] + pub max_future_send_nums: usize, +} + +fn default_max_batch_item_nums() -> usize { + 25 +} + +fn default_max_future_send_nums() -> usize { + 256 } impl DynamoDbConfig { @@ -136,7 +159,7 @@ impl Sink for DynamoDbSink { Ok( DynamoDbSinkWriter::new(self.config.clone(), self.schema.clone()) .await? - .into_log_sinker(usize::MAX), + .into_log_sinker(self.config.max_future_send_nums), ) } } @@ -183,6 +206,7 @@ struct DynamoDbPayloadWriter { client: Client, table: String, dynamodb_keys: Vec, + max_batch_item_nums: usize, } impl DynamoDbPayloadWriter { @@ -219,28 +243,42 @@ impl DynamoDbPayloadWriter { self.request_items.push(r_req); } - async fn write_chunk(&mut self) -> Result>>> { + fn write_chunk( + &mut self, + ) -> Result< + Vec< + impl Future< + Output = result::Result< + BatchWriteItemOutput, + SdkError, + >, + >, + >, + > { let table = self.table.clone(); - let req_items = std::mem::take(&mut self.request_items) + let req_items: Vec> = std::mem::take(&mut self.request_items) .into_iter() .map(|r| r.inner) + .chunks(self.max_batch_item_nums) + .into_iter() + .map(|chunk| chunk.collect()) .collect(); - let reqs = hashmap! { - table => req_items, - }; - let future = self.client - .batch_write_item() - .set_request_items(Some(reqs)) - .return_consumed_capacity(ReturnConsumedCapacity::None) - .return_item_collection_metrics(ReturnItemCollectionMetrics::None) - .send(); - // .map_err(|e| { - // SinkError::DynamoDb( - // anyhow!(e).context("failed to delete item from DynamoDB sink"), - // ) - // })?; - - Ok(future) + let futures: Vec<_> = req_items + .into_iter() + .map(|req_items| { + let reqs = hashmap! { + table.clone() => req_items, + }; + self.client + .batch_write_item() + .set_request_items(Some(reqs)) + .return_consumed_capacity(ReturnConsumedCapacity::None) + .return_item_collection_metrics(ReturnItemCollectionMetrics::None) + .send() + }) + .collect(); + + Ok(futures) } } @@ -275,8 +313,9 @@ impl DynamoDbSinkWriter { let payload_writer = DynamoDbPayloadWriter { request_items: Vec::new(), client, - table: config.table, + table: config.table.clone(), dynamodb_keys, + max_batch_item_nums: config.max_batch_item_nums, }; Ok(Self { @@ -285,7 +324,19 @@ impl DynamoDbSinkWriter { }) } - async fn write_chunk_inner(&mut self, chunk: StreamChunk) -> Result>>> { + fn write_chunk_inner( + &mut self, + chunk: StreamChunk, + ) -> Result< + Vec< + impl Future< + Output = result::Result< + BatchWriteItemOutput, + SdkError, + >, + >, + >, + > { for (op, row) in chunk.rows() { let items = self.formatter.format_row(row)?; match op { @@ -298,33 +349,38 @@ impl DynamoDbSinkWriter { Op::UpdateDelete => {} } } - Ok(self.payload_writer.write_chunk().await?) 
+ self.payload_writer.write_chunk() } - } -pub type DynamoDBSinkDeliveryFuture = - impl TryFuture + Unpin + 'static; +pub type DynamoDbSinkDeliveryFuture = impl TryFuture + Unpin + 'static; impl AsyncTruncateSinkWriter for DynamoDbSinkWriter { - type DeliveryFuture = DynamoDBSinkDeliveryFuture; + type DeliveryFuture = DynamoDbSinkDeliveryFuture; + async fn write_chunk<'a>( &'a mut self, chunk: StreamChunk, mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, ) -> Result<()> { - let future = self.write_chunk_inner(chunk).await?; - add_future.add_future_may_await( - future.map(|result| { - result - .map_err(|e| { - SinkError::DynamoDb( - anyhow!(e).context("failed to delete item from DynamoDB sink"), - ) + let futures = self.write_chunk_inner(chunk)?; + add_future + .add_future_may_await( + try_join_all(futures.into_iter().map(|future| { + future.map(|result| { + result + .map_err(|e| { + SinkError::DynamoDb( + anyhow!(e).context("failed to delete item from DynamoDB sink"), + ) + }) + .map(|_| ()) }) - .map(|_| ()) - }).map_ok(|_| ()).boxed() - ).await?; + })) + .map_ok(|_: Vec<()>| ()) + .boxed(), + ) + .await?; Ok(()) } } From 5d57642249f98896699655c3d93a0a5270392a38 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Wed, 10 Jul 2024 16:35:29 +0800 Subject: [PATCH 4/9] add sink_douple --- src/connector/src/sink/dynamodb.rs | 9 +++++++++ src/connector/src/sink/mongodb.rs | 5 ++--- src/connector/src/sink/redis.rs | 9 +++++++++ 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/src/connector/src/sink/dynamodb.rs b/src/connector/src/sink/dynamodb.rs index dcd72f0204f19..dbdd89da4d661 100644 --- a/src/connector/src/sink/dynamodb.rs +++ b/src/connector/src/sink/dynamodb.rs @@ -33,12 +33,14 @@ use maplit::hashmap; use risingwave_common::array::{Op, RowRef, StreamChunk}; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::row::Row as _; +use risingwave_common::session_config::sink_decouple::SinkDecouple; use risingwave_common::types::{DataType, ScalarRefImpl, ToText}; use risingwave_common::util::iter_util::ZipEqDebug; use serde_derive::Deserialize; use serde_with::{serde_as, DisplayFromStr}; use with_options::WithOptions; +use super::catalog::desc::SinkDesc; use super::log_store::DeliveryFutureManagerAddFuture; use super::writer::{ AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, @@ -108,6 +110,13 @@ impl Sink for DynamoDbSink { const SINK_NAME: &'static str = DYNAMO_DB_SINK; + fn is_sink_decouple(_desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { + match user_specified { + SinkDecouple::Default | SinkDecouple::Enable => Ok(true), + SinkDecouple::Disable => Ok(false), + } + } + async fn validate(&self) -> Result<()> { let client = (self.config.build_client().await) .context("validate DynamoDB sink error") diff --git a/src/connector/src/sink/mongodb.rs b/src/connector/src/sink/mongodb.rs index 2fccd03f67f35..cbd2a94bf82a6 100644 --- a/src/connector/src/sink/mongodb.rs +++ b/src/connector/src/sink/mongodb.rs @@ -178,9 +178,8 @@ impl Sink for MongodbSink { fn is_sink_decouple(_desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { match user_specified { - // Set default sink decouple to false, because mongodb sink writer only ensure delivery on checkpoint barrier - SinkDecouple::Default | SinkDecouple::Disable => Ok(false), - SinkDecouple::Enable => Ok(true), + SinkDecouple::Default | SinkDecouple::Enable => Ok(true), + SinkDecouple::Disable => Ok(false), } } diff --git 
a/src/connector/src/sink/redis.rs b/src/connector/src/sink/redis.rs index 9d6a33d5131a4..635724bb996f1 100644 --- a/src/connector/src/sink/redis.rs +++ b/src/connector/src/sink/redis.rs @@ -21,11 +21,13 @@ use redis::cluster::{ClusterClient, ClusterConnection, ClusterPipeline}; use redis::{Client as RedisClient, Pipeline}; use risingwave_common::array::StreamChunk; use risingwave_common::catalog::Schema; +use risingwave_common::session_config::sink_decouple::SinkDecouple; use serde_derive::Deserialize; use serde_json::Value; use serde_with::serde_as; use with_options::WithOptions; +use super::catalog::desc::SinkDesc; use super::catalog::SinkFormatDesc; use super::encoder::template::TemplateEncoder; use super::formatter::SinkFormatterImpl; @@ -198,6 +200,13 @@ impl Sink for RedisSink { const SINK_NAME: &'static str = "redis"; + fn is_sink_decouple(_desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { + match user_specified { + SinkDecouple::Default | SinkDecouple::Enable => Ok(true), + SinkDecouple::Disable => Ok(false), + } + } + async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result { Ok(RedisSinkWriter::new( self.config.clone(), From 14112af38c277f9ef34c0b55f1e8b6948267b3fc Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Wed, 10 Jul 2024 16:51:39 +0800 Subject: [PATCH 5/9] fix ci --- src/connector/with_options_sink.yaml | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index 27362900dd054..8997e46cfaf38 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -161,10 +161,6 @@ DynamoDbConfig: required: true alias: - dynamodb.table - - name: dynamodb.max_batch_rows - field_type: usize - required: false - default: '1024' - name: aws.region field_type: String required: false @@ -208,6 +204,14 @@ DynamoDbConfig: required: false alias: - profile + - name: dynamodb.max_batch_item_nums + field_type: usize + required: false + default: '25' + - name: dynamodb.max_future_send_nums + field_type: usize + required: false + default: '256' GooglePubSubConfig: fields: - name: pubsub.project_id @@ -552,11 +556,6 @@ MongodbConfig: comments: Controls whether the field value of `collection.name.field` should be dropped when sinking. Set this option to true to avoid the duplicate values of `collection.name.field` being written to the result collection. required: false default: Default::default - - name: mongodb.bulk_write.max_entries - field_type: usize - comments: The maximum entries will accumulate before performing the bulk write, defaults to 1024. 
- required: false - default: '1024' MqttConfig: fields: - name: url From f6125fc887e268107781c615777b26a3a101fb18 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Tue, 10 Sep 2024 16:39:57 +0800 Subject: [PATCH 6/9] fix comm --- src/connector/src/sink/dynamodb.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/connector/src/sink/dynamodb.rs b/src/connector/src/sink/dynamodb.rs index da118b713c3d9..ebe862992db66 100644 --- a/src/connector/src/sink/dynamodb.rs +++ b/src/connector/src/sink/dynamodb.rs @@ -55,6 +55,11 @@ pub struct DynamoDbConfig { #[serde(rename = "table", alias = "dynamodb.table")] pub table: String, + #[serde(rename = "dynamodb.max_batch_rows", default = "default_max_batch_rows")] + #[serde_as(as = "DisplayFromStr")] + #[deprecated] + pub max_batch_rows: usize, + #[serde(flatten)] pub aws_auth_props: AwsAuthProps, @@ -81,6 +86,10 @@ fn default_max_future_send_nums() -> usize { 256 } +fn default_max_batch_rows() -> usize { + 1024 +} + impl DynamoDbConfig { pub async fn build_client(&self) -> ConnectorResult { let config = &self.aws_auth_props; From 360bdc39e4bd722b89f308debf2e278342bafef9 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Mon, 23 Sep 2024 15:23:55 +0800 Subject: [PATCH 7/9] fix ut --- src/connector/with_options_sink.yaml | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index 8a79d8838945f..117db9aa2a0bb 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -177,6 +177,10 @@ DynamoDbConfig: required: true alias: - dynamodb.table + - name: dynamodb.max_batch_rows + field_type: usize + required: false + default: '1024' - name: aws.region field_type: String required: false @@ -671,6 +675,11 @@ MongodbConfig: result collection. required: false default: Default::default + - name: mongodb.bulk_write.max_entries + field_type: usize + comments: The maximum entries will accumulate before performing the bulk write, defaults to 1024. + required: false + default: '1024' MqttConfig: fields: - name: url From e8333ddb6c64172720fc4882d52ecbc0d7c87863 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Thu, 26 Sep 2024 18:32:51 +0800 Subject: [PATCH 8/9] fix comm fmt --- src/connector/src/sink/mongodb.rs | 155 ++++++++++++++++-------------- 1 file changed, 84 insertions(+), 71 deletions(-) diff --git a/src/connector/src/sink/mongodb.rs b/src/connector/src/sink/mongodb.rs index 13e1a9366498d..244a1bb70db8d 100644 --- a/src/connector/src/sink/mongodb.rs +++ b/src/connector/src/sink/mongodb.rs @@ -12,19 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use core::future::Future; use std::collections::{BTreeMap, HashMap}; use std::ops::Deref; use std::sync::LazyLock; use anyhow::anyhow; -use futures::future::try_join_all; -use futures::prelude::future::FutureExt; +use futures::future::{try_join_all, TryJoinAll}; use futures::prelude::TryFuture; use futures::TryFutureExt; use itertools::Itertools; use mongodb::bson::{bson, doc, Array, Bson, Document}; -use mongodb::{Client, Database, Namespace}; +use mongodb::{Client, Namespace}; use risingwave_common::array::{Op, RowRef, StreamChunk}; use risingwave_common::catalog::Schema; use risingwave_common::log::LogSuppresser; @@ -48,6 +46,64 @@ use crate::sink::{ SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, }; +mod send_bulk_write_command_future { + use core::future::Future; + + use anyhow::anyhow; + use mongodb::bson::Document; + use mongodb::Database; + + use crate::sink::{Result, SinkError}; + + pub(super) type SendBulkWriteCommandFuture = impl Future> + 'static; + + pub(super) fn send_bulk_write_commands( + db: Database, + upsert: Option, + delete: Option, + ) -> SendBulkWriteCommandFuture { + async move { + if let Some(upsert) = upsert { + send_bulk_write_command(db.clone(), upsert).await?; + } + if let Some(delete) = delete { + send_bulk_write_command(db, delete).await?; + } + Ok(()) + } + } + + async fn send_bulk_write_command(db: Database, command: Document) -> Result<()> { + let result = db.run_command(command, None).await.map_err(|err| { + SinkError::Mongodb(anyhow!(err).context(format!( + "sending bulk write command failed, database: {}", + db.name() + ))) + })?; + + if let Ok(write_errors) = result.get_array("writeErrors") { + return Err(SinkError::Mongodb(anyhow!( + "bulk write respond with write errors: {:?}", + write_errors, + ))); + } + + let n = result.get_i32("n").map_err(|err| { + SinkError::Mongodb( + anyhow!(err).context("can't extract field n from bulk write response"), + ) + })?; + if n < 1 { + return Err(SinkError::Mongodb(anyhow!( + "bulk write respond with an abnormal state, n = {}", + n + ))); + } + + Ok(()) + } +} + pub const MONGODB_SINK: &str = "mongodb"; const MONGODB_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 4096; @@ -301,12 +357,12 @@ impl Sink for MongodbSink { } } +use send_bulk_write_command_future::*; + pub struct MongodbSinkWriter { pub config: MongodbConfig, payload_writer: MongodbPayloadWriter, is_append_only: bool, - // // TODO switching to bulk write API when mongodb driver supports it - // command_builder: CommandBuilder, } impl MongodbSinkWriter { @@ -351,12 +407,6 @@ impl MongodbSinkWriter { let row_encoder = BsonEncoder::new(schema.clone(), Some(col_indices), pk_indices.clone()); - // let command_builder = if is_append_only { - // CommandBuilder::AppendOnly(HashMap::new()) - // } else { - // CommandBuilder::Upsert(HashMap::new()) - // }; - let payload_writer = MongodbPayloadWriter::new( schema, pk_indices, @@ -373,7 +423,7 @@ impl MongodbSinkWriter { }) } - fn append(&mut self, chunk: StreamChunk) -> Result>>> { + fn append(&mut self, chunk: StreamChunk) -> Result> { let mut insert_builder: HashMap = HashMap::new(); for (op, row) in chunk.rows() { if op != Op::Insert { @@ -389,10 +439,10 @@ impl MongodbSinkWriter { } self.payload_writer.append(&mut insert_builder, row)?; } - self.payload_writer.flush_insert(&mut insert_builder) + Ok(self.payload_writer.flush_insert(insert_builder)) } - fn upsert(&mut self, chunk: StreamChunk) -> Result>>> { + fn upsert(&mut self, chunk: StreamChunk) -> Result> { let mut upsert_builder: HashMap = HashMap::new(); 
for (op, row) in chunk.rows() { if op == Op::UpdateDelete { @@ -401,7 +451,7 @@ impl MongodbSinkWriter { } self.payload_writer.upsert(&mut upsert_builder, op, row)?; } - self.payload_writer.flush_upsert(&mut upsert_builder) + Ok(self.payload_writer.flush_upsert(upsert_builder)) } } @@ -415,14 +465,14 @@ impl AsyncTruncateSinkWriter for MongodbSinkWriter { chunk: StreamChunk, mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, ) -> Result<()> { - let boxed_futures = if self.is_append_only { - let futures = self.append(chunk)?; - try_join_all(futures).map_ok(|_: Vec<()>| ()).boxed() + let futures = if self.is_append_only { + self.append(chunk)? } else { - let futures = self.upsert(chunk)?; - try_join_all(futures).map_ok(|_: Vec<()>| ()).boxed() + self.upsert(chunk)? }; - add_future.add_future_may_await(boxed_futures).await?; + add_future + .add_future_may_await(futures.map_ok(|_: Vec<()>| ())) + .await?; Ok(()) } } @@ -648,70 +698,33 @@ impl MongodbPayloadWriter { fn flush_insert( &self, - insert_builder: &mut HashMap, - ) -> Result>>> { + insert_builder: HashMap, + ) -> TryJoinAll { // TODO try sending bulk-write of each collection concurrently to improve the performance when // `dynamic collection` is enabled. We may need to provide best practice to guide user on setting // the MongoDB driver's connection properties. - let mut futures = Vec::with_capacity(insert_builder.len()); - for (ns, builder) in insert_builder.drain() { + let futures = insert_builder.into_iter().map(|(ns, builder)| { let db = self.client.database(&ns.0); - futures.push(Self::send_bulk_write_command(db, builder.build())); - } - Ok(futures) + send_bulk_write_commands(db, Some(builder.build()), None) + }); + try_join_all(futures) } fn flush_upsert( &self, - upsert_builder: &mut HashMap, - ) -> Result>>> { + upsert_builder: HashMap, + ) -> TryJoinAll { // TODO try sending bulk-write of each collection concurrently to improve the performance when // `dynamic collection` is enabled. We may need to provide best practice to guide user on setting // the MongoDB driver's connection properties. - let mut futures = Vec::with_capacity(upsert_builder.len()); - for (ns, builder) in upsert_builder.drain() { + let futures = upsert_builder.into_iter().map(|(ns, builder)| { let (upsert, delete) = builder.build(); // we are sending the bulk upsert first because, under same pk, the `Insert` and `UpdateInsert` // should always appear before `Delete`. we have already ignored the `UpdateDelete` // which is useless in upsert mode. 
let db = self.client.database(&ns.0); - if let Some(upsert) = upsert { - futures.push(Self::send_bulk_write_command(db.clone(), upsert)); - } - if let Some(delete) = delete { - futures.push(Self::send_bulk_write_command(db, delete)); - } - } - Ok(futures) - } - - async fn send_bulk_write_command(db: Database, command: Document) -> Result<()> { - let result = db.run_command(command, None).await.map_err(|err| { - SinkError::Mongodb(anyhow!(err).context(format!( - "sending bulk write command failed, database: {}", - db.name() - ))) - })?; - - if let Ok(write_errors) = result.get_array("writeErrors") { - return Err(SinkError::Mongodb(anyhow!( - "bulk write respond with write errors: {:?}", - write_errors, - ))); - } - - let n = result.get_i32("n").map_err(|err| { - SinkError::Mongodb( - anyhow!(err).context("can't extract field n from bulk write response"), - ) - })?; - if n < 1 { - return Err(SinkError::Mongodb(anyhow!( - "bulk write respond with an abnormal state, n = {}", - n - ))); - } - - Ok(()) + send_bulk_write_commands(db, upsert, delete) + }); + try_join_all(futures) } } From f545f732d871d3f8c698ad0b623eecc3734b7c8a Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Fri, 27 Sep 2024 18:39:03 +0800 Subject: [PATCH 9/9] fix comm --- src/connector/src/sink/dynamodb.rs | 257 ++++++++++++++--------------- 1 file changed, 127 insertions(+), 130 deletions(-) diff --git a/src/connector/src/sink/dynamodb.rs b/src/connector/src/sink/dynamodb.rs index dedb55457ee18..825c2fc3d7a14 100644 --- a/src/connector/src/sink/dynamodb.rs +++ b/src/connector/src/sink/dynamodb.rs @@ -12,24 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -use core::result; use std::collections::{BTreeMap, HashMap, HashSet}; use anyhow::{anyhow, Context}; use aws_sdk_dynamodb as dynamodb; use aws_sdk_dynamodb::client::Client; -use aws_smithy_runtime_api::client::orchestrator::HttpResponse; use aws_smithy_types::Blob; -use dynamodb::error::SdkError; -use dynamodb::operation::batch_write_item::{BatchWriteItemError, BatchWriteItemOutput}; -use dynamodb::types::{ - AttributeValue, DeleteRequest, PutRequest, ReturnConsumedCapacity, ReturnItemCollectionMetrics, - TableStatus, WriteRequest, -}; -use futures::prelude::future::{try_join_all, FutureExt, TryFutureExt}; -use futures::prelude::{Future, TryFuture}; -use itertools::Itertools; -use maplit::hashmap; +use dynamodb::types::{AttributeValue, TableStatus, WriteRequest}; +use futures::prelude::future::TryFutureExt; +use futures::prelude::TryFuture; use risingwave_common::array::{Op, RowRef, StreamChunk}; use risingwave_common::catalog::Schema; use risingwave_common::row::Row as _; @@ -38,6 +29,7 @@ use risingwave_common::util::iter_util::ZipEqDebug; use serde_derive::Deserialize; use serde_with::{serde_as, DisplayFromStr}; use with_options::WithOptions; +use write_chunk_future::{DynamoDbPayloadWriter, WriteChunkFuture}; use super::log_store::DeliveryFutureManagerAddFuture; use super::writer::{ @@ -213,95 +205,6 @@ impl DynamoDbRequest { } } -struct DynamoDbPayloadWriter { - client: Client, - table: String, - dynamodb_keys: Vec, - max_batch_item_nums: usize, -} - -impl DynamoDbPayloadWriter { - fn write_one_insert( - &mut self, - item: HashMap, - request_items: &mut Vec, - ) { - let put_req = PutRequest::builder().set_item(Some(item)).build().unwrap(); - let req = WriteRequest::builder().put_request(put_req).build(); - self.write_one_req(req, request_items); - } - - fn write_one_delete( - &mut self, - key: 
HashMap, - request_items: &mut Vec, - ) { - let key = key - .into_iter() - .filter(|(k, _)| self.dynamodb_keys.contains(k)) - .collect(); - let del_req = DeleteRequest::builder().set_key(Some(key)).build().unwrap(); - let req = WriteRequest::builder().delete_request(del_req).build(); - self.write_one_req(req, request_items); - } - - fn write_one_req(&mut self, req: WriteRequest, request_items: &mut Vec) { - let r_req = DynamoDbRequest { - inner: req, - key_items: self.dynamodb_keys.clone(), - }; - if let Some(v) = r_req.extract_pk_values() { - request_items.retain(|item| { - !item - .extract_pk_values() - .unwrap_or_default() - .iter() - .all(|x| v.contains(x)) - }); - } - request_items.push(r_req); - } - - fn write_chunk( - &mut self, - request_items: Vec, - ) -> Result< - Vec< - impl Future< - Output = result::Result< - BatchWriteItemOutput, - SdkError, - >, - >, - >, - > { - let table = self.table.clone(); - let req_items: Vec> = request_items - .into_iter() - .map(|r| r.inner) - .chunks(self.max_batch_item_nums) - .into_iter() - .map(|chunk| chunk.collect()) - .collect(); - let futures: Vec<_> = req_items - .into_iter() - .map(|req_items| { - let reqs = hashmap! { - table.clone() => req_items, - }; - self.client - .batch_write_item() - .set_request_items(Some(reqs)) - .return_consumed_capacity(ReturnConsumedCapacity::None) - .return_item_collection_metrics(ReturnItemCollectionMetrics::None) - .send() - }) - .collect(); - - Ok(futures) - } -} - pub struct DynamoDbSinkWriter { payload_writer: DynamoDbPayloadWriter, formatter: DynamoDbFormatter, @@ -343,19 +246,7 @@ impl DynamoDbSinkWriter { }) } - fn write_chunk_inner( - &mut self, - chunk: StreamChunk, - ) -> Result< - Vec< - impl Future< - Output = result::Result< - BatchWriteItemOutput, - SdkError, - >, - >, - >, - > { + fn write_chunk_inner(&mut self, chunk: StreamChunk) -> Result { let mut request_items = Vec::new(); for (op, row) in chunk.rows() { let items = self.formatter.format_row(row)?; @@ -371,7 +262,7 @@ impl DynamoDbSinkWriter { Op::UpdateDelete => {} } } - self.payload_writer.write_chunk(request_items) + Ok(self.payload_writer.write_chunk(request_items)) } } @@ -387,21 +278,7 @@ impl AsyncTruncateSinkWriter for DynamoDbSinkWriter { ) -> Result<()> { let futures = self.write_chunk_inner(chunk)?; add_future - .add_future_may_await( - try_join_all(futures.into_iter().map(|future| { - future.map(|result| { - result - .map_err(|e| { - SinkError::DynamoDb( - anyhow!(e).context("failed to delete item from DynamoDB sink"), - ) - }) - .map(|_| ()) - }) - })) - .map_ok(|_: Vec<()>| ()) - .boxed(), - ) + .add_future_may_await(futures.map_ok(|_: Vec<()>| ())) .await?; Ok(()) } @@ -471,3 +348,123 @@ fn map_data(scalar_ref: Option>, data_type: &DataType) -> Resu }; Ok(attr) } + +mod write_chunk_future { + use core::result; + use std::collections::HashMap; + + use anyhow::anyhow; + use aws_sdk_dynamodb as dynamodb; + use aws_sdk_dynamodb::client::Client; + use aws_smithy_runtime_api::client::orchestrator::HttpResponse; + use dynamodb::error::SdkError; + use dynamodb::operation::batch_write_item::{BatchWriteItemError, BatchWriteItemOutput}; + use dynamodb::types::{ + AttributeValue, DeleteRequest, PutRequest, ReturnConsumedCapacity, + ReturnItemCollectionMetrics, WriteRequest, + }; + use futures::future::{Map, TryJoinAll}; + use futures::prelude::future::{try_join_all, FutureExt}; + use futures::prelude::Future; + use itertools::Itertools; + use maplit::hashmap; + + use super::{DynamoDbRequest, Result, SinkError}; + + pub type 
WriteChunkFuture = TryJoinAll< + Map< + impl Future< + Output = result::Result< + BatchWriteItemOutput, + SdkError, + >, + >, + impl FnOnce( + result::Result>, + ) -> Result<()>, + >, + >; + pub struct DynamoDbPayloadWriter { + pub client: Client, + pub table: String, + pub dynamodb_keys: Vec, + pub max_batch_item_nums: usize, + } + + impl DynamoDbPayloadWriter { + pub fn write_one_insert( + &mut self, + item: HashMap, + request_items: &mut Vec, + ) { + let put_req = PutRequest::builder().set_item(Some(item)).build().unwrap(); + let req = WriteRequest::builder().put_request(put_req).build(); + self.write_one_req(req, request_items); + } + + pub fn write_one_delete( + &mut self, + key: HashMap, + request_items: &mut Vec, + ) { + let key = key + .into_iter() + .filter(|(k, _)| self.dynamodb_keys.contains(k)) + .collect(); + let del_req = DeleteRequest::builder().set_key(Some(key)).build().unwrap(); + let req = WriteRequest::builder().delete_request(del_req).build(); + self.write_one_req(req, request_items); + } + + pub fn write_one_req( + &mut self, + req: WriteRequest, + request_items: &mut Vec, + ) { + let r_req = DynamoDbRequest { + inner: req, + key_items: self.dynamodb_keys.clone(), + }; + if let Some(v) = r_req.extract_pk_values() { + request_items.retain(|item| { + !item + .extract_pk_values() + .unwrap_or_default() + .iter() + .all(|x| v.contains(x)) + }); + } + request_items.push(r_req); + } + + pub fn write_chunk(&mut self, request_items: Vec) -> WriteChunkFuture { + let table = self.table.clone(); + let chunks = request_items + .into_iter() + .map(|r| r.inner) + .chunks(self.max_batch_item_nums); + let futures = chunks.into_iter().map(|chunk| { + let req_items = chunk.collect(); + let reqs = hashmap! { + table.clone() => req_items, + }; + self.client + .batch_write_item() + .set_request_items(Some(reqs)) + .return_consumed_capacity(ReturnConsumedCapacity::None) + .return_item_collection_metrics(ReturnItemCollectionMetrics::None) + .send() + .map(|result| { + result + .map_err(|e| { + SinkError::DynamoDb( + anyhow!(e).context("failed to delete item from DynamoDB sink"), + ) + }) + .map(|_| ()) + }) + }); + try_join_all(futures) + } + } +}
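
Note on the delivery pattern (not part of the patches): across both sinks, the series replaces "buffer rows, flush on checkpoint barrier" with "return the in-flight request futures from write_chunk and join them through the log store's delivery-future manager". The sketch below is a minimal, self-contained illustration of that fan-out-and-join shape using only the `futures` and `itertools` crates; `send_batch`, the `u32` payload, and `max_batch_item_nums` as a free parameter are placeholders for illustration, not RisingWave or AWS SDK APIs. The batch size of 25 in the usage mirrors the new `dynamodb.max_batch_item_nums` default, which matches DynamoDB's per-call BatchWriteItem limit.

use futures::executor::block_on;
use futures::future::try_join_all;
use futures::TryFutureExt;
use itertools::Itertools;

// Placeholder for one bulk request (a MongoDB bulk-write command or a DynamoDB
// BatchWriteItem call); the real sinks return SDK/driver futures here.
async fn send_batch(batch: Vec<u32>) -> Result<(), String> {
    println!("sending {} items", batch.len());
    Ok(())
}

// Split buffered requests into batches of at most `max_batch_item_nums`, start
// one future per batch, and collapse the joined results to `()`, mirroring
// `try_join_all(futures).map_ok(|_: Vec<()>| ())` in the patches.
fn flush(
    items: Vec<u32>,
    max_batch_item_nums: usize,
) -> impl std::future::Future<Output = Result<(), String>> {
    let chunks = items.into_iter().chunks(max_batch_item_nums);
    let batches: Vec<Vec<u32>> = chunks.into_iter().map(|c| c.collect()).collect();
    try_join_all(batches.into_iter().map(send_batch)).map_ok(|_: Vec<()>| ())
}

fn main() -> Result<(), String> {
    // The sink writer would hand this future to `add_future_may_await` so the
    // log sinker can truncate the log once it resolves; here we just block on it.
    let pending = flush((0..60).collect(), 25);
    block_on(pending)
}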
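
A second, equally rough sketch covers the buffering knob: `into_log_sinker` now takes a bound on how many of those delivery futures may be pending at once (`MONGODB_SEND_FUTURE_BUFFER_MAX_SIZE`, `dynamodb.max_future_send_nums`). The toy buffer below only hints at that back-pressure behaviour — the real `DeliveryFutureManager` also tracks log offsets for truncation — and `FutureBuffer`, its `cap`, and the `String` error type are invented for the example.

use std::collections::VecDeque;

use futures::executor::block_on;
use futures::future::BoxFuture;

// Toy stand-in for the delivery-future manager: accept new futures until `cap`
// of them are in flight, then await the oldest one to make room.
struct FutureBuffer {
    cap: usize,
    pending: VecDeque<BoxFuture<'static, Result<(), String>>>,
}

impl FutureBuffer {
    fn new(cap: usize) -> Self {
        Self { cap, pending: VecDeque::new() }
    }

    // Rough analogue of `add_future_may_await`: it only waits when the buffer
    // is already full, which is what bounds the number of in-flight requests.
    fn add(&mut self, fut: BoxFuture<'static, Result<(), String>>) -> Result<(), String> {
        if self.pending.len() >= self.cap {
            let oldest = self.pending.pop_front().expect("cap must be > 0");
            block_on(oldest)?;
        }
        self.pending.push_back(fut);
        Ok(())
    }
}

fn main() -> Result<(), String> {
    let mut buffer = FutureBuffer::new(2);
    for i in 0..5u32 {
        // Each boxed future stands in for one pending bulk-write acknowledgement.
        buffer.add(Box::pin(async move {
            println!("batch {i} acknowledged");
            Ok::<(), String>(())
        }))?;
    }
    // Drain whatever is still in flight, as a checkpoint barrier would.
    while let Some(fut) = buffer.pending.pop_front() {
        block_on(fut)?;
    }
    Ok(())
}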