refactor(kafka-sink): change sequential await to group await when committing #12013

Merged
merged 20 commits into from Sep 13, 2023
Changes from 11 commits
202 changes: 140 additions & 62 deletions src/connector/src/sink/kafka.rs
@@ -12,12 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use std::fmt::Debug;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use anyhow::anyhow;
use futures::future::try_join_all;
use futures::FutureExt;
use futures_async_stream::for_await;
use rdkafka::error::{KafkaError, KafkaResult};
use rdkafka::message::ToBytes;
@@ -327,6 +329,11 @@ enum KafkaSinkState {
Running(u64),
}

/// The delivery buffer queue size.
/// When the number of `DeliveryFuture`s buffered in the current
/// `future_delivery_buffer` exceeds this size, commit is enforced once.
const KAFKA_WRITER_MAX_QUEUE_SIZE: usize = 10;

pub struct KafkaSinkWriter {
pub config: KafkaConfig,
pub inner: FutureProducer<PrivateLinkProducerContext>,
@@ -335,6 +342,7 @@ pub struct KafkaSinkWriter {
schema: Schema,
pk_indices: Vec<usize>,
is_append_only: bool,
future_delivery_buffer: VecDeque<DeliveryFuture>,
db_name: String,
sink_from_name: String,
}
@@ -382,104 +390,168 @@ impl KafkaSinkWriter {
schema,
pk_indices,
is_append_only,
future_delivery_buffer: VecDeque::new(),
db_name,
sink_from_name,
})
}

/// The wrapper function for the actual `FutureProducer::send_result`
/// Just for better error handling purposes
#[expect(clippy::unused_async)]
async fn send_result_inner<'a, K, P>(
&'a self,
record: FutureRecord<'a, K, P>,
) -> core::result::Result<DeliveryFuture, (KafkaError, FutureRecord<'a, K, P>)>
where
K: ToBytes + ?Sized,
P: ToBytes + ?Sized,
{
self.inner.send_result(record)
}

/// The actual `send_result` function, which will be called when the `KafkaSinkWriter` needs to sink
/// messages
async fn send_result<'a, K, P>(&'a self, mut record: FutureRecord<'a, K, P>) -> KafkaResult<()>
async fn send_result<'a, K, P>(
&'a mut self,
mut record: FutureRecord<'a, K, P>,
) -> KafkaResult<()>
where
K: ToBytes + ?Sized,
P: ToBytes + ?Sized,
{
// The error to be returned
let mut err = KafkaError::Canceled;
// First take ownership of the existing buffer
Contributor

The logic here is too complicated.

It can be as simple as

if self.future_buffer.len() == self.max_limit {
    // wait for some future (not all) in future_buffer to finish to release the quota.
} else {
    let future = self.send_result_inner(record);
    self.future_buffer.push(future);
}

For the step that waits for some future to finish, you can use a VecDeque instead of a Vec to buffer the futures: always push to the back, and pop from the front and then await the future at the queue head. You can also investigate FuturesUnordered or FuturesOrdered.
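
A rough sketch of that pop-from-front idea (not code from this PR; the helper name, the generic signature, and the MAX_QUEUE_SIZE constant are made up for illustration):

use std::collections::VecDeque;
use std::future::Future;

/// Illustrative cap; the PR uses `KAFKA_WRITER_MAX_QUEUE_SIZE = 10`.
const MAX_QUEUE_SIZE: usize = 10;

/// Push a new future into the bounded queue, first awaiting the oldest
/// buffered future if the queue is already full, and returning its output.
async fn push_bounded<F: Future>(queue: &mut VecDeque<F>, fut: F) -> Option<F::Output> {
    let mut released = None;
    if queue.len() >= MAX_QUEUE_SIZE {
        if let Some(oldest) = queue.pop_front() {
            // Awaiting the head future releases exactly one slot of quota.
            released = Some(oldest.await);
        }
    }
    queue.push_back(fut);
    released
}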

Contributor Author

The current approach is:

  1. Check if the buffer is full; if so, await all the current futures (which is self.commit).
  2. Otherwise, push the future into the buffer and return.
  3. If the enqueue buffer is full, retry up to the preset limit.

I think this is reasonable, since we've already wrapped the delivery process into commit per 10 futures, and based on the current limit of 10, I think it's also efficient to just wait for the 10 buffered futures to be delivered and then begin the next turn.
Ultimately, if we ever encounter an error, we will change the status back to the latest checkpoint.

Contributor

The logic makes some sense when the limit is 10, a relatively small value. If we decide to change to a larger value in the future, such as 65536, it makes no sense to still wait for all futures to finish.

Besides, it is discouraged to call commit inside send_result, since the commit method is one of the methods in the SinkWriter trait, and its semantics are that it is called on an externally triggered checkpoint, while in send_result this is not the case. If we really want to reuse some logic, we can have a separate method named something like ensure_delivery, and call it in both send_result and commit.

Also, in the current code I am confused by the usage of commit_flag and push_flag. It seems that the logic can be implemented without taking out self.future_delivery_buffer and using the two flags.
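
A minimal sketch of the ensure_delivery split suggested here (a method on KafkaSinkWriter), reusing the field and helper names from the diff below; illustrative only, since the merged change realizes the same idea as commit_inner with try_join_all:

/// Await every buffered delivery future, draining the queue.
async fn ensure_delivery(&mut self) -> KafkaResult<()> {
    while let Some(delivery_future) = self.future_delivery_buffer.pop_front() {
        Self::await_once(delivery_future).await?;
    }
    Ok(())
}

send_result could then call this when the buffer is full, and SinkWriterV1::commit could call it on every checkpoint.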

Contributor Author

The commit I mentioned here is not actually SinkWriter::commit, but rather a wrapper function of KafkaSinkWriter::commit that will clear the current future_delivery_buffer and ensure delivery. (It's the function discussed below, the one changed to try_join_all; its semantics are essentially the same as the ensure_delivery wrapper function mentioned above.)
As for the use of commit_flag and push_flag: the former checks whether the size is at the limit; the latter checks whether this call has successfully sent the data and pushed the future into the future_record_buffer, after which the local future_buffer and the one in KafkaSinkWriter are exchanged via std::mem::swap, in order to make the borrow checker happy.
And yes, for a rather large buffer value such as 65536, we could actually adopt a non-blocking approach.
I've tried implementing the non-blocking logic, but it seems tricky and ambiguous (i.e., to wait for the release of quota, we would typically await the future at the front of the deque, but should we launch a background thread as the consumer to keep awaiting whenever the buffer is not empty, or use some other approach to consume the buffer?), so it may be further reviewed & discussed.

Contributor

Do we really need any non-blocking approach? The reason for a non-blocking approach is that while waiting for the message to be delivered, we have something else to do and we don't want to be blocked. However, when the buffer is full, we have nothing else to do but wait for the release of quota. If so, we only need to take the future at the queue front and simply await it.

Actually, the functionality currently provided by the rdkafka client is already non-blocking: its implementation allows adding records to its inner queue, asynchronously delivering messages to external Kafka, and notifying of the delivery at the end. We don't need to implement any extra non-blocking logic.

Besides, the two flags can be avoided. We can update any state in place where the flags are set now, instead of setting flags and applying the update afterward. If the borrow checker reports any errors, we can resolve them together.
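
A rough sketch of the flag-free, in-place variant described here, written as a method on KafkaSinkWriter and reusing names from the surrounding diff (send_result, await_once, KAFKA_WRITER_MAX_QUEUE_SIZE); it is illustrative, not the merged implementation:

async fn send_result<'a, K, P>(&'a mut self, mut record: FutureRecord<'a, K, P>) -> KafkaResult<()>
where
    K: ToBytes + ?Sized,
    P: ToBytes + ?Sized,
{
    for _ in 0..self.config.max_retry_num {
        match self.inner.send_result(record) {
            Ok(delivery_future) => {
                // Await the oldest buffered future in place to release quota.
                while self.future_delivery_buffer.len() >= KAFKA_WRITER_MAX_QUEUE_SIZE {
                    if let Some(oldest) = self.future_delivery_buffer.pop_front() {
                        Self::await_once(oldest).await?;
                    }
                }
                self.future_delivery_buffer.push_back(delivery_future);
                return Ok(());
            }
            // The producer queue is full: back off and retry with the returned record.
            Err((KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull), rec)) => {
                record = rec;
                tokio::time::sleep(self.config.retry_interval).await;
            }
            // Any other error is returned directly.
            Err((e, _)) => return Err(e),
        }
    }
    Err(KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull))
}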

let mut future_buffer = std::mem::take(&mut self.future_delivery_buffer);

// Sanity check
debug_assert!(
self.future_delivery_buffer.is_empty(),
"future delivery buffer must be empty"
);

let mut success_flag = false;

let mut ret = Ok(());

for _ in 0..self.config.max_retry_num {
match self.send_result_inner(record).await {
Ok(delivery_future) => match delivery_future.await {
Ok(delivery_future_result) => match delivery_future_result {
// Successfully sent the record
// Will return the partition and offset of the message (i32, i64)
Ok(_) => return Ok(()),
// If the message failed to be delivered. (i.e., flush)
// The error & the copy of the original message will be returned
// i.e., (KafkaError, OwnedMessage)
// We will just stop the loop, and return the error
// The sink executor will fall back to the latest checkpoint
Err((k_err, _msg)) => {
err = k_err;
break;
}
},
// Nothing to do here, since the err has already been set to
// KafkaError::Canceled. This represents the producer is dropped
// before the delivery status is received
Err(_) => break,
},
match self.inner.send_result(record) {
Ok(delivery_future) => {
// First check if the current length is
// greater than the preset limit
while future_buffer.len() >= KAFKA_WRITER_MAX_QUEUE_SIZE {
Contributor

It seems that we don't have to take self.future_delivery_buffer out; we can use self.future_delivery_buffer directly where future_buffer is currently used. I am curious what error the borrow checker will report if we use it directly.

Contributor Author (@xzhseh, Sep 11, 2023)

Seems that this std::mem::take was used for the previous change 🤣; it's totally okay to directly use self.future_delivery_buffer with the current implementation.

if let Some(delivery_future) = future_buffer.pop_front() {
Self::await_once(delivery_future).await?;
} else {
panic!("Expect the future not to be None");
};
}

future_buffer.push_back(delivery_future);
success_flag = true;
break;
}
// The enqueue buffer is full, `send_result` will immediately return
// We can retry for another round after sleeping for some time
Err((e, rec)) => {
err = e;
record = rec;
match err {
match e {
KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull) => {
tokio::time::sleep(self.config.retry_interval).await;
continue;
}
_ => break,
_ => {
ret = Err(e);
Contributor

It seems that this error will always be overwritten by the later QueueFull. We may return here directly.

Contributor Author (@xzhseh, Sep 12, 2023)

I prefer directly _ => return Err(e), here, so as to distinguish between QueueFull & other potential errors.
Then it will not be overwritten by the success_flag checking later.

break;
}
}
}
}
}

Err(err)
if !success_flag {
// In this case, after trying `max_retry_num`
// The enqueue buffer is still full
ret = Err(KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull));
}

// Reset the buffer
std::mem::swap(&mut future_buffer, &mut self.future_delivery_buffer);

// Sanity check
debug_assert!(
future_buffer.is_empty(),
"future delivery buffer must be empty"
);

ret
}

async fn write_json_objects(
&self,
&mut self,
event_key_object: Option<Value>,
event_object: Option<Value>,
) -> Result<()> {
let topic = self.config.common.topic.clone();
// here we assume the key part always exists and value part is optional.
// if value is None, we will skip the payload part.
let key_str = event_key_object.unwrap().to_string();
let mut record = FutureRecord::<[u8], [u8]>::to(self.config.common.topic.as_str())
.key(key_str.as_bytes());
let mut record = FutureRecord::<[u8], [u8]>::to(topic.as_str()).key(key_str.as_bytes());
let payload;
if let Some(value) = event_object {
payload = value.to_string();
record = record.payload(payload.as_bytes());
}
// Send the data but do not wait for it to finish sinking
// Will join all `DeliveryFuture`s during commit
self.send_result(record).await?;
Ok(())
}

async fn debezium_update(&self, chunk: StreamChunk, ts_ms: u64) -> Result<()> {
async fn await_once(delivery_future: DeliveryFuture) -> KafkaResult<()> {
match delivery_future.await {
Ok(Ok(_)) => Ok(()),
Ok(Err((k_err, _msg))) => Err(k_err),
Err(_) => Err(KafkaError::Canceled),
}
}

async fn commit_inner(&mut self) -> Result<()> {
let _v = try_join_all(
self.future_delivery_buffer
.drain(..)
.map(|delivery_future| {
delivery_future.map(|delivery_future_result| {
match delivery_future_result {
// Successfully sent the record
// Will return the partition and offset of the message (i32, i64)
// Note that `Vec<()>` won't cause memory allocation
Ok(Ok(_)) => Ok(()),
// If the message failed to be delivered. (i.e., flush)
// The error & the copy of the original message will be returned
// i.e., (KafkaError, OwnedMessage)
// We will just return the error
// The sink executor will fall back to the latest checkpoint
Ok(Err((k_err, _msg))) => Err(SinkError::Kafka(k_err)),
// This represents the producer is dropped
// before the delivery status is received
// Return `KafkaError::Canceled`
Err(_) => Err(SinkError::Kafka(KafkaError::Canceled)),
}
})
}),
)
.await?;

// Sanity check
debug_assert!(
self.future_delivery_buffer.is_empty(),
"The buffer after `commit_inner` must be empty"
);

Ok(())
}

async fn debezium_update(&mut self, chunk: StreamChunk, ts_ms: u64) -> Result<()> {
let schema = self.schema.clone();
let pk_indices = self.pk_indices.clone();
let db_name = self.db_name.clone();
let sink_from_name = self.sink_from_name.clone();

// Initialize the dbz_stream
let dbz_stream = gen_debezium_message_stream(
&self.schema,
&self.pk_indices,
&schema,
&pk_indices,
chunk,
ts_ms,
DebeziumAdapterOpts::default(),
&self.db_name,
&self.sink_from_name,
&db_name,
&sink_from_name,
);

#[for_await]
Expand All @@ -491,13 +563,13 @@ impl KafkaSinkWriter {
Ok(())
}

async fn upsert(&self, chunk: StreamChunk) -> Result<()> {
let upsert_stream = gen_upsert_message_stream(
&self.schema,
&self.pk_indices,
chunk,
UpsertAdapterOpts::default(),
);
async fn upsert(&mut self, chunk: StreamChunk) -> Result<()> {
let schema = self.schema.clone();
let pk_indices = self.pk_indices.clone();

// Initialize the upsert_stream
let upsert_stream =
gen_upsert_message_stream(&schema, &pk_indices, chunk, UpsertAdapterOpts::default());

#[for_await]
for msg in upsert_stream {
@@ -508,10 +580,14 @@
Ok(())
}

async fn append_only(&self, chunk: StreamChunk) -> Result<()> {
async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
let schema = self.schema.clone();
let pk_indices = self.pk_indices.clone();

// Initialize the append_only_stream
let append_only_stream = gen_append_only_message_stream(
&self.schema,
&self.pk_indices,
&schema,
&pk_indices,
chunk,
AppendOnlyAdapterOpts::default(),
);
@@ -561,6 +637,8 @@ impl SinkWriterV1 for KafkaSinkWriter {
}

async fn commit(&mut self) -> Result<()> {
// Group delivery (await the buffered `DeliveryFuture`s) here
self.commit_inner().await?;
Ok(())
}

@@ -701,7 +779,7 @@ mod test {
}

/// Note: Please enable Kafka by running `./risedev configure` before commenting out `#[ignore]`
/// to run the test
/// to run the test, also remember to modify `risedev.yml`
#[ignore]
#[tokio::test]
async fn test_kafka_producer() -> Result<()> {