From a59f94428d642fa8b0dc37984467c4e985e93a39 Mon Sep 17 00:00:00 2001 From: William Wen Date: Wed, 13 Sep 2023 13:31:17 +0800 Subject: [PATCH] cherry-pick de1f776 in main --- Cargo.lock | 2 +- Cargo.toml | 4 +- src/connector/src/sink/kafka.rs | 177 +++++++++++++++++++++----------- src/workspace-hack/Cargo.toml | 4 +- 4 files changed, 123 insertions(+), 64 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 945e630f07c99..c89f322e797cb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4304,7 +4304,7 @@ dependencies = [ [[package]] name = "madsim-rdkafka" version = "0.2.22" -source = "git+https://github.com/madsim-rs/madsim.git?rev=bb8f063#bb8f06384517ea3950b6c7a29a32c233058b89c7" +source = "git+https://github.com/madsim-rs/madsim.git?rev=fedb1e3#fedb1e3a0a8758650c9e15076941c999150bdb31" dependencies = [ "async-channel", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index f3fc870ff4907..972573684b4dd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -74,7 +74,9 @@ aws-types = "0.55" etcd-client = { package = "madsim-etcd-client", version = "0.3" } futures-async-stream = "0.2" hytra = "0.1" -rdkafka = { package = "madsim-rdkafka", git = "https://github.com/madsim-rs/madsim.git", rev = "bb8f063", features = ["cmake-build"] } +rdkafka = { package = "madsim-rdkafka", git = "https://github.com/madsim-rs/madsim.git", rev = "fedb1e3", features = [ + "cmake-build", +] } hashbrown = { version = "0.14.0", features = ["ahash", "inline-more", "nightly"] } criterion = { version = "0.5", features = ["async_futures"] } tonic = { package = "madsim-tonic", version = "0.3.1" } diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 046e78cef9d54..09c60931eecf2 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -12,12 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; use std::fmt::Debug; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use anyhow::anyhow; +use futures::future::try_join_all; +use futures::{Future, FutureExt}; use futures_async_stream::for_await; use rdkafka::error::{KafkaError, KafkaResult}; use rdkafka::message::ToBytes; @@ -325,6 +327,11 @@ enum KafkaSinkState { Running(u64), } +/// The delivery buffer queue size +/// When the `DeliveryFuture` the current `future_delivery_buffer` +/// is buffering is greater than this size, then enforcing commit once +const KAFKA_WRITER_MAX_QUEUE_SIZE: usize = 65536; + pub struct KafkaSinkWriter { pub config: KafkaConfig, pub inner: FutureProducer, @@ -333,6 +340,7 @@ pub struct KafkaSinkWriter { schema: Schema, pk_indices: Vec, is_append_only: bool, + future_delivery_buffer: VecDeque, } impl KafkaSinkWriter { @@ -376,97 +384,138 @@ impl KafkaSinkWriter { schema, pk_indices, is_append_only, + future_delivery_buffer: VecDeque::new(), }) } - /// The wrapper function for the actual `FutureProducer::send_result` - /// Just for better error handling purpose - #[expect(clippy::unused_async)] - async fn send_result_inner<'a, K, P>( - &'a self, - record: FutureRecord<'a, K, P>, - ) -> core::result::Result)> - where - K: ToBytes + ?Sized, - P: ToBytes + ?Sized, - { - self.inner.send_result(record) - } - /// The actual `send_result` function, will be called when the `KafkaSinkWriter` needs to sink /// messages - async fn send_result<'a, K, P>(&'a self, mut record: FutureRecord<'a, K, P>) -> KafkaResult<()> + async fn send_result<'a, K, P>( + &'a mut self, + mut record: FutureRecord<'a, K, P>, + ) -> KafkaResult<()> where K: ToBytes + ?Sized, P: ToBytes + ?Sized, { - // The error to be returned - let mut err = KafkaError::Canceled; + let mut success_flag = false; + + let mut ret = Ok(()); for _ in 0..self.config.max_retry_num { - match self.send_result_inner(record).await { - Ok(delivery_future) => match delivery_future.await { - Ok(delivery_future_result) => match delivery_future_result { - // Successfully sent the record - // Will return the partition and offset of the message (i32, i64) - Ok(_) => return Ok(()), - // If the message failed to be delivered. (i.e., flush) - // The error & the copy of the original message will be returned - // i.e., (KafkaError, OwnedMessage) - // We will just stop the loop, and return the error - // The sink executor will back to the latest checkpoint - Err((k_err, _msg)) => { - err = k_err; - break; - } - }, - // Nothing to do here, since the err has already been set to - // KafkaError::Canceled. This represents the producer is dropped - // before the delivery status is received - Err(_) => break, - }, + match self.inner.send_result(record) { + Ok(delivery_future) => { + // First check if the current length is + // greater than the preset limit + while self.future_delivery_buffer.len() >= KAFKA_WRITER_MAX_QUEUE_SIZE { + Self::map_future_result( + self.future_delivery_buffer + .pop_front() + .expect("Expect the future not to be None") + .await, + )?; + } + + self.future_delivery_buffer.push_back(delivery_future); + success_flag = true; + break; + } // The enqueue buffer is full, `send_result` will immediately return // We can retry for another round after sleeping for sometime Err((e, rec)) => { - err = e; record = rec; - match err { + match e { KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull) => { tokio::time::sleep(self.config.retry_interval).await; continue; } - _ => break, + _ => return Err(e), } } } } - Err(err) + if !success_flag { + // In this case, after trying `max_retry_num` + // The enqueue buffer is still full + ret = Err(KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull)); + } + + ret } async fn write_json_objects( - &self, + &mut self, event_key_object: Option, event_object: Option, ) -> Result<()> { + let topic = self.config.common.topic.clone(); // here we assume the key part always exists and value part is optional. // if value is None, we will skip the payload part. let key_str = event_key_object.unwrap().to_string(); - let mut record = FutureRecord::<[u8], [u8]>::to(self.config.common.topic.as_str()) - .key(key_str.as_bytes()); + let mut record = FutureRecord::<[u8], [u8]>::to(topic.as_str()).key(key_str.as_bytes()); let payload; if let Some(value) = event_object { payload = value.to_string(); record = record.payload(payload.as_bytes()); } + // Send the data but not wait it to finish sinking + // Will join all `DeliveryFuture` during commit self.send_result(record).await?; Ok(()) } - async fn debezium_update(&self, chunk: StreamChunk, ts_ms: u64) -> Result<()> { + fn map_future_result( + delivery_future_result: ::Output, + ) -> KafkaResult<()> { + match delivery_future_result { + // Successfully sent the record + // Will return the partition and offset of the message (i32, i64) + // Note that `Vec<()>` won't cause memory allocation + Ok(Ok(_)) => Ok(()), + // If the message failed to be delivered. (i.e., flush) + // The error & the copy of the original message will be returned + // i.e., (KafkaError, OwnedMessage) + // We will just stop the loop, and return the error + // The sink executor will back to the latest checkpoint + Ok(Err((k_err, _msg))) => Err(k_err), + // This represents the producer is dropped + // before the delivery status is received + // Return `KafkaError::Canceled` + Err(_) => Err(KafkaError::Canceled), + } + } + + async fn commit_inner(&mut self) -> Result<()> { + let _v = try_join_all( + self.future_delivery_buffer + .drain(..) + .map(|delivery_future| { + delivery_future.map(|delivery_future_result| { + Self::map_future_result(delivery_future_result).map_err(SinkError::Kafka) + }) + }), + ) + .await?; + + // Sanity check + debug_assert!( + self.future_delivery_buffer.is_empty(), + "The buffer after `commit_inner` must be empty" + ); + + Ok(()) + } + + async fn debezium_update(&mut self, chunk: StreamChunk, ts_ms: u64) -> Result<()> { + // TODO: Remove the clones here, only to satisfy borrow checker at present + let schema = self.schema.clone(); + let pk_indices = self.pk_indices.clone(); + + // Initialize the dbz_stream let dbz_stream = gen_debezium_message_stream( - &self.schema, - &self.pk_indices, + &schema, + &pk_indices, chunk, ts_ms, DebeziumAdapterOpts::default(), @@ -481,13 +530,14 @@ impl KafkaSinkWriter { Ok(()) } - async fn upsert(&self, chunk: StreamChunk) -> Result<()> { - let upsert_stream = gen_upsert_message_stream( - &self.schema, - &self.pk_indices, - chunk, - UpsertAdapterOpts::default(), - ); + async fn upsert(&mut self, chunk: StreamChunk) -> Result<()> { + // TODO: Remove the clones here, only to satisfy borrow checker at present + let schema = self.schema.clone(); + let pk_indices = self.pk_indices.clone(); + + // Initialize the upsert_stream + let upsert_stream = + gen_upsert_message_stream(&schema, &pk_indices, chunk, UpsertAdapterOpts::default()); #[for_await] for msg in upsert_stream { @@ -498,10 +548,15 @@ impl KafkaSinkWriter { Ok(()) } - async fn append_only(&self, chunk: StreamChunk) -> Result<()> { + async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> { + // TODO: Remove the clones here, only to satisfy borrow checker at present + let schema = self.schema.clone(); + let pk_indices = self.pk_indices.clone(); + + // Initialize the append_only_stream let append_only_stream = gen_append_only_message_stream( - &self.schema, - &self.pk_indices, + &schema, + &pk_indices, chunk, AppendOnlyAdapterOpts::default(), ); @@ -551,6 +606,8 @@ impl SinkWriterV1 for KafkaSinkWriter { } async fn commit(&mut self) -> Result<()> { + // Group delivery (await the `FutureRecord`) here + self.commit_inner().await?; Ok(()) } @@ -691,7 +748,7 @@ mod test { } /// Note: Please enable the kafka by running `./risedev configure` before commenting #[ignore] - /// to run the test + /// to run the test, also remember to modify `risedev.yml` #[ignore] #[tokio::test] async fn test_kafka_producer() -> Result<()> { diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml index a63eaa9abf35c..a7660c8c59603 100644 --- a/src/workspace-hack/Cargo.toml +++ b/src/workspace-hack/Cargo.toml @@ -58,7 +58,7 @@ lexical-write-integer = { version = "0.8", default-features = false, features = libc = { version = "0.2", features = ["extra_traits"] } lock_api = { version = "0.4", features = ["arc_lock"] } log = { version = "0.4", default-features = false, features = ["std"] } -madsim-rdkafka = { git = "https://github.com/madsim-rs/madsim.git", rev = "bb8f063", features = ["cmake-build", "gssapi", "ssl-vendored", "zstd"] } +madsim-rdkafka = { git = "https://github.com/madsim-rs/madsim.git", rev = "fedb1e3", features = ["cmake-build", "gssapi", "ssl-vendored", "zstd"] } madsim-tokio = { version = "0.2", default-features = false, features = ["fs", "io-util", "macros", "net", "process", "rt", "rt-multi-thread", "signal", "sync", "time", "tracing"] } memchr = { version = "2" } miniz_oxide = { version = "0.7", default-features = false, features = ["with-alloc"] } @@ -154,7 +154,7 @@ lexical-write-integer = { version = "0.8", default-features = false, features = libc = { version = "0.2", features = ["extra_traits"] } lock_api = { version = "0.4", features = ["arc_lock"] } log = { version = "0.4", default-features = false, features = ["std"] } -madsim-rdkafka = { git = "https://github.com/madsim-rs/madsim.git", rev = "bb8f063", features = ["cmake-build", "gssapi", "ssl-vendored", "zstd"] } +madsim-rdkafka = { git = "https://github.com/madsim-rs/madsim.git", rev = "fedb1e3", features = ["cmake-build", "gssapi", "ssl-vendored", "zstd"] } madsim-tokio = { version = "0.2", default-features = false, features = ["fs", "io-util", "macros", "net", "process", "rt", "rt-multi-thread", "signal", "sync", "time", "tracing"] } memchr = { version = "2" } miniz_oxide = { version = "0.7", default-features = false, features = ["with-alloc"] }