fix: cherry-pick #12013 to branch v1.2-rc (#12257)
wenym1 authored Sep 13, 2023
1 parent f27f085 commit 0f94fdf
Showing 4 changed files with 123 additions and 64 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

4 changes: 3 additions & 1 deletion Cargo.toml
@@ -74,7 +74,9 @@ aws-types = "0.55"
etcd-client = { package = "madsim-etcd-client", version = "0.3" }
futures-async-stream = "0.2"
hytra = "0.1"
rdkafka = { package = "madsim-rdkafka", git = "https://github.com/madsim-rs/madsim.git", rev = "bb8f063", features = ["cmake-build"] }
rdkafka = { package = "madsim-rdkafka", git = "https://github.com/madsim-rs/madsim.git", rev = "fedb1e3", features = [
"cmake-build",
] }
hashbrown = { version = "0.14.0", features = ["ahash", "inline-more", "nightly"] }
criterion = { version = "0.5", features = ["async_futures"] }
tonic = { package = "madsim-tonic", version = "0.3.1" }
177 changes: 117 additions & 60 deletions src/connector/src/sink/kafka.rs
@@ -12,12 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use std::fmt::Debug;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use anyhow::anyhow;
use futures::future::try_join_all;
use futures::{Future, FutureExt};
use futures_async_stream::for_await;
use rdkafka::error::{KafkaError, KafkaResult};
use rdkafka::message::ToBytes;
@@ -325,6 +327,11 @@ enum KafkaSinkState {
Running(u64),
}

/// The delivery buffer queue size.
/// When the number of `DeliveryFuture`s buffered in the current
/// `future_delivery_buffer` exceeds this size, a commit is enforced once.
const KAFKA_WRITER_MAX_QUEUE_SIZE: usize = 65536;

pub struct KafkaSinkWriter {
pub config: KafkaConfig,
pub inner: FutureProducer<PrivateLinkProducerContext>,
@@ -333,6 +340,7 @@ pub struct KafkaSinkWriter {
schema: Schema,
pk_indices: Vec<usize>,
is_append_only: bool,
future_delivery_buffer: VecDeque<DeliveryFuture>,
}

impl KafkaSinkWriter {
@@ -376,97 +384,138 @@ impl KafkaSinkWriter {
schema,
pk_indices,
is_append_only,
future_delivery_buffer: VecDeque::new(),
})
}

/// The wrapper function for the actual `FutureProducer::send_result`
/// Just for better error handling purpose
#[expect(clippy::unused_async)]
async fn send_result_inner<'a, K, P>(
&'a self,
record: FutureRecord<'a, K, P>,
) -> core::result::Result<DeliveryFuture, (KafkaError, FutureRecord<'a, K, P>)>
where
K: ToBytes + ?Sized,
P: ToBytes + ?Sized,
{
self.inner.send_result(record)
}

/// The actual `send_result` function, will be called when the `KafkaSinkWriter` needs to sink
/// messages
async fn send_result<'a, K, P>(&'a self, mut record: FutureRecord<'a, K, P>) -> KafkaResult<()>
async fn send_result<'a, K, P>(
&'a mut self,
mut record: FutureRecord<'a, K, P>,
) -> KafkaResult<()>
where
K: ToBytes + ?Sized,
P: ToBytes + ?Sized,
{
// The error to be returned
let mut err = KafkaError::Canceled;
let mut success_flag = false;

let mut ret = Ok(());

for _ in 0..self.config.max_retry_num {
match self.send_result_inner(record).await {
Ok(delivery_future) => match delivery_future.await {
Ok(delivery_future_result) => match delivery_future_result {
// Successfully sent the record
// Will return the partition and offset of the message (i32, i64)
Ok(_) => return Ok(()),
// If the message failed to be delivered. (i.e., flush)
// The error & the copy of the original message will be returned
// i.e., (KafkaError, OwnedMessage)
// We will just stop the loop, and return the error
// The sink executor will back to the latest checkpoint
Err((k_err, _msg)) => {
err = k_err;
break;
}
},
// Nothing to do here, since the err has already been set to
// KafkaError::Canceled. This represents the producer is dropped
// before the delivery status is received
Err(_) => break,
},
match self.inner.send_result(record) {
Ok(delivery_future) => {
// First check if the current length is
// greater than the preset limit
while self.future_delivery_buffer.len() >= KAFKA_WRITER_MAX_QUEUE_SIZE {
Self::map_future_result(
self.future_delivery_buffer
.pop_front()
.expect("Expect the future not to be None")
.await,
)?;
}

self.future_delivery_buffer.push_back(delivery_future);
success_flag = true;
break;
}
// The enqueue buffer is full; `send_result` will return immediately
// We can retry for another round after sleeping for some time
Err((e, rec)) => {
err = e;
record = rec;
match err {
match e {
KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull) => {
tokio::time::sleep(self.config.retry_interval).await;
continue;
}
_ => break,
_ => return Err(e),
}
}
}
}

Err(err)
if !success_flag {
// In this case, after trying `max_retry_num`
// The enqueue buffer is still full
ret = Err(KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull));
}

ret
}

async fn write_json_objects(
&self,
&mut self,
event_key_object: Option<Value>,
event_object: Option<Value>,
) -> Result<()> {
let topic = self.config.common.topic.clone();
// here we assume the key part always exists and value part is optional.
// if value is None, we will skip the payload part.
let key_str = event_key_object.unwrap().to_string();
let mut record = FutureRecord::<[u8], [u8]>::to(self.config.common.topic.as_str())
.key(key_str.as_bytes());
let mut record = FutureRecord::<[u8], [u8]>::to(topic.as_str()).key(key_str.as_bytes());
let payload;
if let Some(value) = event_object {
payload = value.to_string();
record = record.payload(payload.as_bytes());
}
// Send the data but not wait it to finish sinking
// Will join all `DeliveryFuture` during commit
self.send_result(record).await?;
Ok(())
}

async fn debezium_update(&self, chunk: StreamChunk, ts_ms: u64) -> Result<()> {
fn map_future_result(
delivery_future_result: <DeliveryFuture as Future>::Output,
) -> KafkaResult<()> {
match delivery_future_result {
// Successfully sent the record
// Will return the partition and offset of the message (i32, i64)
// Note that `Vec<()>` won't cause memory allocation
Ok(Ok(_)) => Ok(()),
// If the message failed to be delivered. (i.e., flush)
// The error & the copy of the original message will be returned
// i.e., (KafkaError, OwnedMessage)
// We will just stop the loop, and return the error
// The sink executor will then go back to the latest checkpoint
Ok(Err((k_err, _msg))) => Err(k_err),
// This represents the producer is dropped
// before the delivery status is received
// Return `KafkaError::Canceled`
Err(_) => Err(KafkaError::Canceled),
}
}

async fn commit_inner(&mut self) -> Result<()> {
let _v = try_join_all(
self.future_delivery_buffer
.drain(..)
.map(|delivery_future| {
delivery_future.map(|delivery_future_result| {
Self::map_future_result(delivery_future_result).map_err(SinkError::Kafka)
})
}),
)
.await?;

// Sanity check
debug_assert!(
self.future_delivery_buffer.is_empty(),
"The buffer after `commit_inner` must be empty"
);

Ok(())
}

async fn debezium_update(&mut self, chunk: StreamChunk, ts_ms: u64) -> Result<()> {
// TODO: Remove the clones here, only to satisfy borrow checker at present
let schema = self.schema.clone();
let pk_indices = self.pk_indices.clone();

// Initialize the dbz_stream
let dbz_stream = gen_debezium_message_stream(
&self.schema,
&self.pk_indices,
&schema,
&pk_indices,
chunk,
ts_ms,
DebeziumAdapterOpts::default(),
@@ -481,13 +530,14 @@ impl KafkaSinkWriter {
Ok(())
}

async fn upsert(&self, chunk: StreamChunk) -> Result<()> {
let upsert_stream = gen_upsert_message_stream(
&self.schema,
&self.pk_indices,
chunk,
UpsertAdapterOpts::default(),
);
async fn upsert(&mut self, chunk: StreamChunk) -> Result<()> {
// TODO: Remove the clones here, only to satisfy borrow checker at present
let schema = self.schema.clone();
let pk_indices = self.pk_indices.clone();

// Initialize the upsert_stream
let upsert_stream =
gen_upsert_message_stream(&schema, &pk_indices, chunk, UpsertAdapterOpts::default());

#[for_await]
for msg in upsert_stream {
Expand All @@ -498,10 +548,15 @@ impl KafkaSinkWriter {
Ok(())
}

async fn append_only(&self, chunk: StreamChunk) -> Result<()> {
async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
// TODO: Remove the clones here, only to satisfy borrow checker at present
let schema = self.schema.clone();
let pk_indices = self.pk_indices.clone();

// Initialize the append_only_stream
let append_only_stream = gen_append_only_message_stream(
&self.schema,
&self.pk_indices,
&schema,
&pk_indices,
chunk,
AppendOnlyAdapterOpts::default(),
);
@@ -551,6 +606,8 @@ impl SinkWriterV1 for KafkaSinkWriter {
}

async fn commit(&mut self) -> Result<()> {
// Group delivery (await the `FutureRecord`) here
self.commit_inner().await?;
Ok(())
}

@@ -691,7 +748,7 @@ mod test {
}

/// Note: Please enable the kafka by running `./risedev configure` before commenting #[ignore]
/// to run the test
/// to run the test, also remember to modify `risedev.yml`
#[ignore]
#[tokio::test]
async fn test_kafka_producer() -> Result<()> {
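The kafka.rs change above replaces the per-record await on each `DeliveryFuture` with a bounded `VecDeque<DeliveryFuture>` that is only drained in `commit_inner`. The sketch below illustrates that buffering pattern in isolation; it is a minimal approximation using plain tokio tasks instead of rdkafka's `FutureProducer`, assumes only the `tokio` (features `macros`, `rt`, `time`) and `futures` crates, and the names `BufferedSender`, `MAX_IN_FLIGHT`, and `fake_send` are hypothetical, not part of this commit.

// Minimal sketch of the bounded delivery-buffer pattern (illustration only).
use std::collections::VecDeque;
use std::time::Duration;

use futures::future::try_join_all;
use tokio::task::JoinHandle;

/// Stand-in for `KAFKA_WRITER_MAX_QUEUE_SIZE` (the real writer uses 65536).
const MAX_IN_FLIGHT: usize = 4;

/// Hypothetical writer holding un-awaited delivery futures,
/// mirroring `future_delivery_buffer` in `KafkaSinkWriter`.
struct BufferedSender {
    in_flight: VecDeque<JoinHandle<Result<(), String>>>,
}

impl BufferedSender {
    fn new() -> Self {
        Self { in_flight: VecDeque::new() }
    }

    /// Analogous to `send_result`: enqueue a delivery future; if the buffer
    /// is already at capacity, await the oldest entries first (backpressure).
    async fn send(&mut self, payload: String) -> Result<(), String> {
        while self.in_flight.len() >= MAX_IN_FLIGHT {
            let oldest = self
                .in_flight
                .pop_front()
                .expect("non-empty by the loop condition");
            // Outer `?` handles task join errors, inner `?` handles delivery errors.
            oldest.await.map_err(|e| e.to_string())??;
        }
        self.in_flight.push_back(tokio::spawn(fake_send(payload)));
        Ok(())
    }

    /// Analogous to `commit_inner`: drain and await every outstanding
    /// delivery before acknowledging the checkpoint.
    async fn commit(&mut self) -> Result<(), String> {
        let results = try_join_all(self.in_flight.drain(..))
            .await
            .map_err(|e| e.to_string())?;
        results.into_iter().collect::<Result<Vec<()>, String>>()?;
        debug_assert!(self.in_flight.is_empty());
        Ok(())
    }
}

/// Stand-in for a broker acknowledgement.
async fn fake_send(payload: String) -> Result<(), String> {
    tokio::time::sleep(Duration::from_millis(1)).await;
    println!("delivered: {payload}");
    Ok(())
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), String> {
    let mut sender = BufferedSender::new();
    for i in 0..10 {
        sender.send(format!("record-{i}")).await?;
    }
    sender.commit().await
}

Bounding the in-flight queue provides backpressure when the broker is slow, while draining it at commit time preserves the delivery guarantee: a checkpoint is acknowledged only after every buffered delivery has succeeded.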
4 changes: 2 additions & 2 deletions src/workspace-hack/Cargo.toml
@@ -58,7 +58,7 @@ lexical-write-integer = { version = "0.8", default-features = false, features =
libc = { version = "0.2", features = ["extra_traits"] }
lock_api = { version = "0.4", features = ["arc_lock"] }
log = { version = "0.4", default-features = false, features = ["std"] }
madsim-rdkafka = { git = "https://github.com/madsim-rs/madsim.git", rev = "bb8f063", features = ["cmake-build", "gssapi", "ssl-vendored", "zstd"] }
madsim-rdkafka = { git = "https://github.com/madsim-rs/madsim.git", rev = "fedb1e3", features = ["cmake-build", "gssapi", "ssl-vendored", "zstd"] }
madsim-tokio = { version = "0.2", default-features = false, features = ["fs", "io-util", "macros", "net", "process", "rt", "rt-multi-thread", "signal", "sync", "time", "tracing"] }
memchr = { version = "2" }
miniz_oxide = { version = "0.7", default-features = false, features = ["with-alloc"] }
@@ -154,7 +154,7 @@ lexical-write-integer = { version = "0.8", default-features = false, features =
libc = { version = "0.2", features = ["extra_traits"] }
lock_api = { version = "0.4", features = ["arc_lock"] }
log = { version = "0.4", default-features = false, features = ["std"] }
madsim-rdkafka = { git = "https://github.com/madsim-rs/madsim.git", rev = "bb8f063", features = ["cmake-build", "gssapi", "ssl-vendored", "zstd"] }
madsim-rdkafka = { git = "https://github.com/madsim-rs/madsim.git", rev = "fedb1e3", features = ["cmake-build", "gssapi", "ssl-vendored", "zstd"] }
madsim-tokio = { version = "0.2", default-features = false, features = ["fs", "io-util", "macros", "net", "process", "rt", "rt-multi-thread", "signal", "sync", "time", "tracing"] }
memchr = { version = "2" }
miniz_oxide = { version = "0.7", default-features = false, features = ["with-alloc"] }
