Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: cherry-pick #12013 to branch v1.2-rc #12257

Merged
merged 1 commit into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ aws-types = "0.55"
etcd-client = { package = "madsim-etcd-client", version = "0.3" }
futures-async-stream = "0.2"
hytra = "0.1"
rdkafka = { package = "madsim-rdkafka", git = "https://github.com/madsim-rs/madsim.git", rev = "bb8f063", features = ["cmake-build"] }
rdkafka = { package = "madsim-rdkafka", git = "https://github.com/madsim-rs/madsim.git", rev = "fedb1e3", features = [
"cmake-build",
] }
hashbrown = { version = "0.14.0", features = ["ahash", "inline-more", "nightly"] }
criterion = { version = "0.5", features = ["async_futures"] }
tonic = { package = "madsim-tonic", version = "0.3.1" }
Expand Down
177 changes: 117 additions & 60 deletions src/connector/src/sink/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use std::fmt::Debug;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use anyhow::anyhow;
use futures::future::try_join_all;
use futures::{Future, FutureExt};
use futures_async_stream::for_await;
use rdkafka::error::{KafkaError, KafkaResult};
use rdkafka::message::ToBytes;
Expand Down Expand Up @@ -325,6 +327,11 @@ enum KafkaSinkState {
Running(u64),
}

/// The delivery buffer queue size
/// When the `DeliveryFuture` the current `future_delivery_buffer`
/// is buffering is greater than this size, then enforcing commit once
const KAFKA_WRITER_MAX_QUEUE_SIZE: usize = 65536;

pub struct KafkaSinkWriter {
pub config: KafkaConfig,
pub inner: FutureProducer<PrivateLinkProducerContext>,
Expand All @@ -333,6 +340,7 @@ pub struct KafkaSinkWriter {
schema: Schema,
pk_indices: Vec<usize>,
is_append_only: bool,
future_delivery_buffer: VecDeque<DeliveryFuture>,
}

impl KafkaSinkWriter {
Expand Down Expand Up @@ -376,97 +384,138 @@ impl KafkaSinkWriter {
schema,
pk_indices,
is_append_only,
future_delivery_buffer: VecDeque::new(),
})
}

/// The wrapper function for the actual `FutureProducer::send_result`
/// Just for better error handling purpose
#[expect(clippy::unused_async)]
async fn send_result_inner<'a, K, P>(
&'a self,
record: FutureRecord<'a, K, P>,
) -> core::result::Result<DeliveryFuture, (KafkaError, FutureRecord<'a, K, P>)>
where
K: ToBytes + ?Sized,
P: ToBytes + ?Sized,
{
self.inner.send_result(record)
}

/// The actual `send_result` function, will be called when the `KafkaSinkWriter` needs to sink
/// messages
async fn send_result<'a, K, P>(&'a self, mut record: FutureRecord<'a, K, P>) -> KafkaResult<()>
async fn send_result<'a, K, P>(
&'a mut self,
mut record: FutureRecord<'a, K, P>,
) -> KafkaResult<()>
where
K: ToBytes + ?Sized,
P: ToBytes + ?Sized,
{
// The error to be returned
let mut err = KafkaError::Canceled;
let mut success_flag = false;

let mut ret = Ok(());

for _ in 0..self.config.max_retry_num {
match self.send_result_inner(record).await {
Ok(delivery_future) => match delivery_future.await {
Ok(delivery_future_result) => match delivery_future_result {
// Successfully sent the record
// Will return the partition and offset of the message (i32, i64)
Ok(_) => return Ok(()),
// If the message failed to be delivered. (i.e., flush)
// The error & the copy of the original message will be returned
// i.e., (KafkaError, OwnedMessage)
// We will just stop the loop, and return the error
// The sink executor will back to the latest checkpoint
Err((k_err, _msg)) => {
err = k_err;
break;
}
},
// Nothing to do here, since the err has already been set to
// KafkaError::Canceled. This represents the producer is dropped
// before the delivery status is received
Err(_) => break,
},
match self.inner.send_result(record) {
Ok(delivery_future) => {
// First check if the current length is
// greater than the preset limit
while self.future_delivery_buffer.len() >= KAFKA_WRITER_MAX_QUEUE_SIZE {
Self::map_future_result(
self.future_delivery_buffer
.pop_front()
.expect("Expect the future not to be None")
.await,
)?;
}

self.future_delivery_buffer.push_back(delivery_future);
success_flag = true;
break;
}
// The enqueue buffer is full, `send_result` will immediately return
// We can retry for another round after sleeping for sometime
Err((e, rec)) => {
err = e;
record = rec;
match err {
match e {
KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull) => {
tokio::time::sleep(self.config.retry_interval).await;
continue;
}
_ => break,
_ => return Err(e),
}
}
}
}

Err(err)
if !success_flag {
// In this case, after trying `max_retry_num`
// The enqueue buffer is still full
ret = Err(KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull));
}

ret
}

