From a004bd9e7555086d34dd760510c314afe409468f Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Thu, 31 Aug 2023 16:19:22 -0400 Subject: [PATCH 01/15] refactor(kafka-sink): change sequential await to group await when committing --- src/connector/src/sink/kafka.rs | 66 +++++++++++++++++++++++++-------- 1 file changed, 51 insertions(+), 15 deletions(-) diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 046e78cef9d54..339bfb6811f38 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -333,6 +333,8 @@ pub struct KafkaSinkWriter { schema: Schema, pk_indices: Vec, is_append_only: bool, + /// The buffer that stores + future_record_buffer: Vec<(Option, Option)>, } impl KafkaSinkWriter { @@ -376,6 +378,7 @@ impl KafkaSinkWriter { schema, pk_indices, is_append_only, + future_record_buffer: Vec::new(), }) } @@ -444,7 +447,7 @@ impl KafkaSinkWriter { Err(err) } - async fn write_json_objects( + async fn write_json_objects_inner( &self, event_key_object: Option, event_object: Option, @@ -463,10 +466,37 @@ impl KafkaSinkWriter { Ok(()) } - async fn debezium_update(&self, chunk: StreamChunk, ts_ms: u64) -> Result<()> { + async fn write_json_objects( + &mut self, + event_key_object: Option, + event_object: Option, + ) -> Result<()> { + self.future_record_buffer + .push((event_key_object, event_object)); + Ok(()) + } + + async fn commit(&mut self) -> Result<()> { + // Commit all together + for (key, value) in &self.future_record_buffer { + self.write_json_objects_inner(key.clone(), value.clone()) + .await?; + } + + // Clear the buffer + self.future_record_buffer.clear(); + + Ok(()) + } + + async fn debezium_update(&mut self, chunk: StreamChunk, ts_ms: u64) -> Result<()> { + let schema = self.schema.clone(); + let pk_indices = self.pk_indices.clone(); + + // Initialize the dbz_stream let dbz_stream = gen_debezium_message_stream( - &self.schema, - &self.pk_indices, + &schema, + &pk_indices, chunk, ts_ms, DebeziumAdapterOpts::default(), @@ -481,13 +511,13 @@ impl KafkaSinkWriter { Ok(()) } - async fn upsert(&self, chunk: StreamChunk) -> Result<()> { - let upsert_stream = gen_upsert_message_stream( - &self.schema, - &self.pk_indices, - chunk, - UpsertAdapterOpts::default(), - ); + async fn upsert(&mut self, chunk: StreamChunk) -> Result<()> { + let schema = self.schema.clone(); + let pk_indices = self.pk_indices.clone(); + + // Initialize the upsert_stream + let upsert_stream = + gen_upsert_message_stream(&schema, &pk_indices, chunk, UpsertAdapterOpts::default()); #[for_await] for msg in upsert_stream { @@ -498,10 +528,14 @@ impl KafkaSinkWriter { Ok(()) } - async fn append_only(&self, chunk: StreamChunk) -> Result<()> { + async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> { + let schema = self.schema.clone(); + let pk_indices = self.pk_indices.clone(); + + // Initialize the append_only_stream let append_only_stream = gen_append_only_message_stream( - &self.schema, - &self.pk_indices, + &schema, + &pk_indices, chunk, AppendOnlyAdapterOpts::default(), ); @@ -551,6 +585,8 @@ impl SinkWriterV1 for KafkaSinkWriter { } async fn commit(&mut self) -> Result<()> { + // Group delivery (await the `FutureRecord`) here + self.commit().await?; Ok(()) } @@ -691,7 +727,7 @@ mod test { } /// Note: Please enable the kafka by running `./risedev configure` before commenting #[ignore] - /// to run the test + /// to run the test, also remember to modify `risedev.yml` #[ignore] #[tokio::test] async fn test_kafka_producer() -> Result<()> { From
c01698f30f46904909212db146eb582705f760de Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Thu, 31 Aug 2023 17:54:49 -0400 Subject: [PATCH 02/15] fix check --- src/connector/src/sink/kafka.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 339bfb6811f38..647b0e7debf72 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -466,6 +466,7 @@ impl KafkaSinkWriter { Ok(()) } + #[expect(clippy::unused_async)] async fn write_json_objects( &mut self, event_key_object: Option, From 5121fb9ecd721fd75e8f4a3de23e91f2a3eb5247 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Fri, 1 Sep 2023 23:40:09 -0400 Subject: [PATCH 03/15] update new version --- src/connector/src/sink/kafka.rs | 148 ++++++++++++++++++++++---------- 1 file changed, 105 insertions(+), 43 deletions(-) diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 647b0e7debf72..47da39a8a7d55 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -333,8 +333,8 @@ pub struct KafkaSinkWriter { schema: Schema, pk_indices: Vec, is_append_only: bool, - /// The buffer that stores - future_record_buffer: Vec<(Option, Option)>, + future_delivery_buffer: Vec, + max_limit: Option, } impl KafkaSinkWriter { @@ -344,6 +344,7 @@ impl KafkaSinkWriter { pk_indices: Vec, is_append_only: bool, identifier: String, + // max_limit: Option, ) -> Result { let inner: FutureProducer = { let mut c = ClientConfig::new(); @@ -378,7 +379,9 @@ impl KafkaSinkWriter { schema, pk_indices, is_append_only, - future_record_buffer: Vec::new(), + future_delivery_buffer: Vec::new(), + // FIXME: Where to accept the input? + max_limit: Some(10), }) } @@ -398,7 +401,10 @@ impl KafkaSinkWriter { /// The actual `send_result` function, will be called when the `KafkaSinkWriter` needs to sink /// messages - async fn send_result<'a, K, P>(&'a self, mut record: FutureRecord<'a, K, P>) -> KafkaResult<()> + async fn send_result<'a, K, P>( + &'a mut self, + mut record: FutureRecord<'a, K, P>, + ) -> KafkaResult<()> where K: ToBytes + ?Sized, P: ToBytes + ?Sized, @@ -406,28 +412,40 @@ impl KafkaSinkWriter { // The error to be returned let mut err = KafkaError::Canceled; + // First take the ownership of the exist buffer + let mut future_buffer = std::mem::take(&mut self.future_delivery_buffer); + + // Sanity check + debug_assert!( + self.future_delivery_buffer.is_empty(), + "future delivery buffer must be empty" + ); + + // The flag represents whether to commit + // This will happen when the size of buffering futures + // is greater than preset limit + let mut commit_flag = false; + + // To make borrow checker happy :) + let mut push_flag = false; + for _ in 0..self.config.max_retry_num { match self.send_result_inner(record).await { - Ok(delivery_future) => match delivery_future.await { - Ok(delivery_future_result) => match delivery_future_result { - // Successfully sent the record - // Will return the partition and offset of the message (i32, i64) - Ok(_) => return Ok(()), - // If the message failed to be delivered. 
(i.e., flush) - // The error & the copy of the original message will be returned - // i.e., (KafkaError, OwnedMessage) - // We will just stop the loop, and return the error - // The sink executor will back to the latest checkpoint - Err((k_err, _msg)) => { - err = k_err; + // Add the future to the buffer + Ok(delivery_future) => { + // Push the future into the buffer + future_buffer.push(delivery_future); + push_flag = true; + + // First see if the size is greater than the limit + if let Some(max_limit) = self.max_limit { + if future_buffer.len() > max_limit { + commit_flag = true; break; } - }, - // Nothing to do here, since the err has already been set to - // KafkaError::Canceled. This represents the producer is dropped - // before the delivery status is received - Err(_) => break, - }, + } + break; + } // The enqueue buffer is full, `send_result` will immediately return // We can retry for another round after sleeping for sometime Err((e, rec)) => { @@ -444,49 +462,93 @@ impl KafkaSinkWriter { } } + if commit_flag { + // Give the buffer back to the origin + std::mem::swap(&mut future_buffer, &mut self.future_delivery_buffer); + + // Sanity check + debug_assert!( + future_buffer.is_empty(), + "future buffer must be empty after swapping" + ); + + match self.commit().await { + // FIXME: Is this error handling enough? + Ok(_) => return Ok(()), + Err(_) => return Err(err), + } + } + + if push_flag { + // Indicates success + std::mem::swap(&mut future_buffer, &mut self.future_delivery_buffer); + + // Sanity check + debug_assert!( + future_buffer.is_empty(), + "future buffer must be empty after swapping" + ); + + return Ok(()); + } + Err(err) } - async fn write_json_objects_inner( - &self, + async fn write_json_objects( + &mut self, event_key_object: Option, event_object: Option, ) -> Result<()> { + let topic = self.config.common.topic.clone(); // here we assume the key part always exists and value part is optional. // if value is None, we will skip the payload part. let key_str = event_key_object.unwrap().to_string(); - let mut record = FutureRecord::<[u8], [u8]>::to(self.config.common.topic.as_str()) - .key(key_str.as_bytes()); + let mut record = FutureRecord::<[u8], [u8]>::to(topic.as_str()).key(key_str.as_bytes()); let payload; if let Some(value) = event_object { payload = value.to_string(); record = record.payload(payload.as_bytes()); } + // Send the data but not wait it to finish sinking + // Will join all `DeliveryFuture` during commit self.send_result(record).await?; Ok(()) } - #[expect(clippy::unused_async)] - async fn write_json_objects( - &mut self, - event_key_object: Option, - event_object: Option, - ) -> Result<()> { - self.future_record_buffer - .push((event_key_object, event_object)); - Ok(()) - } - async fn commit(&mut self) -> Result<()> { + // Get the ownership first + // The buffer will automatically become an empty vector + let delivery_futures = std::mem::take(&mut self.future_delivery_buffer); + + // Sanity check + debug_assert!( + self.future_delivery_buffer.is_empty(), + "The buffer must be empty" + ); + // Commit all together - for (key, value) in &self.future_record_buffer { - self.write_json_objects_inner(key.clone(), value.clone()) - .await?; + // FIXME: At present we could not retry, do we actually need to? 
+ for delivery_future in delivery_futures { + match delivery_future.await { + Ok(delivery_future_result) => match delivery_future_result { + // Successfully sent the record + // Will return the partition and offset of the message (i32, i64) + Ok(_) => continue, + // If the message failed to be delivered. (i.e., flush) + // The error & the copy of the original message will be returned + // i.e., (KafkaError, OwnedMessage) + // We will just stop the loop, and return the error + // The sink executor will back to the latest checkpoint + Err((k_err, _msg)) => return Err(SinkError::Kafka(k_err)), + }, + // This represents the producer is dropped + // before the delivery status is received + // Return `KafkaError::Canceled` + Err(_) => return Err(SinkError::Kafka(KafkaError::Canceled)), + } } - // Clear the buffer - self.future_record_buffer.clear(); - Ok(()) } From 65013df073afac2b2c8ab662f0f47e80483ba07c Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Mon, 4 Sep 2023 17:25:07 -0400 Subject: [PATCH 04/15] resolve some issues --- src/connector/src/sink/kafka.rs | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 47da39a8a7d55..f7b3f55f6a24e 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -325,6 +325,11 @@ enum KafkaSinkState { Running(u64), } +/// The delivery buffer queue size +/// When the `DeliveryFuture` the current `future_delivery_buffer` +/// is buffering is greater than this size, then enforcing commit once +const KAFKA_WRITER_MAX_QUEUE_SIZE: usize = 10; + pub struct KafkaSinkWriter { pub config: KafkaConfig, pub inner: FutureProducer, @@ -334,7 +339,6 @@ pub struct KafkaSinkWriter { pk_indices: Vec, is_append_only: bool, future_delivery_buffer: Vec, - max_limit: Option, } impl KafkaSinkWriter { @@ -344,7 +348,6 @@ impl KafkaSinkWriter { pk_indices: Vec, is_append_only: bool, identifier: String, - // max_limit: Option, ) -> Result { let inner: FutureProducer = { let mut c = ClientConfig::new(); @@ -380,8 +383,6 @@ impl KafkaSinkWriter { pk_indices, is_append_only, future_delivery_buffer: Vec::new(), - // FIXME: Where to accept the input? 
- max_limit: Some(10), }) } @@ -438,11 +439,9 @@ impl KafkaSinkWriter { push_flag = true; // First see if the size is greater than the limit - if let Some(max_limit) = self.max_limit { - if future_buffer.len() > max_limit { - commit_flag = true; - break; - } + if future_buffer.len() > KAFKA_WRITER_MAX_QUEUE_SIZE { + commit_flag = true; + break; } break; } From eb68ee4533bc88aa3da0bb49b30a61880b4fd581 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Tue, 5 Sep 2023 16:52:15 -0400 Subject: [PATCH 05/15] placeholder for commit_inner --- src/connector/src/sink/kafka.rs | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 7905830334401..d9375067616b3 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -18,6 +18,8 @@ use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use anyhow::anyhow; +use futures::FutureExt; +use futures::future::try_join_all; use futures_async_stream::for_await; use rdkafka::error::{KafkaError, KafkaResult}; use rdkafka::message::ToBytes; @@ -396,8 +398,7 @@ impl KafkaSinkWriter { /// The wrapper function for the actual `FutureProducer::send_result` /// Just for better error handling purpose - #[expect(clippy::unused_async)] - async fn send_result_inner<'a, K, P>( + fn send_result_inner<'a, K, P>( &'a self, record: FutureRecord<'a, K, P>, ) -> core::result::Result)> @@ -439,7 +440,7 @@ impl KafkaSinkWriter { let mut push_flag = false; for _ in 0..self.config.max_retry_num { - match self.send_result_inner(record).await { + match self.send_result_inner(record) { // Add the future to the buffer Ok(delivery_future) => { // Push the future into the buffer @@ -559,6 +560,25 @@ impl KafkaSinkWriter { Ok(()) } + // async fn commit_inner(&mut self) -> Result<()> { + // let _v = try_join_all( + // self.future_delivery_buffer + // .drain(..) + // .map(|x| { + // x.map(|f| { + // match f { + // Ok(Ok(val)) => Ok(val), + // Ok(Err((k_err, _msg))) => Err(SinkError::Kafka(k_err)), + // Err(_) => Err(SinkError::Kafka(KafkaError::Canceled)), + // } + // }) + // }) + // ) + // .await?; + + // Ok(()) + // } + async fn debezium_update(&mut self, chunk: StreamChunk, ts_ms: u64) -> Result<()> { let schema = self.schema.clone(); let pk_indices = self.pk_indices.clone(); From 250520a46ef8df6f8c03c12a329b9ec410b54e6a Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Wed, 6 Sep 2023 19:14:24 -0400 Subject: [PATCH 06/15] refactor & update new version --- src/connector/src/sink/kafka.rs | 172 +++++++++++++------------------- 1 file changed, 72 insertions(+), 100 deletions(-) diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index d9375067616b3..ddf8ee55b3f74 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; use std::fmt::Debug; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; @@ -342,7 +342,7 @@ pub struct KafkaSinkWriter { schema: Schema, pk_indices: Vec, is_append_only: bool, - future_delivery_buffer: Vec, + future_delivery_buffer: VecDeque, db_name: String, sink_from_name: String, } @@ -390,7 +390,7 @@ impl KafkaSinkWriter { schema, pk_indices, is_append_only, - future_delivery_buffer: Vec::new(), + future_delivery_buffer: VecDeque::new(), db_name, sink_from_name, }) @@ -419,9 +419,6 @@ impl KafkaSinkWriter { K: ToBytes + ?Sized, P: ToBytes + ?Sized, { - // The error to be returned - let mut err = KafkaError::Canceled; - // First take the ownership of the exist buffer let mut future_buffer = std::mem::take(&mut self.future_delivery_buffer); @@ -431,76 +428,61 @@ impl KafkaSinkWriter { "future delivery buffer must be empty" ); - // The flag represents whether to commit - // This will happen when the size of buffering futures - // is greater than preset limit - let mut commit_flag = false; + let mut success_flag = false; - // To make borrow checker happy :) - let mut push_flag = false; + let mut ret = Ok(()); for _ in 0..self.config.max_retry_num { match self.send_result_inner(record) { - // Add the future to the buffer Ok(delivery_future) => { - // Push the future into the buffer - future_buffer.push(delivery_future); - push_flag = true; - - // First see if the size is greater than the limit - if future_buffer.len() > KAFKA_WRITER_MAX_QUEUE_SIZE { - commit_flag = true; - break; + // First check if the current length is + // greater than the preset limit + while future_buffer.len() >= KAFKA_WRITER_MAX_QUEUE_SIZE { + if let Some(delivery_future) = future_buffer.pop_front() { + Self::await_once(delivery_future).await?; + } else { + panic!("Expect the future not to be None"); + }; } + + future_buffer.push_back(delivery_future); + success_flag = true; break; } // The enqueue buffer is full, `send_result` will immediately return // We can retry for another round after sleeping for sometime Err((e, rec)) => { - err = e; record = rec; - match err { + match e { KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull) => { tokio::time::sleep(self.config.retry_interval).await; continue; } - _ => break, + _ => { + ret = Err(e); + break; + } } } } } - if commit_flag { - // Give the buffer back to the origin - std::mem::swap(&mut future_buffer, &mut self.future_delivery_buffer); - - // Sanity check - debug_assert!( - future_buffer.is_empty(), - "future buffer must be empty after swapping" - ); - - match self.commit().await { - // FIXME: Is this error handling enough? 
- Ok(_) => return Ok(()), - Err(_) => return Err(err), - } + if !success_flag { + // In this case, after trying `max_retry_num` + // The enqueue buffer is still full + ret = Err(KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull)); } - if push_flag { - // Indicates success - std::mem::swap(&mut future_buffer, &mut self.future_delivery_buffer); + // Reset the buffer + std::mem::swap(&mut future_buffer, &mut self.future_delivery_buffer); - // Sanity check - debug_assert!( - future_buffer.is_empty(), - "future buffer must be empty after swapping" - ); - - return Ok(()); - } + // Sanity check + debug_assert!( + future_buffer.is_empty(), + "future delivery buffer must be empty" + ); - Err(err) + ret } async fn write_json_objects( @@ -524,64 +506,54 @@ impl KafkaSinkWriter { Ok(()) } - async fn commit(&mut self) -> Result<()> { - // Get the ownership first - // The buffer will automatically become an empty vector - let delivery_futures = std::mem::take(&mut self.future_delivery_buffer); + async fn await_once(delivery_future: DeliveryFuture) -> KafkaResult<()> { + match delivery_future.await { + Ok(Ok(_)) => Ok(()), + Ok(Err((k_err, _msg))) => Err(k_err), + Err(_) => Err(KafkaError::Canceled), + } + } + + async fn commit_inner(&mut self) -> Result<()> { + let _v = try_join_all( + self.future_delivery_buffer + .drain(..) + .map(|delivery_future| { + delivery_future.map(|delivery_future_result| { + match delivery_future_result { + // Successfully sent the record + // Will return the partition and offset of the message (i32, i64) + Ok(Ok(val)) => Ok(val), + // If the message failed to be delivered. (i.e., flush) + // The error & the copy of the original message will be returned + // i.e., (KafkaError, OwnedMessage) + // We will just stop the loop, and return the error + // The sink executor will back to the latest checkpoint + Ok(Err((k_err, _msg))) => Err(SinkError::Kafka(k_err)), + // This represents the producer is dropped + // before the delivery status is received + // Return `KafkaError::Canceled` + Err(_) => Err(SinkError::Kafka(KafkaError::Canceled)), + } + }) + }) + ) + .await?; // Sanity check debug_assert!( self.future_delivery_buffer.is_empty(), - "The buffer must be empty" + "The buffer after `commit_inner` must be empty" ); - // Commit all together - // FIXME: At present we could not retry, do we actually need to? - for delivery_future in delivery_futures { - match delivery_future.await { - Ok(delivery_future_result) => match delivery_future_result { - // Successfully sent the record - // Will return the partition and offset of the message (i32, i64) - Ok(_) => continue, - // If the message failed to be delivered. (i.e., flush) - // The error & the copy of the original message will be returned - // i.e., (KafkaError, OwnedMessage) - // We will just stop the loop, and return the error - // The sink executor will back to the latest checkpoint - Err((k_err, _msg)) => return Err(SinkError::Kafka(k_err)), - }, - // This represents the producer is dropped - // before the delivery status is received - // Return `KafkaError::Canceled` - Err(_) => return Err(SinkError::Kafka(KafkaError::Canceled)), - } - } - Ok(()) } - // async fn commit_inner(&mut self) -> Result<()> { - // let _v = try_join_all( - // self.future_delivery_buffer - // .drain(..) 
- // .map(|x| { - // x.map(|f| { - // match f { - // Ok(Ok(val)) => Ok(val), - // Ok(Err((k_err, _msg))) => Err(SinkError::Kafka(k_err)), - // Err(_) => Err(SinkError::Kafka(KafkaError::Canceled)), - // } - // }) - // }) - // ) - // .await?; - - // Ok(()) - // } - async fn debezium_update(&mut self, chunk: StreamChunk, ts_ms: u64) -> Result<()> { let schema = self.schema.clone(); let pk_indices = self.pk_indices.clone(); + let db_name = self.db_name.clone(); + let sink_from_name = self.sink_from_name.clone(); // Initialize the dbz_stream let dbz_stream = gen_debezium_message_stream( @@ -590,8 +562,8 @@ impl KafkaSinkWriter { chunk, ts_ms, DebeziumAdapterOpts::default(), - &self.db_name, - &self.sink_from_name, + &db_name, + &sink_from_name, ); #[for_await] @@ -678,7 +650,7 @@ impl SinkWriterV1 for KafkaSinkWriter { async fn commit(&mut self) -> Result<()> { // Group delivery (await the `FutureRecord`) here - self.commit().await?; + self.commit_inner().await?; Ok(()) } From 76f95e4f714890fb16208ea68b5195af0e09cac6 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Wed, 6 Sep 2023 19:17:17 -0400 Subject: [PATCH 07/15] map return value of future from Ok(val) to Ok(()) to avoid unnecessary memory allocation --- src/connector/src/sink/kafka.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index ddf8ee55b3f74..c48c8208704df 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -523,7 +523,8 @@ impl KafkaSinkWriter { match delivery_future_result { // Successfully sent the record // Will return the partition and offset of the message (i32, i64) - Ok(Ok(val)) => Ok(val), + // Note that `Vec<()>` won't cause memory allocation + Ok(Ok(_)) => Ok(()), // If the message failed to be delivered. 
(i.e., flush) // The error & the copy of the original message will be returned // i.e., (KafkaError, OwnedMessage) From b6a72c24747ed3153a86de3a05d3b8092cc3606b Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Wed, 6 Sep 2023 19:27:00 -0400 Subject: [PATCH 08/15] tiny fix --- src/connector/src/sink/kafka.rs | 19 +++---------------- 1 file changed, 3 insertions(+), 16 deletions(-) diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index c48c8208704df..16f07b953664a 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -18,8 +18,8 @@ use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use anyhow::anyhow; -use futures::FutureExt; use futures::future::try_join_all; +use futures::FutureExt; use futures_async_stream::for_await; use rdkafka::error::{KafkaError, KafkaResult}; use rdkafka::message::ToBytes; @@ -396,19 +396,6 @@ impl KafkaSinkWriter { }) } - /// The wrapper function for the actual `FutureProducer::send_result` - /// Just for better error handling purpose - fn send_result_inner<'a, K, P>( - &'a self, - record: FutureRecord<'a, K, P>, - ) -> core::result::Result)> - where - K: ToBytes + ?Sized, - P: ToBytes + ?Sized, - { - self.inner.send_result(record) - } - /// The actual `send_result` function, will be called when the `KafkaSinkWriter` needs to sink /// messages async fn send_result<'a, K, P>( @@ -433,7 +420,7 @@ impl KafkaSinkWriter { let mut ret = Ok(()); for _ in 0..self.config.max_retry_num { - match self.send_result_inner(record) { + match self.inner.send_result(record) { Ok(delivery_future) => { // First check if the current length is // greater than the preset limit @@ -537,7 +524,7 @@ impl KafkaSinkWriter { Err(_) => Err(SinkError::Kafka(KafkaError::Canceled)), } }) - }) + }), ) .await?; From 7339913aa9c47ea4e956e685945030df8babe835 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Mon, 11 Sep 2023 16:52:14 -0400 Subject: [PATCH 09/15] refactor & update new version --- .../src/parser/protobuf/recursive.rs | 157 ++++++++++++++++++ src/connector/src/sink/kafka.rs | 72 ++++---- 2 files changed, 186 insertions(+), 43 deletions(-) create mode 100644 src/connector/src/parser/protobuf/recursive.rs diff --git a/src/connector/src/parser/protobuf/recursive.rs b/src/connector/src/parser/protobuf/recursive.rs new file mode 100644 index 0000000000000..dc367eb5f70cd --- /dev/null +++ b/src/connector/src/parser/protobuf/recursive.rs @@ -0,0 +1,157 @@ +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ComplexRecursiveMessage { + #[prost(string, tag = "1")] + pub node_name: ::prost::alloc::string::String, + #[prost(int32, tag = "2")] + pub node_id: i32, + #[prost(message, repeated, tag = "3")] + pub attributes: ::prost::alloc::vec::Vec, + #[prost(message, optional, tag = "4")] + pub parent: ::core::option::Option, + #[prost(message, repeated, tag = "5")] + pub children: ::prost::alloc::vec::Vec, +} +/// Nested message and enum types in `ComplexRecursiveMessage`. 
+pub mod complex_recursive_message { + #[allow(clippy::derive_partial_eq_without_eq)] + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct Attributes { + #[prost(string, tag = "1")] + pub key: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub value: ::prost::alloc::string::String, + } + #[allow(clippy::derive_partial_eq_without_eq)] + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct Parent { + #[prost(string, tag = "1")] + pub parent_name: ::prost::alloc::string::String, + #[prost(int32, tag = "2")] + pub parent_id: i32, + #[prost(message, repeated, tag = "3")] + pub siblings: ::prost::alloc::vec::Vec, + } +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct AllTypes { + /// standard types + #[prost(double, tag = "1")] + pub double_field: f64, + #[prost(float, tag = "2")] + pub float_field: f32, + #[prost(int32, tag = "3")] + pub int32_field: i32, + #[prost(int64, tag = "4")] + pub int64_field: i64, + #[prost(uint32, tag = "5")] + pub uint32_field: u32, + #[prost(uint64, tag = "6")] + pub uint64_field: u64, + #[prost(sint32, tag = "7")] + pub sint32_field: i32, + #[prost(sint64, tag = "8")] + pub sint64_field: i64, + #[prost(fixed32, tag = "9")] + pub fixed32_field: u32, + #[prost(fixed64, tag = "10")] + pub fixed64_field: u64, + #[prost(sfixed32, tag = "11")] + pub sfixed32_field: i32, + #[prost(sfixed64, tag = "12")] + pub sfixed64_field: i64, + #[prost(bool, tag = "13")] + pub bool_field: bool, + #[prost(string, tag = "14")] + pub string_field: ::prost::alloc::string::String, + #[prost(bytes = "vec", tag = "15")] + pub bytes_field: ::prost::alloc::vec::Vec, + #[prost(enumeration = "all_types::EnumType", tag = "16")] + pub enum_field: i32, + #[prost(message, optional, tag = "17")] + pub nested_message_field: ::core::option::Option, + /// repeated field + #[prost(int32, repeated, tag = "18")] + pub repeated_int_field: ::prost::alloc::vec::Vec, + /// timestamp + #[prost(message, optional, tag = "23")] + pub timestamp_field: ::core::option::Option<::prost_types::Timestamp>, + /// duration + #[prost(message, optional, tag = "24")] + pub duration_field: ::core::option::Option<::prost_types::Duration>, + /// any + #[prost(message, optional, tag = "25")] + pub any_field: ::core::option::Option<::prost_types::Any>, + /// wrapper types + #[prost(message, optional, tag = "27")] + pub int32_value_field: ::core::option::Option, + #[prost(message, optional, tag = "28")] + pub string_value_field: ::core::option::Option<::prost::alloc::string::String>, + /// oneof field + #[prost(oneof = "all_types::ExampleOneof", tags = "19, 20, 21")] + pub example_oneof: ::core::option::Option, +} +/// Nested message and enum types in `AllTypes`. +pub mod all_types { + /// nested message + #[allow(clippy::derive_partial_eq_without_eq)] + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct NestedMessage { + #[prost(int32, tag = "1")] + pub id: i32, + #[prost(string, tag = "2")] + pub name: ::prost::alloc::string::String, + } + /// enum + #[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + Hash, + PartialOrd, + Ord, + ::prost::Enumeration + )] + #[repr(i32)] + pub enum EnumType { + Default = 0, + Option1 = 1, + Option2 = 2, + } + impl EnumType { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. 
+ pub fn as_str_name(&self) -> &'static str { + match self { + EnumType::Default => "DEFAULT", + EnumType::Option1 => "OPTION1", + EnumType::Option2 => "OPTION2", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "DEFAULT" => Some(Self::Default), + "OPTION1" => Some(Self::Option1), + "OPTION2" => Some(Self::Option2), + _ => None, + } + } + } + /// oneof field + #[allow(clippy::derive_partial_eq_without_eq)] + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum ExampleOneof { + #[prost(string, tag = "19")] + OneofString(::prost::alloc::string::String), + #[prost(int32, tag = "20")] + OneofInt32(i32), + #[prost(enumeration = "EnumType", tag = "21")] + OneofEnum(i32), + } +} diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 16f07b953664a..2b99725a107b9 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -18,11 +18,12 @@ use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use anyhow::anyhow; +use futures::channel::oneshot::Canceled; use futures::future::try_join_all; use futures::FutureExt; use futures_async_stream::for_await; use rdkafka::error::{KafkaError, KafkaResult}; -use rdkafka::message::ToBytes; +use rdkafka::message::{OwnedMessage, ToBytes}; use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord}; use rdkafka::types::RDKafkaErrorCode; use rdkafka::ClientConfig; @@ -51,6 +52,9 @@ use crate::{ deserialize_bool_from_string, deserialize_duration_from_string, deserialize_u32_from_string, }; +type FutureResult = + std::result::Result, Canceled>; + pub const KAFKA_SINK: &str = "kafka"; const fn _default_timeout() -> Duration { @@ -406,15 +410,6 @@ impl KafkaSinkWriter { K: ToBytes + ?Sized, P: ToBytes + ?Sized, { - // First take the ownership of the exist buffer - let mut future_buffer = std::mem::take(&mut self.future_delivery_buffer); - - // Sanity check - debug_assert!( - self.future_delivery_buffer.is_empty(), - "future delivery buffer must be empty" - ); - let mut success_flag = false; let mut ret = Ok(()); @@ -424,15 +419,16 @@ impl KafkaSinkWriter { Ok(delivery_future) => { // First check if the current length is // greater than the preset limit - while future_buffer.len() >= KAFKA_WRITER_MAX_QUEUE_SIZE { - if let Some(delivery_future) = future_buffer.pop_front() { - Self::await_once(delivery_future).await?; - } else { - panic!("Expect the future not to be None"); - }; + while self.future_delivery_buffer.len() >= KAFKA_WRITER_MAX_QUEUE_SIZE { + Self::map_future_result( + self.future_delivery_buffer + .pop_front() + .expect("Expect the future not to be None") + .await, + )?; } - future_buffer.push_back(delivery_future); + self.future_delivery_buffer.push_back(delivery_future); success_flag = true; break; } @@ -460,15 +456,6 @@ impl KafkaSinkWriter { ret = Err(KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull)); } - // Reset the buffer - std::mem::swap(&mut future_buffer, &mut self.future_delivery_buffer); - - // Sanity check - debug_assert!( - future_buffer.is_empty(), - "future delivery buffer must be empty" - ); - ret } @@ -493,10 +480,21 @@ impl KafkaSinkWriter { Ok(()) } - async fn await_once(delivery_future: DeliveryFuture) -> KafkaResult<()> { - match delivery_future.await { + fn map_future_result(delivery_future_result: FutureResult) -> KafkaResult<()> { + match delivery_future_result { + // Successfully sent the record + // Will return the partition and 
offset of the message (i32, i64) + // Note that `Vec<()>` won't cause memory allocation Ok(Ok(_)) => Ok(()), + // If the message failed to be delivered. (i.e., flush) + // The error & the copy of the original message will be returned + // i.e., (KafkaError, OwnedMessage) + // We will just stop the loop, and return the error + // The sink executor will back to the latest checkpoint Ok(Err((k_err, _msg))) => Err(k_err), + // This represents the producer is dropped + // before the delivery status is received + // Return `KafkaError::Canceled` Err(_) => Err(KafkaError::Canceled), } } @@ -507,21 +505,9 @@ impl KafkaSinkWriter { .drain(..) .map(|delivery_future| { delivery_future.map(|delivery_future_result| { - match delivery_future_result { - // Successfully sent the record - // Will return the partition and offset of the message (i32, i64) - // Note that `Vec<()>` won't cause memory allocation - Ok(Ok(_)) => Ok(()), - // If the message failed to be delivered. (i.e., flush) - // The error & the copy of the original message will be returned - // i.e., (KafkaError, OwnedMessage) - // We will just stop the loop, and return the error - // The sink executor will back to the latest checkpoint - Ok(Err((k_err, _msg))) => Err(SinkError::Kafka(k_err)), - // This represents the producer is dropped - // before the delivery status is received - // Return `KafkaError::Canceled` - Err(_) => Err(SinkError::Kafka(KafkaError::Canceled)), + match Self::map_future_result(delivery_future_result) { + Ok(_) => Ok(()), + Err(err) => Err(SinkError::Kafka(err)), } }) }), From 188da6a915ff1dda1d8c1ff91681a973744d5870 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Mon, 11 Sep 2023 16:58:10 -0400 Subject: [PATCH 10/15] =?UTF-8?q?delete=20strangely=20generated=20file?= =?UTF-8?q?=F0=9F=98=87?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/parser/protobuf/recursive.rs | 157 ------------------ 1 file changed, 157 deletions(-) delete mode 100644 src/connector/src/parser/protobuf/recursive.rs diff --git a/src/connector/src/parser/protobuf/recursive.rs b/src/connector/src/parser/protobuf/recursive.rs deleted file mode 100644 index dc367eb5f70cd..0000000000000 --- a/src/connector/src/parser/protobuf/recursive.rs +++ /dev/null @@ -1,157 +0,0 @@ -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct ComplexRecursiveMessage { - #[prost(string, tag = "1")] - pub node_name: ::prost::alloc::string::String, - #[prost(int32, tag = "2")] - pub node_id: i32, - #[prost(message, repeated, tag = "3")] - pub attributes: ::prost::alloc::vec::Vec, - #[prost(message, optional, tag = "4")] - pub parent: ::core::option::Option, - #[prost(message, repeated, tag = "5")] - pub children: ::prost::alloc::vec::Vec, -} -/// Nested message and enum types in `ComplexRecursiveMessage`. 
-pub mod complex_recursive_message { - #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Message)] - pub struct Attributes { - #[prost(string, tag = "1")] - pub key: ::prost::alloc::string::String, - #[prost(string, tag = "2")] - pub value: ::prost::alloc::string::String, - } - #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Message)] - pub struct Parent { - #[prost(string, tag = "1")] - pub parent_name: ::prost::alloc::string::String, - #[prost(int32, tag = "2")] - pub parent_id: i32, - #[prost(message, repeated, tag = "3")] - pub siblings: ::prost::alloc::vec::Vec, - } -} -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct AllTypes { - /// standard types - #[prost(double, tag = "1")] - pub double_field: f64, - #[prost(float, tag = "2")] - pub float_field: f32, - #[prost(int32, tag = "3")] - pub int32_field: i32, - #[prost(int64, tag = "4")] - pub int64_field: i64, - #[prost(uint32, tag = "5")] - pub uint32_field: u32, - #[prost(uint64, tag = "6")] - pub uint64_field: u64, - #[prost(sint32, tag = "7")] - pub sint32_field: i32, - #[prost(sint64, tag = "8")] - pub sint64_field: i64, - #[prost(fixed32, tag = "9")] - pub fixed32_field: u32, - #[prost(fixed64, tag = "10")] - pub fixed64_field: u64, - #[prost(sfixed32, tag = "11")] - pub sfixed32_field: i32, - #[prost(sfixed64, tag = "12")] - pub sfixed64_field: i64, - #[prost(bool, tag = "13")] - pub bool_field: bool, - #[prost(string, tag = "14")] - pub string_field: ::prost::alloc::string::String, - #[prost(bytes = "vec", tag = "15")] - pub bytes_field: ::prost::alloc::vec::Vec, - #[prost(enumeration = "all_types::EnumType", tag = "16")] - pub enum_field: i32, - #[prost(message, optional, tag = "17")] - pub nested_message_field: ::core::option::Option, - /// repeated field - #[prost(int32, repeated, tag = "18")] - pub repeated_int_field: ::prost::alloc::vec::Vec, - /// timestamp - #[prost(message, optional, tag = "23")] - pub timestamp_field: ::core::option::Option<::prost_types::Timestamp>, - /// duration - #[prost(message, optional, tag = "24")] - pub duration_field: ::core::option::Option<::prost_types::Duration>, - /// any - #[prost(message, optional, tag = "25")] - pub any_field: ::core::option::Option<::prost_types::Any>, - /// wrapper types - #[prost(message, optional, tag = "27")] - pub int32_value_field: ::core::option::Option, - #[prost(message, optional, tag = "28")] - pub string_value_field: ::core::option::Option<::prost::alloc::string::String>, - /// oneof field - #[prost(oneof = "all_types::ExampleOneof", tags = "19, 20, 21")] - pub example_oneof: ::core::option::Option, -} -/// Nested message and enum types in `AllTypes`. -pub mod all_types { - /// nested message - #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Message)] - pub struct NestedMessage { - #[prost(int32, tag = "1")] - pub id: i32, - #[prost(string, tag = "2")] - pub name: ::prost::alloc::string::String, - } - /// enum - #[derive( - Clone, - Copy, - Debug, - PartialEq, - Eq, - Hash, - PartialOrd, - Ord, - ::prost::Enumeration - )] - #[repr(i32)] - pub enum EnumType { - Default = 0, - Option1 = 1, - Option2 = 2, - } - impl EnumType { - /// String value of the enum field names used in the ProtoBuf definition. - /// - /// The values are not transformed in any way and thus are considered stable - /// (if the ProtoBuf definition does not change) and safe for programmatic use. 
- pub fn as_str_name(&self) -> &'static str { - match self { - EnumType::Default => "DEFAULT", - EnumType::Option1 => "OPTION1", - EnumType::Option2 => "OPTION2", - } - } - /// Creates an enum from field names used in the ProtoBuf definition. - pub fn from_str_name(value: &str) -> ::core::option::Option { - match value { - "DEFAULT" => Some(Self::Default), - "OPTION1" => Some(Self::Option1), - "OPTION2" => Some(Self::Option2), - _ => None, - } - } - } - /// oneof field - #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Oneof)] - pub enum ExampleOneof { - #[prost(string, tag = "19")] - OneofString(::prost::alloc::string::String), - #[prost(int32, tag = "20")] - OneofInt32(i32), - #[prost(enumeration = "EnumType", tag = "21")] - OneofEnum(i32), - } -} From 77aa7c2950528d4aed3597e0bc93a0f5d70ab634 Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Tue, 12 Sep 2023 15:31:31 +0800 Subject: [PATCH 11/15] update madsim-rdkafka to fix the type error Signed-off-by: Runji Wang --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/workspace-hack/Cargo.toml | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bda0a874c8c1d..6728535978d3c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4307,7 +4307,7 @@ dependencies = [ [[package]] name = "madsim-rdkafka" version = "0.2.22" -source = "git+https://github.com/madsim-rs/madsim.git?rev=bb8f063#bb8f06384517ea3950b6c7a29a32c233058b89c7" +source = "git+https://github.com/madsim-rs/madsim.git?rev=fedb1e3#fedb1e3a0a8758650c9e15076941c999150bdb31" dependencies = [ "async-channel", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 83be53f1d7439..e33c09ddc5734 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -95,7 +95,7 @@ aws-types = "0.55" etcd-client = { package = "madsim-etcd-client", version = "0.3" } futures-async-stream = "0.2" hytra = "0.1" -rdkafka = { package = "madsim-rdkafka", git = "https://github.com/madsim-rs/madsim.git", rev = "bb8f063", features = [ +rdkafka = { package = "madsim-rdkafka", git = "https://github.com/madsim-rs/madsim.git", rev = "fedb1e3", features = [ "cmake-build", ] } hashbrown = { version = "0.14.0", features = [ diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml index 3d9d4278a024f..b6bc1a198ab42 100644 --- a/src/workspace-hack/Cargo.toml +++ b/src/workspace-hack/Cargo.toml @@ -60,7 +60,7 @@ lexical-write-integer = { version = "0.8", default-features = false, features = libc = { version = "0.2", features = ["extra_traits"] } lock_api = { version = "0.4", features = ["arc_lock"] } log = { version = "0.4", default-features = false, features = ["std"] } -madsim-rdkafka = { git = "https://github.com/madsim-rs/madsim.git", rev = "bb8f063", features = ["cmake-build", "gssapi", "ssl-vendored", "zstd"] } +madsim-rdkafka = { git = "https://github.com/madsim-rs/madsim.git", rev = "fedb1e3", features = ["cmake-build", "gssapi", "ssl-vendored", "zstd"] } madsim-tokio = { version = "0.2", default-features = false, features = ["fs", "io-util", "macros", "net", "process", "rt", "rt-multi-thread", "signal", "sync", "time", "tracing"] } mio = { version = "0.8", features = ["net", "os-ext"] } multimap = { version = "0.8" } @@ -157,7 +157,7 @@ lexical-write-integer = { version = "0.8", default-features = false, features = libc = { version = "0.2", features = ["extra_traits"] } lock_api = { version = "0.4", features = ["arc_lock"] } log = { version = "0.4", default-features = false, features = ["std"] } -madsim-rdkafka = { git = 
"https://github.com/madsim-rs/madsim.git", rev = "bb8f063", features = ["cmake-build", "gssapi", "ssl-vendored", "zstd"] } +madsim-rdkafka = { git = "https://github.com/madsim-rs/madsim.git", rev = "fedb1e3", features = ["cmake-build", "gssapi", "ssl-vendored", "zstd"] } madsim-tokio = { version = "0.2", default-features = false, features = ["fs", "io-util", "macros", "net", "process", "rt", "rt-multi-thread", "signal", "sync", "time", "tracing"] } mio = { version = "0.8", features = ["net", "os-ext"] } multimap = { version = "0.8" } From 9de00bcee217227d14f0f842a79625bd3e18a314 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Tue, 12 Sep 2023 16:40:05 -0400 Subject: [PATCH 12/15] tiny fix --- src/connector/src/sink/kafka.rs | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 2b99725a107b9..e8cd22de52da1 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -441,10 +441,7 @@ impl KafkaSinkWriter { tokio::time::sleep(self.config.retry_interval).await; continue; } - _ => { - ret = Err(e); - break; - } + _ => return Err(e), } } } @@ -505,10 +502,7 @@ impl KafkaSinkWriter { .drain(..) .map(|delivery_future| { delivery_future.map(|delivery_future_result| { - match Self::map_future_result(delivery_future_result) { - Ok(_) => Ok(()), - Err(err) => Err(SinkError::Kafka(err)), - } + Self::map_future_result(delivery_future_result).map_err(SinkError::Kafka) }) }), ) @@ -524,6 +518,7 @@ impl KafkaSinkWriter { } async fn debezium_update(&mut self, chunk: StreamChunk, ts_ms: u64) -> Result<()> { + // TODO: Remove the clones here, only to satisfy borrow checker at present let schema = self.schema.clone(); let pk_indices = self.pk_indices.clone(); let db_name = self.db_name.clone(); @@ -550,6 +545,7 @@ impl KafkaSinkWriter { } async fn upsert(&mut self, chunk: StreamChunk) -> Result<()> { + // TODO: Remove the clones here, only to satisfy borrow checker at present let schema = self.schema.clone(); let pk_indices = self.pk_indices.clone(); @@ -567,6 +563,7 @@ impl KafkaSinkWriter { } async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> { + // TODO: Remove the clones here, only to satisfy borrow checker at present let schema = self.schema.clone(); let pk_indices = self.pk_indices.clone(); From 1cb469484c7e596ab04bcd848457906492c21e47 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Tue, 12 Sep 2023 20:49:51 -0400 Subject: [PATCH 13/15] update function signature --- src/connector/src/sink/kafka.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index e8cd22de52da1..4e56976e0bf14 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -18,12 +18,11 @@ use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use anyhow::anyhow; -use futures::channel::oneshot::Canceled; use futures::future::try_join_all; -use futures::FutureExt; +use futures::{Future, FutureExt}; use futures_async_stream::for_await; use rdkafka::error::{KafkaError, KafkaResult}; -use rdkafka::message::{OwnedMessage, ToBytes}; +use rdkafka::message::ToBytes; use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord}; use rdkafka::types::RDKafkaErrorCode; use rdkafka::ClientConfig; @@ -52,9 +51,6 @@ use crate::{ deserialize_bool_from_string, deserialize_duration_from_string, deserialize_u32_from_string, }; -type FutureResult = - std::result::Result, 
Canceled>; - pub const KAFKA_SINK: &str = "kafka"; const fn _default_timeout() -> Duration { @@ -477,7 +473,9 @@ impl KafkaSinkWriter { Ok(()) } - fn map_future_result(delivery_future_result: FutureResult) -> KafkaResult<()> { + fn map_future_result( + delivery_future_result: ::Output, + ) -> KafkaResult<()> { match delivery_future_result { // Successfully sent the record // Will return the partition and offset of the message (i32, i64) From b08670f8da39e482ec5d0604bfa75df1b9c86103 Mon Sep 17 00:00:00 2001 From: William Wen Date: Wed, 13 Sep 2023 12:03:54 +0800 Subject: [PATCH 14/15] update queue size to 1024 --- src/connector/src/sink/kafka.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 4e56976e0bf14..2ba148ef9367c 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -332,7 +332,7 @@ enum KafkaSinkState { /// The delivery buffer queue size /// When the `DeliveryFuture` the current `future_delivery_buffer` /// is buffering is greater than this size, then enforcing commit once -const KAFKA_WRITER_MAX_QUEUE_SIZE: usize = 10; +const KAFKA_WRITER_MAX_QUEUE_SIZE: usize = 1024; pub struct KafkaSinkWriter { pub config: KafkaConfig, From 2e074770df45ecb23d8f56651da2d315307f0c6f Mon Sep 17 00:00:00 2001 From: William Wen Date: Wed, 13 Sep 2023 12:15:11 +0800 Subject: [PATCH 15/15] update queue size to 65536 --- src/connector/src/sink/kafka.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 2ba148ef9367c..521f2faf54e0b 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -332,7 +332,7 @@ enum KafkaSinkState { /// The delivery buffer queue size /// When the `DeliveryFuture` the current `future_delivery_buffer` /// is buffering is greater than this size, then enforcing commit once -const KAFKA_WRITER_MAX_QUEUE_SIZE: usize = 1024; +const KAFKA_WRITER_MAX_QUEUE_SIZE: usize = 65536; pub struct KafkaSinkWriter { pub config: KafkaConfig,
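
Series summary (patches 01-15): the Kafka sink no longer awaits each delivery as it is produced. `send_result` only enqueues the `DeliveryFuture` returned by `FutureProducer::send_result` into `future_delivery_buffer` (awaiting the oldest entries once `KAFKA_WRITER_MAX_QUEUE_SIZE` is exceeded, and retrying with a sleep while librdkafka reports `QueueFull`), and `commit_inner` group-awaits every outstanding delivery with `try_join_all` at the checkpoint barrier. The condensed sketch below restates that end state outside the RisingWave codebase; the `BufferedKafkaWriter` type, the `send` signature, and the fixed `MAX_RETRY_NUM` / `RETRY_INTERVAL` constants are illustrative stand-ins for `KafkaSinkWriter` and its `KafkaConfig` fields (a Tokio runtime is assumed), not code from this series.

use std::collections::VecDeque;
use std::time::Duration;

use futures::future::try_join_all;
use futures::{Future, FutureExt};
use rdkafka::error::{KafkaError, KafkaResult};
use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord};
use rdkafka::types::RDKafkaErrorCode;

/// Cap on buffered delivery futures (65536 after patch 15).
const MAX_QUEUE_SIZE: usize = 65536;
/// Illustrative stand-ins for `KafkaConfig::max_retry_num` / `retry_interval`.
const MAX_RETRY_NUM: u32 = 3;
const RETRY_INTERVAL: Duration = Duration::from_millis(100);

struct BufferedKafkaWriter {
    producer: FutureProducer,
    future_delivery_buffer: VecDeque<DeliveryFuture>,
}

impl BufferedKafkaWriter {
    /// Enqueue one record without waiting for its delivery report.
    async fn send(&mut self, topic: &str, key: &[u8], payload: &[u8]) -> KafkaResult<()> {
        let mut record = FutureRecord::<[u8], [u8]>::to(topic).key(key).payload(payload);
        for _ in 0..MAX_RETRY_NUM {
            match self.producer.send_result(record) {
                Ok(delivery_future) => {
                    // Backpressure: await the oldest reports once the cap is reached.
                    while self.future_delivery_buffer.len() >= MAX_QUEUE_SIZE {
                        let oldest = self
                            .future_delivery_buffer
                            .pop_front()
                            .expect("buffer is non-empty here");
                        Self::map_future_result(oldest.await)?;
                    }
                    self.future_delivery_buffer.push_back(delivery_future);
                    return Ok(());
                }
                // librdkafka's local queue is full: back off and retry with the returned record.
                Err((KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull), rec)) => {
                    record = rec;
                    tokio::time::sleep(RETRY_INTERVAL).await;
                }
                Err((e, _rec)) => return Err(e),
            }
        }
        Err(KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull))
    }

    /// Checkpoint barrier: group-await every outstanding delivery report.
    async fn commit(&mut self) -> KafkaResult<()> {
        try_join_all(
            self.future_delivery_buffer
                .drain(..)
                .map(|fut| fut.map(Self::map_future_result)),
        )
        .await?;
        Ok(())
    }

    /// Flatten the three possible outcomes of a `DeliveryFuture`.
    fn map_future_result(result: <DeliveryFuture as Future>::Output) -> KafkaResult<()> {
        match result {
            // Delivered; the reported (partition, offset) is not needed here.
            Ok(Ok(_)) => Ok(()),
            // Delivery failed; the broker error and the owned message come back together.
            Ok(Err((k_err, _msg))) => Err(k_err),
            // The producer was dropped before the delivery status arrived.
            Err(_) => Err(KafkaError::Canceled),
        }
    }
}

With this shape, delivery errors surface at the commit barrier (or when the buffer cap forces an early await) rather than once per message, so the sink executor can fall back to the latest checkpoint on failure, which is the behavior the in-code comments of the series describe.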