Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(kafka-sink): change sequential await to group await when committing #12013

Merged
merged 20 commits into from
Sep 13, 2023
Merged
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 135 additions & 36 deletions src/connector/src/sink/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,8 @@ pub struct KafkaSinkWriter {
schema: Schema,
pk_indices: Vec<usize>,
is_append_only: bool,
future_delivery_buffer: Vec<DeliveryFuture>,
max_limit: Option<usize>,
xzhseh marked this conversation as resolved.
Show resolved Hide resolved
}

impl KafkaSinkWriter {
Expand All @@ -342,6 +344,7 @@ impl KafkaSinkWriter {
pk_indices: Vec<usize>,
is_append_only: bool,
identifier: String,
// max_limit: Option<usize>,
) -> Result<Self> {
let inner: FutureProducer<PrivateLinkProducerContext> = {
let mut c = ClientConfig::new();
Expand Down Expand Up @@ -376,6 +379,9 @@ impl KafkaSinkWriter {
schema,
pk_indices,
is_append_only,
future_delivery_buffer: Vec::new(),
// FIXME: Where to accept the input?
max_limit: Some(10),
xzhseh marked this conversation as resolved.
Show resolved Hide resolved
})
}

Expand All @@ -395,36 +401,51 @@ impl KafkaSinkWriter {

/// The actual `send_result` function, will be called when the `KafkaSinkWriter` needs to sink
/// messages
async fn send_result<'a, K, P>(&'a self, mut record: FutureRecord<'a, K, P>) -> KafkaResult<()>
async fn send_result<'a, K, P>(
&'a mut self,
mut record: FutureRecord<'a, K, P>,
xzhseh marked this conversation as resolved.
Show resolved Hide resolved
) -> KafkaResult<()>
where
K: ToBytes + ?Sized,
P: ToBytes + ?Sized,
{
// The error to be returned
let mut err = KafkaError::Canceled;

// First take the ownership of the exist buffer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic here is too complicated.

It can be as simple as

if self.future_buffer.len() == self.max_limit {
    // wait for some future(not all) in future_buffer to finish to release the quota.
} else {
    let future = self.send_result.inner(record);
    self.future_buffer.push(future);
}

For the step that waits for some future to finish, you can use VecDeque instead of Vec to buffer the future, and always push to the back, and pop from front and then await on the future at the queue head. You can also investigate on FuturesUnordered or FuturesOrdered.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current approach is:

  1. Check if the buffer is full; if so, await all the current futures (which is self.commit)
  2. Otherwise, push the future into the buffer and return.
  3. If the enqueue buffer is full, retry til the preset limit.

I think this is reasonable, since we've already wrapped the delivered process into commit per 10 futures, and based on the current 10 futures limit, I think it's also efficient just to wait the 10 buffered futures to be delivered and then begin the next turn.
Ultimately if ever encounters error, we will change the status back to the latest checkpoint.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic makes some sense when the limit is 10, a relatively small value. If we decide to change to a larger values in the future, such 65536, it makes no sense to still wait for all futures to finish.

Besides, it is discouraged to call commit inside send_result, since the commit method is one of the methods in the SinkWriter trait, and its semantic is to be called on a externally triggered checkpoint, while in send_result this is the not the case. If we really want to reuse some logic, we can have a separate method named like ensure_delivery, and call it in both send_result and commit.

Also in current code I am confused of the usage of commit_flag and push_flag. It seems that the logic can be implemented without taking out self.future_delivery_buffer and using the two flags.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The commit I mentioned here is not actually the SinkWriter::commit, instead a wrapper function of KafkaSinkWriter::commit that will clear the current future_delivery_buffer and ensure delivery. (And it's the function discussed below, the one to change to try_join_all. The semantic of which is essentially the same as the ensure_delivery wrapper function mentioned above)
For the use of commit_flag and push_flag, the former one is to check if the size is at the limit, the latter one is to check if this call has successfully sent the data and push the future into the future_record_buffer, then exchange the local future_buffer and the one in KafkaSinkWriter using std::mem::swap, in order to make borrow checker happy.
And yes, for a rather large buffer value such as 65536, we could actually adapt to a non-blocking approach.
I've tried implementing the non-blocking logic but it seems tricky and ambiguous (i.e., To wait for the release of quota, we will typically await the future at the front of the deque, but should we launch a background thread as the consumer to keep awaiting whenever the buffer is not empty, or some other approach to consume the buffer?), so may be future reviewed & discussed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need any non-blocking approach? The reason for non-blocking approach is that while waiting for the message being delivered, we have something else to do and we don't want to be blocked. However, when the buffer is full, we have nothing else to do but only wait for the release of quota. If so, we only need to take the future at the queue front, and simply await on it.

Actually, the current functionality provided by the rdkafka client is already non-blocking, whose implementation allows adding records to its inner queue and asynchronously delivering message to external kafka, and at the end notifying the delivery. We don't need to implement any extra non-blocking logic.

Besides, the two flags can be avoided. We can in-place update any states in where the flags are set now, instead of setting flags and applied the update afterward. If the borrow checker reports any error, we can resolve it together.

let mut future_buffer = std::mem::take(&mut self.future_delivery_buffer);

// Sanity check
debug_assert!(
self.future_delivery_buffer.is_empty(),
"future delivery buffer must be empty"
);

// The flag represents whether to commit
// This will happen when the size of buffering futures
// is greater than preset limit
let mut commit_flag = false;

// To make borrow checker happy :)
let mut push_flag = false;

for _ in 0..self.config.max_retry_num {
match self.send_result_inner(record).await {
Ok(delivery_future) => match delivery_future.await {
Ok(delivery_future_result) => match delivery_future_result {
// Successfully sent the record
// Will return the partition and offset of the message (i32, i64)
Ok(_) => return Ok(()),
// If the message failed to be delivered. (i.e., flush)
// The error & the copy of the original message will be returned
// i.e., (KafkaError, OwnedMessage)
// We will just stop the loop, and return the error
// The sink executor will back to the latest checkpoint
Err((k_err, _msg)) => {
err = k_err;
// Add the future to the buffer
xzhseh marked this conversation as resolved.
Show resolved Hide resolved
Ok(delivery_future) => {
// Push the future into the buffer
future_buffer.push(delivery_future);
push_flag = true;

// First see if the size is greater than the limit
if let Some(max_limit) = self.max_limit {
if future_buffer.len() > max_limit {
commit_flag = true;
break;
}
},
// Nothing to do here, since the err has already been set to
// KafkaError::Canceled. This represents the producer is dropped
// before the delivery status is received
Err(_) => break,
},
}
break;
}
// The enqueue buffer is full, `send_result` will immediately return
// We can retry for another round after sleeping for sometime
Err((e, rec)) => {
Expand All @@ -441,32 +462,104 @@ impl KafkaSinkWriter {
}
}

if commit_flag {
// Give the buffer back to the origin
std::mem::swap(&mut future_buffer, &mut self.future_delivery_buffer);

// Sanity check
debug_assert!(
future_buffer.is_empty(),
"future buffer must be empty after swapping"
);

match self.commit().await {
// FIXME: Is this error handling enough?
Ok(_) => return Ok(()),
Err(_) => return Err(err),
}
}

if push_flag {
// Indicates success
std::mem::swap(&mut future_buffer, &mut self.future_delivery_buffer);

// Sanity check
debug_assert!(
future_buffer.is_empty(),
"future buffer must be empty after swapping"
);

return Ok(());
}

Err(err)
}

async fn write_json_objects(
&self,
&mut self,
event_key_object: Option<Value>,
event_object: Option<Value>,
) -> Result<()> {
let topic = self.config.common.topic.clone();
// here we assume the key part always exists and value part is optional.
// if value is None, we will skip the payload part.
let key_str = event_key_object.unwrap().to_string();
let mut record = FutureRecord::<[u8], [u8]>::to(self.config.common.topic.as_str())
.key(key_str.as_bytes());
let mut record = FutureRecord::<[u8], [u8]>::to(topic.as_str()).key(key_str.as_bytes());
let payload;
if let Some(value) = event_object {
payload = value.to_string();
record = record.payload(payload.as_bytes());
}
// Send the data but not wait it to finish sinking
// Will join all `DeliveryFuture` during commit
self.send_result(record).await?;
Ok(())
}

async fn debezium_update(&self, chunk: StreamChunk, ts_ms: u64) -> Result<()> {
async fn commit(&mut self) -> Result<()> {
// Get the ownership first
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code can be simplified into the following

try_join_all(self.future_delivery_buffer.drain().map(|kafka_result| kafka_result.and_then(|kafka_result| kafka_result))).await?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, there are two possible flaws with this approach.

  1. We can not distinguish the error type (i.e., Whether the error is due to producer or the failure during the delivery)
  2. The and_then will produce Result<(i32, i64), (KafkaError, OwnedMessage)> which is essentially not a future to be awaited, and try_join_all will produce a Vec containing all the successful results, which also is not compatible with the original return type.

Thus I'd prefer the current logic at present.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The general idea of the code snippet above is to first turn the return type of the future from nested result Result<Result<(i32, i64), (KafkaError, OwnedMessage)>, Canceled> to Result<(i32, i64), SinkError>, and then do a try_join_all on the iterator of transformed future, which is the same as the current logic, but in a more elegant way.

The return type of future is changed by via map on the future. The and_then is applied inside the closure of map, which is turning the nested result into a plain result.

Copy link
Contributor Author

@xzhseh xzhseh Sep 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess below is the simplification you'd prefer.
(Tried many times, this is the most elegant solution I could come up with)

async fn commit_inner(&mut self) -> Result<()> {
        let _v = try_join_all(
            self.future_delivery_buffer
                .drain(..)
                .map(|x| {
                    x.map(|f| {
                        match f {
                            Ok(Ok(val)) => Ok(val),
                            Ok(Err((k_err, _msg))) => Err(SinkError::Kafka(k_err)),
                            Err(_) => Err(SinkError::Kafka(KafkaError::Canceled)),
                        }
                    })
                })
        )
        .await?;

        Ok(())
    }

cc @tabVersion.
In addition, I'd prefer readability rather than elegant but hard-to-follow code snippet🤔️.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code snippet LGTM, and I think the code is intuitive and not hard-to-follow. All involved methods are commonly used library method.

try_join_all is a commonly used util method for rust async users, when we want to join some result future and either collect all ok result or stop when seeing any err. The current for-loop is a total reimplementation of it, or even a worse one, because if any future at the queue end completes with an err, we should stop polling other future at the queue front and return the error, but current for loop should wait for the completion of future at the queue front before seeing the err at the end. drain is also commonly used when we want to consume all items in a vector without taking the ownership of it.

For code readability, the code is hard to read in current code snippet since the parameter of closure is meaningless x and f. After we rename x to future and f to future_result, it will become much more readable.

One more thing we can improve in the code snippet above is that, since we don't care about value returned from the result, when we transform the return type of future, we'd better turn it into Ok(()). This is because the return type of try_join_all is a vector, and a vector of val involves unnecessary memory allocation. But () is zero-sized, and Vec<()> will not involve any memory allocation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use VecDeque to replace Vec ? I means that we only need to control the length of this queue. If the head of future in queue is ready, we can pull more request, rather than waiting all request in queue finished.

// The buffer will automatically become an empty vector
let delivery_futures = std::mem::take(&mut self.future_delivery_buffer);

// Sanity check
debug_assert!(
self.future_delivery_buffer.is_empty(),
"The buffer must be empty"
);

// Commit all together
// FIXME: At present we could not retry, do we actually need to?
for delivery_future in delivery_futures {
match delivery_future.await {
Ok(delivery_future_result) => match delivery_future_result {
// Successfully sent the record
// Will return the partition and offset of the message (i32, i64)
Ok(_) => continue,
// If the message failed to be delivered. (i.e., flush)
// The error & the copy of the original message will be returned
// i.e., (KafkaError, OwnedMessage)
// We will just stop the loop, and return the error
// The sink executor will back to the latest checkpoint
Err((k_err, _msg)) => return Err(SinkError::Kafka(k_err)),
},
// This represents the producer is dropped
// before the delivery status is received
// Return `KafkaError::Canceled`
Err(_) => return Err(SinkError::Kafka(KafkaError::Canceled)),
}
}

Ok(())
}

async fn debezium_update(&mut self, chunk: StreamChunk, ts_ms: u64) -> Result<()> {
let schema = self.schema.clone();
let pk_indices = self.pk_indices.clone();

// Initialize the dbz_stream
let dbz_stream = gen_debezium_message_stream(
&self.schema,
&self.pk_indices,
&schema,
&pk_indices,
chunk,
ts_ms,
DebeziumAdapterOpts::default(),
Expand All @@ -481,13 +574,13 @@ impl KafkaSinkWriter {
Ok(())
}

async fn upsert(&self, chunk: StreamChunk) -> Result<()> {
let upsert_stream = gen_upsert_message_stream(
&self.schema,
&self.pk_indices,
chunk,
UpsertAdapterOpts::default(),
);
async fn upsert(&mut self, chunk: StreamChunk) -> Result<()> {
let schema = self.schema.clone();
xzhseh marked this conversation as resolved.
Show resolved Hide resolved
let pk_indices = self.pk_indices.clone();

// Initialize the upsert_stream
let upsert_stream =
gen_upsert_message_stream(&schema, &pk_indices, chunk, UpsertAdapterOpts::default());

#[for_await]
for msg in upsert_stream {
Expand All @@ -498,10 +591,14 @@ impl KafkaSinkWriter {
Ok(())
}

async fn append_only(&self, chunk: StreamChunk) -> Result<()> {
async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
let schema = self.schema.clone();
let pk_indices = self.pk_indices.clone();

// Initialize the append_only_stream
let append_only_stream = gen_append_only_message_stream(
&self.schema,
&self.pk_indices,
&schema,
&pk_indices,
chunk,
AppendOnlyAdapterOpts::default(),
);
Expand Down Expand Up @@ -551,6 +648,8 @@ impl SinkWriterV1 for KafkaSinkWriter {
}

async fn commit(&mut self) -> Result<()> {
// Group delivery (await the `FutureRecord`) here
self.commit().await?;
Ok(())
}

Expand Down Expand Up @@ -691,7 +790,7 @@ mod test {
}

/// Note: Please enable the kafka by running `./risedev configure` before commenting #[ignore]
/// to run the test
/// to run the test, also remember to modify `risedev.yml`
#[ignore]
#[tokio::test]
async fn test_kafka_producer() -> Result<()> {
Expand Down