refactor(kafka-sink): change sequential await to group await when committing #12013
Conversation
Codecov Report
@@            Coverage Diff             @@
##              main    #12013      +/-  ##
===========================================
- Coverage    69.69%    69.66%    -0.04%
===========================================
  Files         1411      1411
  Lines       236141    236177       +36
===========================================
- Hits        164585    164531       -54
- Misses       71556     71646       +90
Flags with carried forward coverage won't be shown.
... and 12 files with indirect coverage changes
cc @wenym1, I've updated the version, and now the
src/connector/src/sink/kafka.rs (outdated)

    where
        K: ToBytes + ?Sized,
        P: ToBytes + ?Sized,
    {
        // The error to be returned
        let mut err = KafkaError::Canceled;

        // First take ownership of the existing buffer
The logic here is too complicated.
It can be as simple as:

    if self.future_buffer.len() == self.max_limit {
        // wait for some future (not all) in future_buffer to finish to release the quota.
    } else {
        let future = self.send_result.inner(record);
        self.future_buffer.push(future);
    }
For the step that waits for some future to finish, you can use VecDeque instead of Vec to buffer the futures: always push to the back, then pop from the front and await the future at the queue head. You can also investigate FuturesUnordered or FuturesOrdered.
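For illustration, here is a minimal sketch of that bounded-queue idea. The names (BoundedDeliveryBuffer, max_in_flight) are hypothetical stand-ins, not the writer's real fields:

use std::collections::VecDeque;
use std::future::Future;

// Hypothetical bounded buffer of in-flight delivery futures.
struct BoundedDeliveryBuffer<F: Future> {
    queue: VecDeque<F>,
    max_in_flight: usize,
}

impl<F: Future> BoundedDeliveryBuffer<F> {
    // Enqueue a new delivery future. If the quota is exhausted, await only the
    // oldest future (queue front) to free one slot, not the whole buffer.
    // The finished result, if any, is returned so the caller can inspect it.
    async fn push(&mut self, future: F) -> Option<F::Output> {
        let mut finished = None;
        if self.queue.len() >= self.max_in_flight {
            if let Some(oldest) = self.queue.pop_front() {
                finished = Some(oldest.await);
            }
        }
        self.queue.push_back(future);
        finished
    }
}

With FuturesOrdered, the buffered futures would also keep making progress while we wait on the oldest one, so it may be worth investigating as mentioned above.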
The current approach is:
- Check if the buffer is full; if so, await all the current futures (which is what self.commit does).
- Otherwise, push the future into the buffer and return.
- If the enqueue buffer is full, retry until the preset limit.

I think this is reasonable, since we've already wrapped the delivery process into commit per 10 futures, and based on the current limit of 10, I think it's also efficient to just wait for the 10 buffered futures to be delivered and then begin the next turn.
Ultimately, if we ever encounter an error, we will roll back to the latest checkpoint.
The logic makes some sense when the limit is 10, a relatively small value. If we decide to change to a larger value in the future, such as 65536, it makes no sense to still wait for all futures to finish.
Besides, it is discouraged to call commit inside send_result, since the commit method is one of the methods in the SinkWriter trait, and its semantic is to be called on an externally triggered checkpoint, which is not the case in send_result. If we really want to reuse some logic, we can have a separate method named something like ensure_delivery, and call it in both send_result and commit.
Also, in the current code I am confused by the usage of commit_flag and push_flag. It seems that the logic can be implemented without taking out self.future_delivery_buffer and without using the two flags.
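As a rough sketch of that refactor, with placeholder types throughout (SinkResult, a boxed DeliveryFuture, and a hardcoded limit of 10 stand in for the real definitions):

use futures::future::{try_join_all, BoxFuture};

// Placeholder types; the real writer uses RisingWave's Result/SinkError and
// the delivery future type returned by rdkafka's send_result.
type SinkResult<T> = Result<T, String>;
type DeliveryFuture = BoxFuture<'static, SinkResult<()>>;

struct KafkaSinkWriterSketch {
    future_delivery_buffer: Vec<DeliveryFuture>,
}

impl KafkaSinkWriterSketch {
    // Shared helper: drain the buffer and wait for every pending delivery.
    async fn ensure_delivery(&mut self) -> SinkResult<()> {
        try_join_all(self.future_delivery_buffer.drain(..)).await?;
        Ok(())
    }

    // Called per record: buffer the delivery future, and only flush through
    // the shared helper when the (placeholder) limit of 10 is reached.
    async fn send_result(&mut self, delivery_future: DeliveryFuture) -> SinkResult<()> {
        self.future_delivery_buffer.push(delivery_future);
        if self.future_delivery_buffer.len() >= 10 {
            self.ensure_delivery().await?;
        }
        Ok(())
    }

    // Checkpoint-triggered SinkWriter::commit path: everything buffered must
    // be delivered before the checkpoint completes.
    async fn commit(&mut self) -> SinkResult<()> {
        self.ensure_delivery().await
    }
}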
The commit I mentioned here is not actually SinkWriter::commit, but a wrapper function around KafkaSinkWriter::commit that clears the current future_delivery_buffer and ensures delivery. (It's the function discussed below, the one to change to try_join_all; its semantic is essentially the same as the ensure_delivery wrapper function mentioned above.)
For the use of commit_flag and push_flag: the former checks whether the size is at the limit, and the latter checks whether this call has successfully sent the data and pushed the future into the future_record_buffer. We then exchange the local future_buffer and the one in KafkaSinkWriter using std::mem::swap, in order to make the borrow checker happy.
And yes, for a rather large buffer value such as 65536, we could actually adopt a non-blocking approach.
I've tried implementing the non-blocking logic, but it seems tricky and ambiguous (i.e., to wait for the release of quota we would typically await the future at the front of the deque, but should we launch a background thread as the consumer to keep awaiting whenever the buffer is not empty, or use some other approach to consume the buffer?), so this may need further review & discussion.
Do we really need any non-blocking approach? The reason for a non-blocking approach is that, while waiting for the message to be delivered, we have something else to do and don't want to be blocked. However, when the buffer is full, we have nothing else to do but wait for the release of quota. If so, we only need to take the future at the queue front and simply await it.
Actually, the functionality provided by the rdkafka client is already non-blocking: its implementation allows adding records to its inner queue, asynchronously delivers messages to external Kafka, and notifies on delivery at the end. We don't need to implement any extra non-blocking logic.
Besides, the two flags can be avoided. We can update any state in place where the flags are set now, instead of setting flags and applying the update afterward. If the borrow checker reports any error, we can resolve it together.
src/connector/src/sink/kafka.rs (outdated)

        self.send_result(record).await?;
        Ok(())
    }

    async fn debezium_update(&self, chunk: StreamChunk, ts_ms: u64) -> Result<()> {
    async fn commit(&mut self) -> Result<()> {
        // Get the ownership first
The code can be simplified into the following:

    try_join_all(self.future_delivery_buffer.drain().map(|kafka_result| kafka_result.and_then(|kafka_result| kafka_result))).await?
IIUC, there are two possible flaws with this approach.
- We cannot distinguish the error type (i.e., whether the error is due to the producer or to a failure during delivery).
- The and_then will produce Result<(i32, i64), (KafkaError, OwnedMessage)>, which is essentially not a future to be awaited, and try_join_all will produce a Vec containing all the successful results, which also is not compatible with the original return type.

Thus I'd prefer the current logic at present.
The general idea of the code snippet above is to first turn the return type of the future from the nested result Result<Result<(i32, i64), (KafkaError, OwnedMessage)>, Canceled> into Result<(i32, i64), SinkError>, and then do a try_join_all on the iterator of transformed futures, which is the same as the current logic, but in a more elegant way.
The return type of the future is changed via map on the future. The and_then is applied inside the closure of map, turning the nested result into a plain result.
I guess below is the simplification you'd prefer.
(Tried many times, this is the most elegant solution I could come up with)
async fn commit_inner(&mut self) -> Result<()> {
    let _v = try_join_all(
        self.future_delivery_buffer
            .drain(..)
            .map(|x| {
                x.map(|f| {
                    match f {
                        Ok(Ok(val)) => Ok(val),
                        Ok(Err((k_err, _msg))) => Err(SinkError::Kafka(k_err)),
                        Err(_) => Err(SinkError::Kafka(KafkaError::Canceled)),
                    }
                })
            })
    )
    .await?;
    Ok(())
}
cc @tabVersion.
In addition, I'd prefer readability over an elegant but hard-to-follow code snippet 🤔️.
The code snippet LGTM, and I think the code is intuitive and not hard to follow. All the methods involved are commonly used library methods.
try_join_all is a commonly used utility for Rust async users when we want to join some result futures and either collect all the ok results or stop on the first error. The current for-loop is a complete reimplementation of it, or even a worse one: if a future at the queue end completes with an error, we should stop polling the other futures at the queue front and return the error, but the current for-loop has to wait for the completion of the futures at the queue front before seeing the error at the end. drain is also commonly used when we want to consume all items in a vector without taking ownership of it.
For code readability, the current snippet is hard to read because the closure parameters are the meaningless x and f. After we rename x to future and f to future_result, it becomes much more readable.
One more thing we can improve in the code snippet above: since we don't care about the value returned in the result, when we transform the return type of the future, we'd better turn it into Ok(()). The return type of try_join_all is a vector, and a vector of val involves unnecessary memory allocation, whereas () is zero-sized and Vec<()> will not involve any memory allocation.
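Putting those two suggestions together, a sketch of the revised helper could look like the following; the struct name, the DeliveryFuture alias, and the bare KafkaError in place of RisingWave's SinkError are simplifications for illustration only:

use futures::channel::oneshot::Canceled;
use futures::future::{try_join_all, BoxFuture, FutureExt};
use rdkafka::error::KafkaError;
use rdkafka::message::OwnedMessage;

// Placeholder alias: the real buffer holds the delivery futures returned by
// rdkafka's send_result, whose output is this nested result shape.
type DeliveryFuture =
    BoxFuture<'static, Result<Result<(i32, i64), (KafkaError, OwnedMessage)>, Canceled>>;

struct KafkaSinkWriterSketch {
    future_delivery_buffer: Vec<DeliveryFuture>,
}

impl KafkaSinkWriterSketch {
    // The real code wraps errors into RisingWave's SinkError; a bare
    // KafkaError is used here to keep the sketch self-contained.
    async fn commit_inner(&mut self) -> Result<(), KafkaError> {
        // Map each future's nested result down to Result<(), KafkaError>, so
        // try_join_all stops at the first error and the collected success
        // values are zero-sized.
        try_join_all(self.future_delivery_buffer.drain(..).map(|future| {
            future.map(|future_result| match future_result {
                Ok(Ok((_partition, _offset))) => Ok(()),
                Ok(Err((kafka_err, _msg))) => Err(kafka_err),
                Err(_canceled) => Err(KafkaError::Canceled),
            })
        }))
        .await?;
        Ok(())
    }
}

On success, try_join_all now yields a Vec<()>, so collecting the results costs no heap allocation.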
Could we use VecDeque to replace Vec? I mean that we only need to control the length of this queue. If the future at the head of the queue is ready, we can pull more requests, rather than waiting for all requests in the queue to finish.
cc @wenym1 @Little-Wallace @tabVersion @hzxa21.
Rest LGTM. Thanks for the update!
src/connector/src/sink/kafka.rs (outdated)

            Ok(delivery_future) => {
                // First check if the current length is
                // greater than the preset limit
                while future_buffer.len() >= KAFKA_WRITER_MAX_QUEUE_SIZE {
It seems that we don't have to take self.future_delivery_buffer out; we can use self.future_delivery_buffer directly where future_buffer is currently used. I am curious what error the borrow checker would report if we used it directly.
Seems that this std::mem::take was needed for the previous change 🤣; it's totally okay to directly use self.future_delivery_buffer with the current implementation.
license-eye has checked 4120 files in total.
Valid | Invalid | Ignored | Fixed |
---|---|---|---|
1798 | 1 | 2321 | 0 |
Invalid file list:
- src/connector/src/parser/protobuf/recursive.rs
New version updated, I'm ready to merge if the overall picture looks good 🥳
Rest LGTM. Thanks for the effort!
src/connector/src/sink/kafka.rs (outdated)

                KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull) => {
                    tokio::time::sleep(self.config.retry_interval).await;
                    continue;
                }
                _ => break,
                _ => {
                    ret = Err(e);
It seems that this error will always be overwritten by the later QueueFull. We may return here directly.
I'd prefer _ => return Err(e), directly here, so as to distinguish between QueueFull and other potential errors.
Then it will not be overwritten by the success_flag check later.
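For illustration, here is a sketch of that control flow; try_enqueue, max_retries, and retry_interval are hypothetical stand-ins for the producer call and the sink's config fields:

use std::time::Duration;

use rdkafka::error::KafkaError;
use rdkafka::types::RDKafkaErrorCode;

// try_enqueue stands in for the call into the producer's send_result.
async fn enqueue_with_retry(
    mut try_enqueue: impl FnMut() -> Result<(), KafkaError>,
    max_retries: usize,
    retry_interval: Duration,
) -> Result<(), KafkaError> {
    for _ in 0..max_retries {
        match try_enqueue() {
            Ok(()) => return Ok(()),
            // Internal queue is full: back off and retry.
            Err(KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull)) => {
                tokio::time::sleep(retry_interval).await;
            }
            // Any other error is returned directly, so it cannot be
            // overwritten by a later success-flag check.
            Err(other) => return Err(other),
        }
    }
    // Retries exhausted while the queue stayed full.
    Err(KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull))
}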
Maybe the latest madsim uses a different type from the official rdkafka. Actually, we don't have to rewrite the whole result type; we only need to refer to the output type of the future. After changing the signature of
@wangrunji0408 PTAL
Yes, I misused the tokio channel in madsim-rdkafka, which should be a futures channel. I'll fix that in the next version. The workaround mentioned above should also work. UPDATE: fixed
Signed-off-by: Runji Wang <[email protected]>
cc @wenym1, I'm ready to merge if the current implementation looks good.
Just pushed a commit to update the max queue size from 10 to 65536; 10 seems too small.
Based on the rdkafka documentation, these are the configs that can affect producer batching:
This means the producer will send a batch of producer records when one of the following conditions is met:
After an offline discussion with @wenym1, we think it is better to increase the max queue size introduced in this PR to 65536 to achieve better Kafka producer batching.
refactor(kafka-sink): change sequential await to group await when committing (#12013)
Signed-off-by: Runji Wang <[email protected]>
Co-authored-by: Runji Wang <[email protected]>
Co-authored-by: William Wen <[email protected]>
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Change sequential await to group await when committing via the coordinator.
The semantics are the same but more efficient: whenever an error happens during delivery (the await on the FutureRecord), we'll roll back to the last checkpoint.
The current implementation simply appends event_key and event_value to the future_record_buffer, and tries to deliver (await) them all together when committing.
There is no intermediate state, and we know immediately whether the delivery succeeds or not when performing the group commit.
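As a toy comparison of the two strategies (not the sink's actual code), with a dummy deliver future standing in for a Kafka delivery:

use futures::future::try_join_all;

// Toy stand-in for awaiting one Kafka delivery.
async fn deliver(i: u32) -> Result<u32, String> {
    Ok(i)
}

// Before: await each buffered delivery one by one.
async fn commit_sequential(pending: Vec<u32>) -> Result<(), String> {
    for i in pending {
        deliver(i).await?;
    }
    Ok(())
}

// After: group await; all deliveries are polled together and the first
// error short-circuits, after which the sink rolls back to the checkpoint.
async fn commit_grouped(pending: Vec<u32>) -> Result<(), String> {
    try_join_all(pending.into_iter().map(deliver)).await?;
    Ok(())
}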
Checklist
./risedev check (or alias, ./risedev c)
Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.