refactor(kafka-sink): change sequential await to group await when committing #12013

---
The logic here is too complicated; it can be much simpler.

For the step that waits for some future to finish, you can use a `VecDeque` instead of a `Vec` to buffer the futures: always push to the back, then pop from the front and await the future at the queue head. You can also look into `FuturesUnordered` or `FuturesOrdered`.
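A minimal sketch of that `VecDeque` pattern, generic over the future type (the buffer size and function name are illustrative, not from the PR):

```rust
use std::collections::VecDeque;
use std::future::Future;

/// Illustrative bound; the PR under discussion uses a limit of 10.
const DELIVERY_BUFFER_SIZE: usize = 10;

/// Push a new delivery future; if the buffer is full, await only the
/// oldest future at the queue head instead of flushing everything.
async fn push_with_backpressure<F, T, E>(
    buffer: &mut VecDeque<F>,
    future: F,
) -> Result<(), E>
where
    F: Future<Output = Result<T, E>>,
{
    if buffer.len() >= DELIVERY_BUFFER_SIZE {
        // Futures were pushed in send order, so the head is the oldest
        // in-flight delivery and the most likely to complete first.
        buffer
            .pop_front()
            .expect("buffer is full, hence non-empty")
            .await?;
    }
    buffer.push_back(future);
    Ok(())
}
```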
---
The current approach is to call `self.commit()`, since we've already wrapped the delivery process into `commit` per 10 futures. Based on the current 10-future limit, I think it's also efficient to simply wait for the 10 buffered futures to be delivered and then begin the next turn. Ultimately, if we ever encounter an error, we roll the status back to the latest checkpoint.
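As a hedged sketch of the approach described above (the function and constant names are assumptions, and the real code lives inside `KafkaSinkWriter`):

```rust
use rdkafka::error::KafkaError;
use rdkafka::producer::DeliveryFuture;

const FUTURE_LIMIT: usize = 10; // the 10-future limit mentioned above

/// Once the buffer is full, block and await every buffered delivery
/// before the next batch of sends begins.
async fn maybe_flush(buffer: &mut Vec<DeliveryFuture>) -> Result<(), KafkaError> {
    if buffer.len() >= FUTURE_LIMIT {
        for future in buffer.drain(..) {
            // The delivery future resolves to a nested result: the outer
            // layer is cancellation, the inner one the Kafka delivery.
            future
                .await
                .map_err(|_| KafkaError::Canceled)?
                .map_err(|(err, _msg)| err)?;
        }
    }
    Ok(())
}
```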
---
The logic makes some sense when the limit is 10, a relatively small value. If we decide to change it to a larger value in the future, such as 65536, it makes no sense to still wait for all futures to finish.

Besides, it is discouraged to call `commit` inside `send_result`, since the `commit` method belongs to the `SinkWriter` trait and its semantics are to be called on an externally triggered checkpoint, which is not the case in `send_result`. If we really want to reuse some logic, we can have a separate method named something like `ensure_delivery` and call it from both `send_result` and `commit`.

Also, in the current code I am confused by the usage of `commit_flag` and `push_flag`. It seems the logic can be implemented without taking `self.future_delivery_buffer` out and using the two flags.
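A minimal sketch of that separation, with a simplified trait and a stand-in error type (the real `SinkWriter` trait differs):

```rust
use rdkafka::producer::DeliveryFuture;

struct SinkError; // stand-in for the real sink error type

struct KafkaSinkWriter {
    future_delivery_buffer: Vec<DeliveryFuture>,
}

impl KafkaSinkWriter {
    /// Shared flushing logic, callable from both `send_result` (when the
    /// buffer is full) and `commit` (on a checkpoint).
    async fn ensure_delivery(&mut self) -> Result<(), SinkError> {
        for future in self.future_delivery_buffer.drain(..) {
            future.await.map_err(|_| SinkError)?.map_err(|_| SinkError)?;
        }
        Ok(())
    }
}

// Simplified; async fn in traits requires Rust 1.75+.
trait SinkWriter {
    async fn commit(&mut self) -> Result<(), SinkError>;
}

impl SinkWriter for KafkaSinkWriter {
    /// The trait method keeps its checkpoint semantics and simply
    /// reuses the helper.
    async fn commit(&mut self) -> Result<(), SinkError> {
        self.ensure_delivery().await
    }
}
```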
---
The `commit` I mentioned here is not actually `SinkWriter::commit`, but a wrapper function, `KafkaSinkWriter::commit`, that clears the current `future_delivery_buffer` and ensures delivery. (It is also the function discussed below, the one to change to `try_join_all`; its semantics are essentially the same as the `ensure_delivery` wrapper mentioned above.)

As for `commit_flag` and `push_flag`: the former checks whether the buffer size has reached the limit; the latter checks whether this call has successfully sent the data and pushed the future into the `future_record_buffer`. We then exchange the local `future_buffer` with the one in `KafkaSinkWriter` using `std::mem::swap`, in order to make the borrow checker happy.
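For reference, the swap trick amounts to something like this generic sketch (`take_buffer` is an illustrative name):

```rust
use std::mem;

/// Move the buffered items out of the writer so they can be consumed
/// while the writer itself stays mutably borrowable.
fn take_buffer<T>(slot: &mut Vec<T>) -> Vec<T> {
    let mut local = Vec::new();
    mem::swap(slot, &mut local); // `slot` is now empty; `local` owns the items
    local
}
```

(`std::mem::take(slot)` achieves the same in one call for types with a `Default`.)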
And yes, for a rather large buffer value such as 65536, we could adapt a non-blocking approach. I've tried implementing the non-blocking logic, but it seems tricky and ambiguous (for example, to wait for the release of quota we would typically await the future at the front of the deque, but should we launch a background thread as the consumer to keep awaiting whenever the buffer is not empty, or take some other approach to consume the buffer?), so it may be further reviewed and discussed.
---
Do we really need any non-blocking approach? The reason for a non-blocking approach is that, while waiting for a message to be delivered, we have something else to do and don't want to be blocked. However, when the buffer is full, we have nothing else to do but wait for the release of quota. If so, we only need to take the future at the queue front and simply await it.

Actually, the functionality the rdkafka client provides is already non-blocking: its implementation adds records to an inner queue, delivers the messages to external Kafka asynchronously, and notifies on delivery at the end. We don't need to implement any extra non-blocking logic.
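For instance, `FutureProducer::send_result` returns a `DeliveryFuture` immediately while delivery proceeds in the background (a sketch; the key type and error handling are simplified):

```rust
use rdkafka::error::KafkaError;
use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord};

/// Enqueue a record without blocking: the returned future only reports
/// the delivery, which rdkafka performs asynchronously.
fn enqueue(
    producer: &FutureProducer,
    topic: &str,
    payload: &[u8],
) -> Result<DeliveryFuture, KafkaError> {
    producer
        .send_result(FutureRecord::<(), _>::to(topic).payload(payload))
        // On a full local queue rdkafka hands the record back to the caller.
        .map_err(|(err, _record)| err)
}
```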
Besides, the two flags can be avoided. We can update the state in place where the flags are currently set, instead of setting flags and applying the update afterward. If the borrow checker reports any error, we can resolve it together.
---
The code can be simplified into the following.
---
IIUC, there are two possible flaws with this approach: `and_then` will produce a `Result<(i32, i64), (KafkaError, OwnedMessage)>`, which is essentially not a future to be awaited, and `try_join_all` will produce a `Vec` containing all the successful results, which is also not compatible with the original return type.

Thus I'd prefer the current logic at present.
---
The general idea of the code snippet above is to first turn the return type of the future from the nested result `Result<Result<(i32, i64), (KafkaError, OwnedMessage)>, Canceled>` into `Result<(i32, i64), SinkError>`, and then call `try_join_all` on the iterator of transformed futures, which is the same as the current logic but more elegant.

The return type of the future is changed via `map` on the future. The `and_then` is applied inside the closure of `map`, turning the nested result into a plain result.
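In code, the per-future transformation might look like this sketch (`SinkError` is a stand-in for the real error type):

```rust
use std::future::Future;

use futures::channel::oneshot::Canceled;
use futures::FutureExt;
use rdkafka::error::KafkaError;
use rdkafka::message::OwnedMessage;
use rdkafka::producer::DeliveryFuture;

struct SinkError; // stand-in for the real error type

/// `map` rewrites the future's output; the `and_then` inside the closure
/// flattens the nested `Result` into a plain one.
fn flatten(future: DeliveryFuture) -> impl Future<Output = Result<(i32, i64), SinkError>> {
    future.map(|nested| {
        nested
            .map_err(|_: Canceled| SinkError)
            .and_then(|delivery| {
                delivery.map_err(|(_, _): (KafkaError, OwnedMessage)| SinkError)
            })
    })
}
```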
---
I guess below is the simplification you'd prefer. (I tried many times; this is the most elegant solution I could come up with.) cc @tabVersion.

In addition, I'd prefer readability over an elegant but hard-to-follow code snippet 🤔️.
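The snippet itself is not preserved here; reconstructed from the follow-up comments, it was roughly of this shape (stand-in error type, and closure parameters named `x` and `f` as discussed below):

```rust
use futures::future::{try_join_all, FutureExt};
use rdkafka::producer::DeliveryFuture;

struct SinkError; // stand-in for the real error type

async fn commit(buffer: &mut Vec<DeliveryFuture>) -> Result<(), SinkError> {
    // Drain the buffer and fail fast on the first delivery error.
    try_join_all(buffer.drain(..).map(|x| {
        x.map(|f| {
            f.map_err(|_| SinkError)
                .and_then(|result| result.map_err(|_| SinkError))
        })
    }))
    .await?;
    Ok(())
}
```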
---
The code snippet LGTM, and I think the code is intuitive and not hard to follow. All the methods involved are commonly used library methods.

`try_join_all` is a common utility for Rust async users when we want to join some result futures and either collect all the ok results or stop on the first error. The current for-loop is a complete reimplementation of it, and arguably a worse one: if a future near the queue end completes with an error, we should stop polling the futures at the queue front and return the error, but the current for-loop has to wait for the futures at the queue front to complete before it sees the error at the end. `drain` is likewise commonly used to consume all items of a vector without taking ownership of it.

For readability, the current snippet is hard to read because the closure parameters `x` and `f` are meaningless. After renaming `x` to `future` and `f` to `future_result`, it becomes much more readable.

One more thing we can improve: since we don't care about the value inside the result, we'd better turn it into `Ok(())` when transforming the future's return type. The return type of `try_join_all` is a vector, and a vector of values involves unnecessary memory allocation, whereas `()` is zero-sized and a `Vec<()>` involves no allocation.
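Putting those suggestions together, the final shape might look like this sketch (still with a stand-in error type):

```rust
use futures::future::{try_join_all, FutureExt};
use rdkafka::producer::DeliveryFuture;

struct SinkError; // stand-in for the real error type

async fn ensure_delivery(buffer: &mut Vec<DeliveryFuture>) -> Result<(), SinkError> {
    try_join_all(buffer.drain(..).map(|future| {
        future.map(|future_result| match future_result {
            // Success: discard the (partition, offset) pair so that
            // `try_join_all` collects an allocation-free `Vec<()>`.
            Ok(Ok(_partition_offset)) => Ok(()),
            // Delivery failed on the broker side.
            Ok(Err((_kafka_error, _message))) => Err(SinkError),
            // The delivery future itself was canceled.
            Err(_canceled) => Err(SinkError),
        })
    }))
    .await?;
    Ok(())
}
```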
---
Could we use a `VecDeque` to replace the `Vec`? I mean that we only need to control the length of the queue: if the future at the queue head is ready, we can pull more requests, rather than waiting for all requests in the queue to finish.
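A generic sketch of that idea: retire whatever has already completed at the queue head without blocking on anything still pending (`now_or_never` from `futures::FutureExt` polls once and yields `None` if the future is still pending; the function name is illustrative):

```rust
use std::collections::VecDeque;
use std::future::Future;

use futures::FutureExt;

/// Retire already-completed futures at the head of the queue; stop at the
/// first one that is still pending so the caller never blocks here.
fn drain_ready<F, T, E>(queue: &mut VecDeque<F>) -> Result<usize, E>
where
    F: Future<Output = Result<T, E>> + Unpin,
{
    let mut finished = 0;
    while let Some(mut head) = queue.pop_front() {
        match (&mut head).now_or_never() {
            Some(Ok(_)) => finished += 1,          // head already delivered
            Some(Err(err)) => return Err(err),     // fail fast on delivery error
            None => {
                queue.push_front(head);            // still in flight: keep it queued
                break;
            }
        }
    }
    Ok(finished)
}
```

The caller can then keep pushing new delivery futures to the back as long as the queue length stays within the limit.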