async fn write_json_objects(
&self,
&mut self,
event_key_object: Option<Value>,
event_object: Option<Value>,
) -> Result<()> {
let topic = self.config.common.topic.clone();
// here we assume the key part always exists and value part is optional.
// if value is None, we will skip the payload part.
let key_str = event_key_object.unwrap().to_string();
let mut record = FutureRecord::<[u8], [u8]>::to(self.config.common.topic.as_str())
.key(key_str.as_bytes());
let mut record = FutureRecord::<[u8], [u8]>::to(topic.as_str()).key(key_str.as_bytes());
let payload;
if let Some(value) = event_object {
payload = value.to_string();
record = record.payload(payload.as_bytes());
}
// Send the data but not wait it to finish sinking
// Will join all `DeliveryFuture` during commit
self.send_result(record).await?;
Ok(())
}

async fn debezium_update(&self, chunk: StreamChunk, ts_ms: u64) -> Result<()> {
fn map_future_result(
delivery_future_result: <DeliveryFuture as Future>::Output,
) -> KafkaResult<()> {
match delivery_future_result {
// Successfully sent the record
// Will return the partition and offset of the message (i32, i64)
// Note that `Vec<()>` won't cause memory allocation
Ok(Ok(_)) => Ok(()),
// If the message failed to be delivered. (i.e., flush)
// The error & the copy of the original message will be returned
// i.e., (KafkaError, OwnedMessage)
// We will just stop the loop, and return the error
// The sink executor will back to the latest checkpoint
Ok(Err((k_err, _msg))) => Err(k_err),
// This represents the producer is dropped
// before the delivery status is received
// Return `KafkaError::Canceled`
Err(_) => Err(KafkaError::Canceled),
}
}

async fn commit_inner(&mut self) -> Result<()> {
let _v = try_join_all(
self.future_delivery_buffer
.drain(..)
.map(|delivery_future| {
delivery_future.map(|delivery_future_result| {
Self::map_future_result(delivery_future_result).map_err(SinkError::Kafka)
})
}),
)
.await?;

// Sanity check
debug_assert!(
self.future_delivery_buffer.is_empty(),
"The buffer after `commit_inner` must be empty"
);

Ok(())
}

async fn debezium_update(&mut self, chunk: StreamChunk, ts_ms: u64) -> Result<()> {
// TODO: Remove the clones here, only to satisfy borrow checker at present
let schema = self.schema.clone();
let pk_indices = self.pk_indices.clone();

// Initialize the dbz_stream
let dbz_stream = gen_debezium_message_stream(
&self.schema,
&self.pk_indices,
&schema,
&pk_indices,
chunk,
ts_ms,
DebeziumAdapterOpts::default(),
Expand All @@ -481,13 +530,14 @@ impl KafkaSinkWriter {
Ok(())
}

async fn upsert(&self, chunk: StreamChunk) -> Result<()> {
let upsert_stream = gen_upsert_message_stream(
&self.schema,
&self.pk_indices,
chunk,
UpsertAdapterOpts::default(),
);
async fn upsert(&mut self, chunk: StreamChunk) -> Result<()> {
// TODO: Remove the clones here, only to satisfy borrow checker at present
let schema = self.schema.clone();
let pk_indices = self.pk_indices.clone();

// Initialize the upsert_stream
let upsert_stream =
gen_upsert_message_stream(&schema, &pk_indices, chunk, UpsertAdapterOpts::default());

#[for_await]
for msg in upsert_stream {
Expand All @@ -498,10 +548,15 @@ impl KafkaSinkWriter {
Ok(())
}

async fn append_only(&self, chunk: StreamChunk) -> Result<()> {
async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
// TODO: Remove the clones here, only to satisfy borrow checker at present
let schema = self.schema.clone();
let pk_indices = self.pk_indices.clone();

// Initialize the append_only_stream
let append_only_stream = gen_append_only_message_stream(
&self.schema,
&self.pk_indices,
&schema,
&pk_indices,
chunk,
AppendOnlyAdapterOpts::default(),
);
Expand Down Expand Up @@ -551,6 +606,8 @@ impl SinkWriterV1 for KafkaSinkWriter {
}

async fn commit(&mut self) -> Result<()> {
// Group delivery (await the `FutureRecord`) here
self.commit_inner().await?;
Ok(())
}

Expand Down Expand Up @@ -691,7 +748,7 @@ mod test {
}

/// Note: Please enable the kafka by running `./risedev configure` before commenting #[ignore]
/// to run the test
/// to run the test, also remember to modify `risedev.yml`
#[ignore]
#[tokio::test]
async fn test_kafka_producer() -> Result<()> {
Expand Down
4 changes: 2 additions & 2 deletions src/workspace-hack/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ lexical-write-integer = { version = "0.8", default-features = false, features =
libc = { version = "0.2", features = ["extra_traits"] }
lock_api = { version = "0.4", features = ["arc_lock"] }
log = { version = "0.4", default-features = false, features = ["std"] }
madsim-rdkafka = { git = "https://github.com/madsim-rs/madsim.git", rev = "bb8f063", features = ["cmake-build", "gssapi", "ssl-vendored", "zstd"] }
madsim-rdkafka = { git = "https://github.com/madsim-rs/madsim.git", rev = "fedb1e3", features = ["cmake-build", "gssapi", "ssl-vendored", "zstd"] }
madsim-tokio = { version = "0.2", default-features = false, features = ["fs", "io-util", "macros", "net", "process", "rt", "rt-multi-thread", "signal", "sync", "time", "tracing"] }
memchr = { version = "2" }
miniz_oxide = { version = "0.7", default-features = false, features = ["with-alloc"] }
Expand Down Expand Up @@ -154,7 +154,7 @@ lexical-write-integer = { version = "0.8", default-features = false, features =
libc = { version = "0.2", features = ["extra_traits"] }
lock_api = { version = "0.4", features = ["arc_lock"] }
log = { version = "0.4", default-features = false, features = ["std"] }
madsim-rdkafka = { git = "https://github.com/madsim-rs/madsim.git", rev = "bb8f063", features = ["cmake-build", "gssapi", "ssl-vendored", "zstd"] }
madsim-rdkafka = { git = "https://github.com/madsim-rs/madsim.git", rev = "fedb1e3", features = ["cmake-build", "gssapi", "ssl-vendored", "zstd"] }
madsim-tokio = { version = "0.2", default-features = false, features = ["fs", "io-util", "macros", "net", "process", "rt", "rt-multi-thread", "signal", "sync", "time", "tracing"] }
memchr = { version = "2" }
miniz_oxide = { version = "0.7", default-features = false, features = ["with-alloc"] }
Expand Down