refactor(kafka-sink): change sequential await to group await when committing #12013

Merged: 20 commits merged into main on Sep 13, 2023

Conversation

@xzhseh (Contributor) commented Aug 31, 2023

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

Change sequential await to group await when committing via the coordinator.
The semantics are the same but more efficient: whenever an error happens during delivery (the await on FutureRecord), we roll back to the last checkpoint.
The current implementation simply appends event_key and event_value to the future_record_buffer and tries to deliver (await) them all together when committing.
There is no intermediate state, and we know immediately whether the delivery succeeded when performing the group commit.
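
For readers unfamiliar with the pattern, here is a minimal sketch of the group-await idea described above. The names (GroupCommitWriter, DeliveryFuture as a boxed future) are illustrative stand-ins, not the PR's actual types:

use futures::future::{try_join_all, BoxFuture};

// Stand-in for the future returned when a record is enqueued with the producer.
type DeliveryFuture = BoxFuture<'static, Result<(i32, i64), String>>;

struct GroupCommitWriter {
    // Futures for records that were enqueued but whose delivery is not yet confirmed.
    future_delivery_buffer: Vec<DeliveryFuture>,
}

impl GroupCommitWriter {
    // Buffer the delivery future instead of awaiting it record by record.
    fn send(&mut self, delivery_future: DeliveryFuture) {
        self.future_delivery_buffer.push(delivery_future);
    }

    // Await every buffered delivery at commit time; any error surfaces here
    // and triggers a rollback to the last checkpoint.
    async fn commit(&mut self) -> Result<(), String> {
        try_join_all(self.future_delivery_buffer.drain(..)).await?;
        Ok(())
    }
}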

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features; see Sqlsmith: SQL feature generation, #7934.)
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

@xzhseh xzhseh requested review from hzxa21, tabVersion and wenym1 August 31, 2023 20:44
@xzhseh xzhseh self-assigned this Aug 31, 2023
codecov bot commented Aug 31, 2023

Codecov Report

Merging #12013 (2e07477) into main (228593d) will decrease coverage by 0.04%.
Report is 5 commits behind head on main.
The diff coverage is 0.00%.

@@            Coverage Diff             @@
##             main   #12013      +/-   ##
==========================================
- Coverage   69.69%   69.66%   -0.04%     
==========================================
  Files        1411     1411              
  Lines      236141   236177      +36     
==========================================
- Hits       164585   164531      -54     
- Misses      71556    71646      +90     
Flag    Coverage Δ
rust    69.66% <0.00%> (-0.04%) ⬇️

Flags with carried forward coverage won't be shown.

Files Changed                      Coverage Δ
src/connector/src/sink/kafka.rs    36.26% <0.00%> (-2.56%) ⬇️

... and 12 files with indirect coverage changes


xzhseh (Contributor Author) commented Sep 2, 2023

cc @wenym1, I've updated the version; the future_delivery_buffer now stores DeliveryFuture instead of plain unsent data.
One thing to note: the max_limit is currently hard-coded to 10, because I examined SinkParam and found it hard to add a new parameter from outside, so this part may need further guidance.

Review thread on src/connector/src/sink/kafka.rs:
where
    K: ToBytes + ?Sized,
    P: ToBytes + ?Sized,
{
    // The error to be returned
    let mut err = KafkaError::Canceled;

    // First take ownership of the existing buffer
Contributor:

The logic here is too complicated.

It can be as simple as

if self.future_buffer.len() == self.max_limit {
    // wait for some future(not all) in future_buffer to finish to release the quota.
} else {
    let future = self.send_result.inner(record);
    self.future_buffer.push(future);
}

For the step that waits for some futures (not all) to finish, you can use a VecDeque instead of a Vec to buffer the futures: always push to the back, pop from the front, and await the future at the queue head. You can also investigate FuturesUnordered or FuturesOrdered.
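
A minimal sketch of this suggestion, assuming a hypothetical max_limit field and the same stand-in DeliveryFuture type (not the PR's actual types):

use std::collections::VecDeque;
use futures::future::BoxFuture;

type DeliveryFuture = BoxFuture<'static, Result<(i32, i64), String>>;

struct BoundedDeliveryBuffer {
    queue: VecDeque<DeliveryFuture>,
    max_limit: usize,
}

impl BoundedDeliveryBuffer {
    async fn push(&mut self, delivery_future: DeliveryFuture) -> Result<(), String> {
        // When the buffer is full, await only the oldest future to free one slot,
        // instead of waiting for the whole buffer to drain.
        while self.queue.len() >= self.max_limit {
            if let Some(oldest) = self.queue.pop_front() {
                oldest.await?;
            }
        }
        self.queue.push_back(delivery_future);
        Ok(())
    }
}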

Contributor Author:

The current approach is:

  1. Check whether the buffer is full; if so, await all the current futures (which is self.commit).
  2. Otherwise, push the future into the buffer and return.
  3. If the enqueue buffer is full, retry until the preset limit.

I think this is reasonable: we've already wrapped the delivery process into a commit per 10 futures, and with the current limit of 10 it is also efficient to simply wait for the 10 buffered futures to be delivered and then begin the next turn.
Ultimately, if an error is ever encountered, we roll the state back to the latest checkpoint.

Contributor:

The logic makes some sense when the limit is 10, a relatively small value. If we decide to change it to a larger value in the future, such as 65536, it makes no sense to still wait for all futures to finish.

Besides, it is discouraged to call commit inside send_result, since the commit method is one of the methods of the SinkWriter trait, and its semantics are that it is called on an externally triggered checkpoint, which is not the case in send_result. If we really want to reuse some logic, we can have a separate method named something like ensure_delivery and call it in both send_result and commit.

Also, in the current code I am confused by the usage of commit_flag and push_flag. It seems the logic can be implemented without taking out self.future_delivery_buffer and using the two flags.
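
A rough sketch of the suggested split, with hypothetical names and a counter standing in for the real buffer, just to show the call structure:

struct WriterSketch {
    // Stand-in for the number of buffered delivery futures.
    buffered: usize,
    max_limit: usize,
}

impl WriterSketch {
    // Shared helper: await everything currently buffered.
    async fn ensure_delivery(&mut self) -> Result<(), String> {
        self.buffered = 0;
        Ok(())
    }

    // Called per record: only waits when the quota is exhausted.
    async fn send_result(&mut self) -> Result<(), String> {
        if self.buffered >= self.max_limit {
            self.ensure_delivery().await?;
        }
        self.buffered += 1;
        Ok(())
    }

    // Called on an externally triggered checkpoint.
    async fn commit(&mut self) -> Result<(), String> {
        self.ensure_delivery().await
    }
}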

Contributor Author:

The commit I mentioned here is not actually SinkWriter::commit, but a wrapper function of KafkaSinkWriter::commit that clears the current future_delivery_buffer and ensures delivery. (It's the function discussed below, the one to change to try_join_all; its semantics are essentially the same as the ensure_delivery wrapper function mentioned above.)
As for commit_flag and push_flag: the former checks whether the size is at the limit; the latter checks whether this call has successfully sent the data and pushed the future into the future_record_buffer, after which we exchange the local future_buffer with the one in KafkaSinkWriter using std::mem::swap, in order to make the borrow checker happy.
And yes, for a rather large buffer size such as 65536, we could adopt a non-blocking approach.
I've tried implementing the non-blocking logic, but it seems tricky and ambiguous (i.e., to wait for the release of quota we would typically await the future at the front of the deque, but should we launch a background task as the consumer to keep awaiting whenever the buffer is not empty, or consume the buffer some other way?), so it may need further review and discussion.

Contributor:

Do we really need any non-blocking approach? The reason for a non-blocking approach is that, while waiting for the message to be delivered, we have something else to do and don't want to be blocked. However, when the buffer is full, we have nothing else to do but wait for the release of quota. If so, we only need to take the future at the queue front and simply await it.

Actually, the functionality provided by the rdkafka client is already non-blocking: its implementation allows adding records to its inner queue, asynchronously delivering messages to external Kafka, and notifying on delivery at the end. We don't need to implement any extra non-blocking logic.

Besides, the two flags can be avoided. We can update state in place where the flags are currently set, instead of setting flags and applying the update afterward. If the borrow checker reports any error, we can resolve it together.

    self.send_result(record).await?;
    Ok(())
}

async fn debezium_update(&self, chunk: StreamChunk, ts_ms: u64) -> Result<()> {

async fn commit(&mut self) -> Result<()> {
    // Get the ownership first
Contributor:

The code can be simplified into the following

try_join_all(self.future_delivery_buffer.drain().map(|kafka_result| kafka_result.and_then(|kafka_result| kafka_result))).await?

Contributor Author:

IIUC, there are two possible flaws with this approach.

  1. We cannot distinguish the error type (i.e., whether the error comes from the producer or from a failure during delivery).
  2. The and_then will produce Result<(i32, i64), (KafkaError, OwnedMessage)>, which is not a future to be awaited, and try_join_all will produce a Vec containing all the successful results, which is also not compatible with the original return type.

Thus I'd prefer the current logic at present.

Contributor:

The general idea of the code snippet above is to first turn the return type of the future from the nested result Result<Result<(i32, i64), (KafkaError, OwnedMessage)>, Canceled> into Result<(i32, i64), SinkError>, and then do a try_join_all on the iterator of transformed futures. This is the same as the current logic, but in a more elegant way.

The return type of the future is changed via map on the future. The and_then is applied inside the closure of map, turning the nested result into a plain result.

Contributor Author (@xzhseh), Sep 5, 2023:

I guess below is the simplification you'd prefer.
(I tried many times; this is the most elegant solution I could come up with.)

async fn commit_inner(&mut self) -> Result<()> {
        let _v = try_join_all(
            self.future_delivery_buffer
                .drain(..)
                .map(|x| {
                    x.map(|f| {
                        match f {
                            Ok(Ok(val)) => Ok(val),
                            Ok(Err((k_err, _msg))) => Err(SinkError::Kafka(k_err)),
                            Err(_) => Err(SinkError::Kafka(KafkaError::Canceled)),
                        }
                    })
                })
        )
        .await?;

        Ok(())
    }

cc @tabVersion.
In addition, I'd prefer readability over an elegant but hard-to-follow code snippet 🤔️.

Contributor:

The code snippet LGTM, and I think the code is intuitive and not hard to follow. All the methods involved are commonly used library methods.

try_join_all is a commonly used utility for Rust async users when we want to join some result futures and either collect all Ok results or stop on the first Err. The current for-loop is a complete reimplementation of it, or even a worse one: if a future at the queue end completes with an Err, we should stop polling the other futures at the queue front and return the error, but the current for loop has to wait for the futures at the queue front to complete before it sees the Err at the end. drain is also commonly used when we want to consume all items in a vector without taking ownership of it.

For readability, the code is hard to read in the current snippet because the closure parameters are the meaningless x and f. After renaming x to future and f to future_result, it becomes much more readable.

One more thing we can improve in the snippet above: since we don't care about the value returned from the result, when we transform the return type of the future we'd better turn it into Ok(()). The return type of try_join_all is a vector, and a vector of values involves unnecessary memory allocation, whereas () is zero-sized and Vec<()> involves no allocation.
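
Putting those two suggestions together, a sketch of the snippet above with the closure parameters renamed and the success value mapped to (), using simplified stand-in types rather than the real rdkafka/madsim ones:

use std::collections::VecDeque;
use futures::future::{try_join_all, BoxFuture, FutureExt};

// Simplified stand-ins: the outer Result models cancellation, the inner one delivery.
type KafkaLikeError = String;
type DeliveryFuture =
    BoxFuture<'static, Result<Result<(i32, i64), KafkaLikeError>, ()>>;

async fn commit_inner(
    future_delivery_buffer: &mut VecDeque<DeliveryFuture>,
) -> Result<(), KafkaLikeError> {
    try_join_all(future_delivery_buffer.drain(..).map(|future| {
        future.map(|future_result| match future_result {
            // Delivered: discard the (partition, offset) pair so try_join_all
            // collects a Vec<()> and avoids any real allocation.
            Ok(Ok(_)) => Ok(()),
            // Enqueued but delivery failed.
            Ok(Err(kafka_err)) => Err(kafka_err),
            // The delivery future was canceled before completion.
            Err(()) => Err("delivery future canceled".to_string()),
        })
    }))
    .await?;
    Ok(())
}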

Contributor:

Could we use VecDeque to replace Vec? I mean that we only need to control the length of this queue. If the future at the head of the queue is ready, we can pull more requests, rather than waiting for all the requests in the queue to finish.

xzhseh (Contributor Author) commented Sep 6, 2023

cc @wenym1 @Little-Wallace @tabVersion @hzxa21.
New version updated.

  1. Changed Vec to VecDeque for better future handling in send_result.
  2. Used try_join_all for KafkaSinkWriter::commit_inner and added meaningful comments.
  3. Removed unnecessary logic, refactored some cases in send_result, and now only await the front future in the buffer queue when the buffer size exceeds the preset limit.

@wenym1 (Contributor) left a comment:

Rest LGTM. Thanks for the update!

Review thread on src/connector/src/sink/kafka.rs:
Ok(delivery_future) => {
    // First check if the current length is
    // greater than the preset limit
    while future_buffer.len() >= KAFKA_WRITER_MAX_QUEUE_SIZE {
Contributor:

From the code it seems we don't have to take self.future_delivery_buffer out; we can use self.future_delivery_buffer directly where future_buffer is currently used. I am curious what error the borrow checker would report if we used it directly.

Contributor Author (@xzhseh), Sep 11, 2023:

It seems this std::mem::take was left over from the previous change 🤣; it's totally okay to use self.future_delivery_buffer directly with the current implementation.

@github-actions bot left a comment:

license-eye has checked 4120 files in total.

Valid   Invalid   Ignored   Fixed
1798    1         2321      0

Invalid file list:
  • src/connector/src/parser/protobuf/recursive.rs

xzhseh (Contributor Author) commented Sep 11, 2023

New version updated, I'm ready to merge if the overall picture looks good 🥳
cc @wenym1.

xzhseh (Contributor Author) commented Sep 11, 2023

Strange error when building with Buildkite. It's okay when building locally with the latest version of the main branch.
Has madsim been changed 🤔️?
(screenshot of the Buildkite build error: CleanShot 2023-09-11 at 17 21 05)

@wenym1 (Contributor) left a comment:

Rest LGTM. Thanks for the effort!

KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull) => {
    tokio::time::sleep(self.config.retry_interval).await;
    continue;
}
_ => break,
_ => {
    ret = Err(e);
Contributor:

It seems that this error will always be overwritten by the later QueueFull. We may return here directly.

Contributor Author (@xzhseh), Sep 12, 2023:

I prefer directly _ => return Err(e), here, so as to distinguish between QueueFull and other potential errors.
That way it will not be overwritten by the success_flag check later.
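
A standalone sketch of the retry shape under discussion, with stand-in types rather than the real KafkaError/RDKafkaErrorCode: QueueFull backs off and retries up to a limit, while any other error is returned immediately so it cannot be overwritten later:

use std::time::Duration;

enum SendError {
    QueueFull,
    Other(String),
}

async fn send_with_retry(
    max_retries: usize,
    retry_interval: Duration,
    mut try_send: impl FnMut() -> Result<(), SendError>,
) -> Result<(), SendError> {
    for _ in 0..max_retries {
        match try_send() {
            Ok(()) => return Ok(()),
            // Producer queue is full: back off and retry.
            Err(SendError::QueueFull) => tokio::time::sleep(retry_interval).await,
            // Any other error: return directly instead of breaking out and
            // letting a later status check overwrite it.
            Err(other) => return Err(other),
        }
    }
    Err(SendError::QueueFull)
}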

wenym1 (Contributor) commented Sep 12, 2023

> Strange error when building with Buildkite. It's okay when building locally with the latest version of the main branch.
> Has madsim been changed 🤔️?

Maybe the latest madsim uses a different type from the official rdkafka. Actually, we don't have to rewrite the whole result type; we only need to refer to the output type of the future. After changing the signature of map_future_result, it compiles in both cases.

fn map_future_result(
    delivery_future_result: <DeliveryFuture as Future>::Output,
) -> KafkaResult<()> {
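
For context, one possible body for that signature, assuming the nested result shape discussed earlier; the types here are simplified stand-ins, not the real rdkafka/madsim definitions:

// Stand-in types mirroring the nested result: the outer layer models
// cancellation, the inner layer models delivery success or failure.
type StandInKafkaError = String;
type StandInOutput =
    Result<Result<(i32, i64), (StandInKafkaError, Vec<u8>)>, ()>;

fn map_future_result(
    delivery_future_result: StandInOutput,
) -> Result<(), StandInKafkaError> {
    match delivery_future_result {
        // Delivered successfully; the (partition, offset) pair is not needed.
        Ok(Ok(_)) => Ok(()),
        // Enqueued but delivery failed: surface the producer error.
        Ok(Err((err, _message))) => Err(err),
        // The delivery future was canceled before it completed.
        Err(()) => Err("delivery future canceled".to_string()),
    }
}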

tabVersion (Contributor) commented:

> Strange error when building with Buildkite. It's okay when building locally with the latest version of the main branch.
> Has madsim been changed 🤔️?

@wangrunji0408 PTAL

wangrunji0408 (Contributor) commented Sep 12, 2023

> Strange error when building with Buildkite. It's okay when building locally with the latest version of the main branch.
> Has madsim been changed 🤔️?

> Maybe the latest madsim uses a different type from the official rdkafka. Actually, we don't have to rewrite the whole result type; we only need to refer to the output type of the future. After changing the signature of map_future_result, it compiles in both cases.
>
> fn map_future_result(
>     delivery_future_result: <DeliveryFuture as Future>::Output,
> ) -> KafkaResult<()> {

Yes, I misused the tokio channel in madsim-rdkafka, which should be a futures channel. I'll fix that in the next version. The workaround mentioned above should also work.

UPDATE: fixed

@wangrunji0408 wangrunji0408 requested a review from a team as a code owner September 12, 2023 07:31
xzhseh (Contributor Author) commented Sep 13, 2023

cc @wenym1, I'm ready to merge if the current implementation looks good.

@wenym1 (Contributor) left a comment:

Just pushed a commit to update the max queue size from 10 to 65536; 10 seems too small.

hzxa21 (Collaborator) commented Sep 13, 2023

https://github.com/confluentinc/librdkafka/blob/49f180a36c247100dc246379ace84833b91f4038/CONFIGURATION.md?plain=1#L149

Based on the rdkafka documentation, the configs that can affect producer batching are:

  • batch.num.messages, default = 10k
  • batch.size, default = 1MB
  • linger.ms, default = 5ms

This means the producer will send a batch of producer records when one of the following conditions is met:

  1. 10k messages have been enqueued to send for a partition via producer.send.
  2. 1MB of messages has been enqueued to send for a partition via producer.send.
  3. A message enqueued via producer.send has waited more than 5ms before 1 or 2 is met.

After an offline discussion with @wenym1, we think it is better to increase the max queue size introduced in this PR to 65536 to achieve better Kafka producer batching.
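
For reference, this is how those librdkafka batching knobs would be set through rust-rdkafka's ClientConfig; the values shown are just the documented defaults, not something this PR changes:

use rdkafka::config::ClientConfig;
use rdkafka::error::KafkaResult;
use rdkafka::producer::FutureProducer;

fn build_producer(brokers: &str) -> KafkaResult<FutureProducer> {
    ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("batch.num.messages", "10000") // flush after 10k messages in a partition batch
        .set("batch.size", "1000000")       // or after ~1MB of enqueued messages
        .set("linger.ms", "5")              // or after a message has waited 5ms
        .create()
}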

@wenym1 wenym1 added this pull request to the merge queue Sep 13, 2023
Merged via the queue into main with commit de1f776 Sep 13, 2023
7 of 8 checks passed
@wenym1 wenym1 deleted the xzhseh/feat-kafka-sink-group-commit branch September 13, 2023 04:59
wenym1 added a commit that referenced this pull request Sep 13, 2023
Li0k pushed a commit that referenced this pull request Sep 15, 2023
…mitting (#12013)

Signed-off-by: Runji Wang <[email protected]>
Co-authored-by: Runji Wang <[email protected]>
Co-authored-by: William Wen <[email protected]>