diff --git a/src/connector/src/sink/dynamodb.rs b/src/connector/src/sink/dynamodb.rs index c8c3c598e6319..825c2fc3d7a14 100644 --- a/src/connector/src/sink/dynamodb.rs +++ b/src/connector/src/sink/dynamodb.rs @@ -18,11 +18,9 @@ use anyhow::{anyhow, Context}; use aws_sdk_dynamodb as dynamodb; use aws_sdk_dynamodb::client::Client; use aws_smithy_types::Blob; -use dynamodb::types::{ - AttributeValue, DeleteRequest, PutRequest, ReturnConsumedCapacity, ReturnItemCollectionMetrics, - TableStatus, WriteRequest, -}; -use maplit::hashmap; +use dynamodb::types::{AttributeValue, TableStatus, WriteRequest}; +use futures::prelude::future::TryFutureExt; +use futures::prelude::TryFuture; use risingwave_common::array::{Op, RowRef, StreamChunk}; use risingwave_common::catalog::Schema; use risingwave_common::row::Row as _; @@ -31,6 +29,7 @@ use risingwave_common::util::iter_util::ZipEqDebug; use serde_derive::Deserialize; use serde_with::{serde_as, DisplayFromStr}; use with_options::WithOptions; +use write_chunk_future::{DynamoDbPayloadWriter, WriteChunkFuture}; use super::log_store::DeliveryFutureManagerAddFuture; use super::writer::{ @@ -50,10 +49,33 @@ pub struct DynamoDbConfig { #[serde(rename = "dynamodb.max_batch_rows", default = "default_max_batch_rows")] #[serde_as(as = "DisplayFromStr")] + #[deprecated] pub max_batch_rows: usize, #[serde(flatten)] pub aws_auth_props: AwsAuthProps, + + #[serde( + rename = "dynamodb.max_batch_item_nums", + default = "default_max_batch_item_nums" + )] + #[serde_as(as = "DisplayFromStr")] + pub max_batch_item_nums: usize, + + #[serde( + rename = "dynamodb.max_future_send_nums", + default = "default_max_future_send_nums" + )] + #[serde_as(as = "DisplayFromStr")] + pub max_future_send_nums: usize, +} + +fn default_max_batch_item_nums() -> usize { + 25 +} + +fn default_max_future_send_nums() -> usize { + 256 } fn default_max_batch_rows() -> usize { @@ -141,7 +163,7 @@ impl Sink for DynamoDbSink { Ok( DynamoDbSinkWriter::new(self.config.clone(), self.schema.clone()) .await? - .into_log_sinker(usize::MAX), + .into_log_sinker(self.config.max_future_send_nums), ) } } @@ -183,77 +205,7 @@ impl DynamoDbRequest { } } -struct DynamoDbPayloadWriter { - request_items: Vec, - client: Client, - table: String, - dynamodb_keys: Vec, -} - -impl DynamoDbPayloadWriter { - fn write_one_insert(&mut self, item: HashMap) { - let put_req = PutRequest::builder().set_item(Some(item)).build().unwrap(); - let req = WriteRequest::builder().put_request(put_req).build(); - self.write_one_req(req); - } - - fn write_one_delete(&mut self, key: HashMap) { - let key = key - .into_iter() - .filter(|(k, _)| self.dynamodb_keys.contains(k)) - .collect(); - let del_req = DeleteRequest::builder().set_key(Some(key)).build().unwrap(); - let req = WriteRequest::builder().delete_request(del_req).build(); - self.write_one_req(req); - } - - fn write_one_req(&mut self, req: WriteRequest) { - let r_req = DynamoDbRequest { - inner: req, - key_items: self.dynamodb_keys.clone(), - }; - if let Some(v) = r_req.extract_pk_values() { - self.request_items.retain(|item| { - !item - .extract_pk_values() - .unwrap_or_default() - .iter() - .all(|x| v.contains(x)) - }); - } - self.request_items.push(r_req); - } - - async fn write_chunk(&mut self) -> Result<()> { - if !self.request_items.is_empty() { - let table = self.table.clone(); - let req_items = std::mem::take(&mut self.request_items) - .into_iter() - .map(|r| r.inner) - .collect(); - let reqs = hashmap! 
{ - table => req_items, - }; - self.client - .batch_write_item() - .set_request_items(Some(reqs)) - .return_consumed_capacity(ReturnConsumedCapacity::None) - .return_item_collection_metrics(ReturnItemCollectionMetrics::None) - .send() - .await - .map_err(|e| { - SinkError::DynamoDb( - anyhow!(e).context("failed to delete item from DynamoDB sink"), - ) - })?; - } - - Ok(()) - } -} - pub struct DynamoDbSinkWriter { - max_batch_rows: usize, payload_writer: DynamoDbPayloadWriter, formatter: DynamoDbFormatter, } @@ -282,56 +234,52 @@ impl DynamoDbSinkWriter { .collect(); let payload_writer = DynamoDbPayloadWriter { - request_items: Vec::new(), client, - table: config.table, + table: config.table.clone(), dynamodb_keys, + max_batch_item_nums: config.max_batch_item_nums, }; Ok(Self { - max_batch_rows: config.max_batch_rows, payload_writer, formatter: DynamoDbFormatter { schema }, }) } - async fn write_chunk_inner(&mut self, chunk: StreamChunk) -> Result<()> { + fn write_chunk_inner(&mut self, chunk: StreamChunk) -> Result { + let mut request_items = Vec::new(); for (op, row) in chunk.rows() { let items = self.formatter.format_row(row)?; match op { Op::Insert | Op::UpdateInsert => { - self.payload_writer.write_one_insert(items); + self.payload_writer + .write_one_insert(items, &mut request_items); } Op::Delete => { - self.payload_writer.write_one_delete(items); + self.payload_writer + .write_one_delete(items, &mut request_items); } Op::UpdateDelete => {} } } - if self.payload_writer.request_items.len() >= self.max_batch_rows { - self.payload_writer.write_chunk().await?; - } - Ok(()) - } - - async fn flush(&mut self) -> Result<()> { - self.payload_writer.write_chunk().await + Ok(self.payload_writer.write_chunk(request_items)) } } +pub type DynamoDbSinkDeliveryFuture = impl TryFuture + Unpin + 'static; + impl AsyncTruncateSinkWriter for DynamoDbSinkWriter { + type DeliveryFuture = DynamoDbSinkDeliveryFuture; + async fn write_chunk<'a>( &'a mut self, chunk: StreamChunk, - _add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, + mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, ) -> Result<()> { - self.write_chunk_inner(chunk).await - } - - async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> { - if is_checkpoint { - self.flush().await?; - } + let futures = self.write_chunk_inner(chunk)?; + add_future + .add_future_may_await(futures.map_ok(|_: Vec<()>| ())) + .await?; Ok(()) } } @@ -400,3 +348,123 @@ fn map_data(scalar_ref: Option>, data_type: &DataType) -> Resu }; Ok(attr) } + +mod write_chunk_future { + use core::result; + use std::collections::HashMap; + + use anyhow::anyhow; + use aws_sdk_dynamodb as dynamodb; + use aws_sdk_dynamodb::client::Client; + use aws_smithy_runtime_api::client::orchestrator::HttpResponse; + use dynamodb::error::SdkError; + use dynamodb::operation::batch_write_item::{BatchWriteItemError, BatchWriteItemOutput}; + use dynamodb::types::{ + AttributeValue, DeleteRequest, PutRequest, ReturnConsumedCapacity, + ReturnItemCollectionMetrics, WriteRequest, + }; + use futures::future::{Map, TryJoinAll}; + use futures::prelude::future::{try_join_all, FutureExt}; + use futures::prelude::Future; + use itertools::Itertools; + use maplit::hashmap; + + use super::{DynamoDbRequest, Result, SinkError}; + + pub type WriteChunkFuture = TryJoinAll< + Map< + impl Future< + Output = result::Result< + BatchWriteItemOutput, + SdkError, + >, + >, + impl FnOnce( + result::Result>, + ) -> Result<()>, + >, + >; + pub struct DynamoDbPayloadWriter { + pub 
client: Client, + pub table: String, + pub dynamodb_keys: Vec, + pub max_batch_item_nums: usize, + } + + impl DynamoDbPayloadWriter { + pub fn write_one_insert( + &mut self, + item: HashMap, + request_items: &mut Vec, + ) { + let put_req = PutRequest::builder().set_item(Some(item)).build().unwrap(); + let req = WriteRequest::builder().put_request(put_req).build(); + self.write_one_req(req, request_items); + } + + pub fn write_one_delete( + &mut self, + key: HashMap, + request_items: &mut Vec, + ) { + let key = key + .into_iter() + .filter(|(k, _)| self.dynamodb_keys.contains(k)) + .collect(); + let del_req = DeleteRequest::builder().set_key(Some(key)).build().unwrap(); + let req = WriteRequest::builder().delete_request(del_req).build(); + self.write_one_req(req, request_items); + } + + pub fn write_one_req( + &mut self, + req: WriteRequest, + request_items: &mut Vec, + ) { + let r_req = DynamoDbRequest { + inner: req, + key_items: self.dynamodb_keys.clone(), + }; + if let Some(v) = r_req.extract_pk_values() { + request_items.retain(|item| { + !item + .extract_pk_values() + .unwrap_or_default() + .iter() + .all(|x| v.contains(x)) + }); + } + request_items.push(r_req); + } + + pub fn write_chunk(&mut self, request_items: Vec) -> WriteChunkFuture { + let table = self.table.clone(); + let chunks = request_items + .into_iter() + .map(|r| r.inner) + .chunks(self.max_batch_item_nums); + let futures = chunks.into_iter().map(|chunk| { + let req_items = chunk.collect(); + let reqs = hashmap! { + table.clone() => req_items, + }; + self.client + .batch_write_item() + .set_request_items(Some(reqs)) + .return_consumed_capacity(ReturnConsumedCapacity::None) + .return_item_collection_metrics(ReturnItemCollectionMetrics::None) + .send() + .map(|result| { + result + .map_err(|e| { + SinkError::DynamoDb( + anyhow!(e).context("failed to delete item from DynamoDB sink"), + ) + }) + .map(|_| ()) + }) + }); + try_join_all(futures) + } + } +} diff --git a/src/connector/src/sink/mongodb.rs b/src/connector/src/sink/mongodb.rs index d11572856d785..244a1bb70db8d 100644 --- a/src/connector/src/sink/mongodb.rs +++ b/src/connector/src/sink/mongodb.rs @@ -17,37 +17,96 @@ use std::ops::Deref; use std::sync::LazyLock; use anyhow::anyhow; +use futures::future::{try_join_all, TryJoinAll}; +use futures::prelude::TryFuture; +use futures::TryFutureExt; use itertools::Itertools; use mongodb::bson::{bson, doc, Array, Bson, Document}; use mongodb::{Client, Namespace}; use risingwave_common::array::{Op, RowRef, StreamChunk}; use risingwave_common::catalog::Schema; use risingwave_common::log::LogSuppresser; -use risingwave_common::must_match; use risingwave_common::row::Row; use risingwave_common::types::ScalarRefImpl; use serde_derive::Deserialize; use serde_with::{serde_as, DisplayFromStr}; use thiserror_ext::AsReport; -use tonic::async_trait; use with_options::WithOptions; use super::encoder::BsonEncoder; +use super::log_store::DeliveryFutureManagerAddFuture; +use super::writer::{ + AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, +}; use crate::connector_common::MongodbCommon; use crate::deserialize_bool_from_string; use crate::sink::encoder::RowEncoder; -use crate::sink::writer::{LogSinkerOf, SinkWriter, SinkWriterExt}; use crate::sink::{ - DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SinkWriterMetrics, - SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, + DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SinkWriterParam, + 
SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, }; +mod send_bulk_write_command_future { + use core::future::Future; + + use anyhow::anyhow; + use mongodb::bson::Document; + use mongodb::Database; + + use crate::sink::{Result, SinkError}; + + pub(super) type SendBulkWriteCommandFuture = impl Future> + 'static; + + pub(super) fn send_bulk_write_commands( + db: Database, + upsert: Option, + delete: Option, + ) -> SendBulkWriteCommandFuture { + async move { + if let Some(upsert) = upsert { + send_bulk_write_command(db.clone(), upsert).await?; + } + if let Some(delete) = delete { + send_bulk_write_command(db, delete).await?; + } + Ok(()) + } + } + + async fn send_bulk_write_command(db: Database, command: Document) -> Result<()> { + let result = db.run_command(command, None).await.map_err(|err| { + SinkError::Mongodb(anyhow!(err).context(format!( + "sending bulk write command failed, database: {}", + db.name() + ))) + })?; + + if let Ok(write_errors) = result.get_array("writeErrors") { + return Err(SinkError::Mongodb(anyhow!( + "bulk write respond with write errors: {:?}", + write_errors, + ))); + } + + let n = result.get_i32("n").map_err(|err| { + SinkError::Mongodb( + anyhow!(err).context("can't extract field n from bulk write response"), + ) + })?; + if n < 1 { + return Err(SinkError::Mongodb(anyhow!( + "bulk write respond with an abnormal state, n = {}", + n + ))); + } + + Ok(()) + } +} + pub const MONGODB_SINK: &str = "mongodb"; +const MONGODB_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 4096; -// 65536 seems like a reasonable limit, but we may consider setting this limit to 100,000, -// which is the actual limit imposed by the server. -// see https://www.mongodb.com/docs/v4.2/reference/command/hello/#hello.maxWriteBatchSize for more details -pub const MONGODB_BULK_WRITE_SIZE_LIMIT: usize = 65536; pub const MONGODB_PK_NAME: &str = "_id"; static LOG_SUPPERSSER: LazyLock = LazyLock::new(LogSuppresser::default); @@ -55,7 +114,6 @@ static LOG_SUPPERSSER: LazyLock = LazyLock::new(LogSuppresser::de const fn _default_bulk_write_max_entries() -> usize { 1024 } - #[serde_as] #[derive(Clone, Debug, Deserialize, WithOptions)] pub struct MongodbConfig { @@ -86,6 +144,7 @@ pub struct MongodbConfig { default = "_default_bulk_write_max_entries" )] #[serde_as(as = "DisplayFromStr")] + #[deprecated] pub bulk_write_max_entries: usize, } @@ -179,7 +238,7 @@ impl TryFrom for MongodbSink { impl Sink for MongodbSink { type Coordinator = DummySinkCommitCoordinator; - type LogSinker = LogSinkerOf; + type LogSinker = AsyncTruncateLogSinkerOf; const SINK_NAME: &'static str = MONGODB_SINK; @@ -224,14 +283,6 @@ impl Sink for MongodbSink { } } - if self.config.bulk_write_max_entries > MONGODB_BULK_WRITE_SIZE_LIMIT { - return Err(SinkError::Config(anyhow!( - "mongodb.bulk_write.max_entries {} exceeds the limit {}", - self.config.bulk_write_max_entries, - MONGODB_BULK_WRITE_SIZE_LIMIT - ))); - } - if let Err(err) = self.config.common.collection_name.parse::() { return Err(SinkError::Config(anyhow!(err).context(format!( "invalid collection.name {}", @@ -302,16 +353,16 @@ impl Sink for MongodbSink { self.is_append_only, ) .await? 
- .into_log_sinker(SinkWriterMetrics::new(&writer_param))) + .into_log_sinker(MONGODB_SEND_FUTURE_BUFFER_MAX_SIZE)) } } +use send_bulk_write_command_future::*; + pub struct MongodbSinkWriter { pub config: MongodbConfig, payload_writer: MongodbPayloadWriter, is_append_only: bool, - // TODO switching to bulk write API when mongodb driver supports it - command_builder: CommandBuilder, } impl MongodbSinkWriter { @@ -356,19 +407,12 @@ impl MongodbSinkWriter { let row_encoder = BsonEncoder::new(schema.clone(), Some(col_indices), pk_indices.clone()); - let command_builder = if is_append_only { - CommandBuilder::AppendOnly(HashMap::new()) - } else { - CommandBuilder::Upsert(HashMap::new()) - }; - let payload_writer = MongodbPayloadWriter::new( schema, pk_indices, default_namespace, coll_name_field_index, ClientGuard::new(name, client), - config.bulk_write_max_entries, row_encoder, ); @@ -376,13 +420,11 @@ impl MongodbSinkWriter { config, payload_writer, is_append_only, - command_builder, }) } - async fn append(&mut self, chunk: StreamChunk) -> Result<()> { - let insert_builder = - must_match!(&mut self.command_builder, CommandBuilder::AppendOnly(builder) => builder); + fn append(&mut self, chunk: StreamChunk) -> Result> { + let mut insert_builder: HashMap = HashMap::new(); for (op, row) in chunk.rows() { if op != Op::Insert { if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { @@ -395,49 +437,42 @@ impl MongodbSinkWriter { } continue; } - self.payload_writer.append(insert_builder, row).await?; + self.payload_writer.append(&mut insert_builder, row)?; } - Ok(()) + Ok(self.payload_writer.flush_insert(insert_builder)) } - async fn upsert(&mut self, chunk: StreamChunk) -> Result<()> { - let upsert_builder = - must_match!(&mut self.command_builder, CommandBuilder::Upsert(builder) => builder); + fn upsert(&mut self, chunk: StreamChunk) -> Result> { + let mut upsert_builder: HashMap = HashMap::new(); for (op, row) in chunk.rows() { if op == Op::UpdateDelete { // we should ignore the `UpdateDelete` in upsert mode continue; } - self.payload_writer.upsert(upsert_builder, op, row).await?; + self.payload_writer.upsert(&mut upsert_builder, op, row)?; } - Ok(()) + Ok(self.payload_writer.flush_upsert(upsert_builder)) } } -#[async_trait] -impl SinkWriter for MongodbSinkWriter { - async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> { - Ok(()) - } +pub type MongodbSinkDeliveryFuture = impl TryFuture + Unpin + 'static; - async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { - if self.is_append_only { - self.append(chunk).await - } else { - self.upsert(chunk).await - } - } +impl AsyncTruncateSinkWriter for MongodbSinkWriter { + type DeliveryFuture = MongodbSinkDeliveryFuture; - async fn barrier(&mut self, is_checkpoint: bool) -> Result { - if is_checkpoint { - if self.is_append_only { - let insert_builder = must_match!(&mut self.command_builder, CommandBuilder::AppendOnly(builder) => builder); - self.payload_writer.flush_insert(insert_builder).await?; - } else { - let upsert_builder = must_match!(&mut self.command_builder, CommandBuilder::Upsert(builder) => builder); - self.payload_writer.flush_upsert(upsert_builder).await?; - } - } + async fn write_chunk<'a>( + &'a mut self, + chunk: StreamChunk, + mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, + ) -> Result<()> { + let futures = if self.is_append_only { + self.append(chunk)? + } else { + self.upsert(chunk)? 
+ }; + add_future + .add_future_may_await(futures.map_ok(|_: Vec<()>| ())) + .await?; Ok(()) } } @@ -448,10 +483,10 @@ struct InsertCommandBuilder { } impl InsertCommandBuilder { - fn new(coll: String, capacity: usize) -> Self { + fn new(coll: String) -> Self { Self { coll, - inserts: Array::with_capacity(capacity), + inserts: Array::new(), } } @@ -475,11 +510,11 @@ struct UpsertCommandBuilder { } impl UpsertCommandBuilder { - fn new(coll: String, capacity: usize) -> Self { + fn new(coll: String) -> Self { Self { coll, - updates: Array::with_capacity(capacity), - deletes: HashMap::with_capacity(capacity), + updates: Array::new(), + deletes: HashMap::new(), } } @@ -541,11 +576,6 @@ impl UpsertCommandBuilder { } } -enum CommandBuilder { - AppendOnly(HashMap), - Upsert(HashMap), -} - type MongodbNamespace = (String, String); // In the future, we may build the payload into RawBSON to gain a better performance. @@ -556,8 +586,6 @@ struct MongodbPayloadWriter { default_namespace: Namespace, coll_name_field_index: Option, client: ClientGuard, - buffered_entries: usize, - max_entries: usize, row_encoder: BsonEncoder, } @@ -568,7 +596,6 @@ impl MongodbPayloadWriter { default_namespace: Namespace, coll_name_field_index: Option, client: ClientGuard, - max_entries: usize, row_encoder: BsonEncoder, ) -> Self { Self { @@ -577,8 +604,6 @@ impl MongodbPayloadWriter { default_namespace, coll_name_field_index, client, - buffered_entries: 0, - max_entries, row_encoder, } } @@ -620,7 +645,7 @@ impl MongodbPayloadWriter { } } - async fn append( + fn append( &mut self, insert_builder: &mut HashMap, row: RowRef<'_>, @@ -631,17 +656,12 @@ impl MongodbPayloadWriter { insert_builder .entry(ns) - .or_insert_with(|| InsertCommandBuilder::new(coll, self.max_entries)) + .or_insert_with(|| InsertCommandBuilder::new(coll)) .append(document); - - self.buffered_entries += 1; - if self.buffered_entries >= self.max_entries { - self.flush_insert(insert_builder).await?; - } Ok(()) } - async fn upsert( + fn upsert( &mut self, upsert_builder: &mut HashMap, op: Op, @@ -665,88 +685,46 @@ impl MongodbPayloadWriter { match op { Op::Insert | Op::UpdateInsert => upsert_builder .entry(ns) - .or_insert_with(|| UpsertCommandBuilder::new(coll, self.max_entries)) + .or_insert_with(|| UpsertCommandBuilder::new(coll)) .add_upsert(pk, document)?, Op::UpdateDelete => (), Op::Delete => upsert_builder .entry(ns) - .or_insert_with(|| UpsertCommandBuilder::new(coll, self.max_entries)) + .or_insert_with(|| UpsertCommandBuilder::new(coll)) .add_delete(pk)?, } - - self.buffered_entries += 1; - if self.buffered_entries >= self.max_entries { - self.flush_upsert(upsert_builder).await?; - } Ok(()) } - async fn flush_insert( - &mut self, - insert_builder: &mut HashMap, - ) -> Result<()> { + fn flush_insert( + &self, + insert_builder: HashMap, + ) -> TryJoinAll { // TODO try sending bulk-write of each collection concurrently to improve the performance when // `dynamic collection` is enabled. We may need to provide best practice to guide user on setting // the MongoDB driver's connection properties. 
- for (ns, builder) in insert_builder.drain() { - self.send_bulk_write_command(&ns.0, builder.build()).await?; - } - self.buffered_entries = 0; - Ok(()) + let futures = insert_builder.into_iter().map(|(ns, builder)| { + let db = self.client.database(&ns.0); + send_bulk_write_commands(db, Some(builder.build()), None) + }); + try_join_all(futures) } - async fn flush_upsert( - &mut self, - upsert_builder: &mut HashMap, - ) -> Result<()> { + fn flush_upsert( + &self, + upsert_builder: HashMap, + ) -> TryJoinAll { // TODO try sending bulk-write of each collection concurrently to improve the performance when // `dynamic collection` is enabled. We may need to provide best practice to guide user on setting // the MongoDB driver's connection properties. - for (ns, builder) in upsert_builder.drain() { + let futures = upsert_builder.into_iter().map(|(ns, builder)| { let (upsert, delete) = builder.build(); // we are sending the bulk upsert first because, under same pk, the `Insert` and `UpdateInsert` // should always appear before `Delete`. we have already ignored the `UpdateDelete` // which is useless in upsert mode. - if let Some(upsert) = upsert { - self.send_bulk_write_command(&ns.0, upsert).await?; - } - if let Some(delete) = delete { - self.send_bulk_write_command(&ns.0, delete).await?; - } - } - self.buffered_entries = 0; - Ok(()) - } - - async fn send_bulk_write_command(&self, database: &str, command: Document) -> Result<()> { - let db = self.client.database(database); - - let result = db.run_command(command, None).await.map_err(|err| { - SinkError::Mongodb(anyhow!(err).context(format!( - "sending bulk write command failed, database: {}", - database - ))) - })?; - - if let Ok(write_errors) = result.get_array("writeErrors") { - return Err(SinkError::Mongodb(anyhow!( - "bulk write respond with write errors: {:?}", - write_errors, - ))); - } - - let n = result.get_i32("n").map_err(|err| { - SinkError::Mongodb( - anyhow!(err).context("can't extract field n from bulk write response"), - ) - })?; - if n < 1 { - return Err(SinkError::Mongodb(anyhow!( - "bulk write respond with an abnormal state, n = {}", - n - ))); - } - - Ok(()) + let db = self.client.database(&ns.0); + send_bulk_write_commands(db, upsert, delete) + }); + try_join_all(futures) } } diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index 347fa10f5ac92..ba2fa8746a1da 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -224,6 +224,14 @@ DynamoDbConfig: required: false alias: - profile + - name: dynamodb.max_batch_item_nums + field_type: usize + required: false + default: '25' + - name: dynamodb.max_future_send_nums + field_type: usize + required: false + default: '256' FsConfig: fields: - name: fs.path
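
Note on the delivery pattern (not part of the patch): both sinks above drop in-writer buffering plus a blocking flush and instead hand per-chunk delivery futures to `DeliveryFutureManagerAddFuture`, with the number of in-flight futures bounded by `into_log_sinker(...)` (`dynamodb.max_future_send_nums` for DynamoDB, `MONGODB_SEND_FUTURE_BUFFER_MAX_SIZE` for MongoDB). The sketch below illustrates only the shared batching idea, splitting pending requests into fixed-size batches and joining the sends with `try_join_all`. It uses hypothetical names (`send_batch`, `write_all`, integer stand-ins for requests) and assumes the `futures`, `itertools`, and `tokio` crates; it is not the sinks' actual API.

    use futures::future::try_join_all;
    use itertools::Itertools;

    // Hypothetical stand-in for `client.batch_write_item().send()` (DynamoDB)
    // or `db.run_command(..)` (MongoDB); always succeeds here.
    async fn send_batch(batch: Vec<u64>) -> Result<(), String> {
        println!("sending batch of {} items", batch.len());
        Ok(())
    }

    // Split requests into batches of at most `max_batch_item_nums` (a DynamoDB
    // BatchWriteItem call accepts at most 25 items, hence the default of 25),
    // turn each batch into one future, and let `try_join_all` drive them
    // together, failing fast on the first error; the same shape as the
    // `WriteChunkFuture` type introduced in the patch.
    async fn write_all(requests: Vec<u64>, max_batch_item_nums: usize) -> Result<(), String> {
        let chunks = requests.into_iter().chunks(max_batch_item_nums);
        let futures = chunks.into_iter().map(|chunk| send_batch(chunk.collect()));
        try_join_all(futures).await.map(|_| ())
    }

    #[tokio::main]
    async fn main() -> Result<(), String> {
        // 60 requests with a batch size of 25 produce three concurrent sends.
        write_all((0..60).collect(), 25).await
    }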