refactor(kafka-sink): change sequential await to group await when committing #12013
The logic here is too complicated; it can be much simpler. For the step that waits for some future to finish, you can use `VecDeque` instead of `Vec` to buffer the futures: always push to the back, then pop from the front and await the future at the queue head. You can also investigate `FuturesUnordered` or `FuturesOrdered`.
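A minimal sketch of that pattern, assuming a writer that owns an rdkafka `FutureProducer` and buffers its `DeliveryFuture`s; the struct and field names here are illustrative, not the PR's actual code:

```rust
use std::collections::VecDeque;

use rdkafka::error::KafkaResult;
use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord};

const DELIVERY_BUFFER_LIMIT: usize = 10;

struct KafkaSinkWriter {
    producer: FutureProducer,
    future_delivery_buffer: VecDeque<DeliveryFuture>,
}

impl KafkaSinkWriter {
    async fn send(&mut self, record: FutureRecord<'_, [u8], [u8]>) -> KafkaResult<()> {
        // If the quota is exhausted, await only the oldest in-flight delivery.
        while self.future_delivery_buffer.len() >= DELIVERY_BUFFER_LIMIT {
            let oldest = self
                .future_delivery_buffer
                .pop_front()
                .expect("buffer is non-empty");
            // A real implementation would map the delivery result to an error.
            let _ = oldest.await;
        }
        // `send_result` is non-blocking: it enqueues the record and returns a
        // future that resolves once the broker acknowledges the delivery.
        let future = self.producer.send_result(record).map_err(|(e, _rec)| e)?;
        self.future_delivery_buffer.push_back(future);
        Ok(())
    }
}
```

`FuturesOrdered` would play the same role as the `VecDeque` here, with the added benefit that all buffered futures make progress whenever the stream is polled.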
The current approach is to call `self.commit()` once the buffer is full. I think this is reasonable: we've already wrapped the delivery-awaiting process into `commit`, triggered per 10 futures, and with the current limit of 10 I think it's also efficient to just wait for the 10 buffered futures to be delivered and then begin the next round. Ultimately, if we ever encounter an error, we will roll the state back to the latest checkpoint.
The logic makes some sense when the limit is 10, a relatively small value. If we decide to change to a larger value in the future, such as 65536, it makes no sense to still wait for all futures to finish.

Besides, it is discouraged to call `commit` inside `send_result`, since the `commit` method is one of the methods of the `SinkWriter` trait, and its semantics is to be called on an externally triggered checkpoint, which is not the case in `send_result`. If we really want to reuse some logic, we can have a separate method named something like `ensure_delivery` and call it in both `send_result` and `commit`.

Also, in the current code I am confused by the usage of `commit_flag` and `push_flag`. It seems the logic can be implemented without taking `self.future_delivery_buffer` out and without using the two flags.
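A sketch of that separation, under the same assumed names (`ensure_delivery` is the reviewer's proposed name, not an existing method):

```rust
impl KafkaSinkWriter {
    /// Shared logic: await all buffered delivery futures.
    async fn ensure_delivery(&mut self) -> KafkaResult<()> {
        while let Some(future) = self.future_delivery_buffer.pop_front() {
            // A real implementation would map the delivery result to an error.
            let _ = future.await;
        }
        Ok(())
    }
}

// `SinkWriter::commit`, triggered by a checkpoint, delegates to the helper,
// and `send_result` calls the same helper when its buffer is full, without
// overloading the trait method's checkpoint semantics.
```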
The `commit` I mentioned here is not actually `SinkWriter::commit`, but a wrapper function of `KafkaSinkWriter::commit` that clears the current `future_delivery_buffer` and ensures delivery. (It is also the function discussed below, the one to change to `try_join_all`; its semantics are essentially the same as the `ensure_delivery` wrapper function mentioned above.)

As for `commit_flag` and `push_flag`: the former checks whether the buffer size is at the limit, and the latter checks whether this call has successfully sent the data and pushed the future into the `future_record_buffer`. We then exchange the local `future_buffer` with the one in `KafkaSinkWriter` using `std::mem::swap`, in order to make the borrow checker happy.

And yes, for a rather large buffer value such as 65536, we could indeed adopt a non-blocking approach. I've tried implementing the non-blocking logic, but it seems tricky and ambiguous (i.e., to wait for the release of quota we would typically await the future at the front of the deque, but should we launch a background thread as the consumer to keep awaiting whenever the buffer is not empty, or use some other approach to consume the buffer?), so this may be further reviewed & discussed.
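A sketch of the `try_join_all` version of that wrapper, group-awaiting the drained buffer instead of draining it one by one; this assumes the buffered futures are rdkafka `DeliveryFuture`s, whose output is a nested `Result` (delivery result or cancellation), which the mapping below follows:

```rust
use futures::future::try_join_all;
use rdkafka::error::{KafkaError, KafkaResult};

impl KafkaSinkWriter {
    /// Group-await every buffered delivery instead of awaiting sequentially.
    async fn commit_inner(&mut self) -> KafkaResult<()> {
        let futures = std::mem::take(&mut self.future_delivery_buffer);
        try_join_all(futures.into_iter().map(|future| async move {
            match future.await {
                Ok(Ok(_partition_offset)) => Ok(()),
                Ok(Err((e, _owned_message))) => Err(e),
                Err(_canceled) => Err(KafkaError::Canceled),
            }
        }))
        .await?;
        Ok(())
    }
}
```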
Do we really need any non-blocking approach? The reason for a non-blocking approach is that, while waiting for a message to be delivered, we have something else to do and don't want to be blocked. However, when the buffer is full we have nothing else to do but wait for the release of quota. If so, we only need to take the future at the queue front and simply await it.

Actually, the functionality provided by the rdkafka client is already non-blocking: its implementation allows adding records to its inner queue, delivers the messages to the external Kafka service asynchronously, and notifies us of the delivery at the end. We don't need to implement any extra non-blocking logic.

Besides, the two flags can be avoided. We can update any state in place right where the flags are set now, instead of setting flags and applying the updates afterwards. If the borrow checker reports any error, we can resolve it together.
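The blocking step being suggested is small; as a fragment under the same assumed names:

```rust
// When the buffer is full there is nothing else to do, so blocking on the
// oldest in-flight delivery is sufficient; no extra machinery is needed.
if self.future_delivery_buffer.len() >= DELIVERY_BUFFER_LIMIT {
    if let Some(oldest) = self.future_delivery_buffer.pop_front() {
        // rdkafka already delivers in the background; this await merely
        // frees one slot of quota.
        let _ = oldest.await;
    }
}
```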
From the code, it seems we don't have to take `self.future_delivery_buffer` out; we can use `self.future_delivery_buffer` directly where the local `future_buffer` is currently used. I am curious what error the borrow checker would report if we used it directly.
Seems that this `std::mem::take` was left over from the previous change 🤣; it's totally okay to use `self.future_delivery_buffer` directly with the current implementation.
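For illustration, the two shapes side by side (a hypothetical fragment, not the PR diff):

```rust
// Before: the buffer is taken out of `self` first, a workaround that only
// the earlier revision of the code actually needed.
let mut future_buffer = std::mem::take(&mut self.future_delivery_buffer);
while let Some(future) = future_buffer.pop_front() {
    let _ = future.await;
}

// After: the field is used directly; `pop_front` borrows `self` only for
// the duration of the call, so the borrow checker is satisfied.
while let Some(future) = self.future_delivery_buffer.pop_front() {
    let _ = future.await;
}
```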
It seems that this error will always be overwritten by the later `QueueFull` handling. We may return here directly.
I prefer `_ => return Err(e),` directly here, so as to distinguish between `QueueFull` and other potential errors. Then it will not be overwritten by the `success_flag` checking later.
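A sketch of the resulting error handling, assuming rdkafka's `send_result` and its `QueueFull` error code; the retry loop and names are illustrative:

```rust
use rdkafka::error::KafkaError;
use rdkafka::types::RDKafkaErrorCode;

// Inside the hypothetical send path:
let mut record = record;
loop {
    match self.producer.send_result(record) {
        Ok(future) => {
            self.future_delivery_buffer.push_back(future);
            break;
        }
        Err((KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull), rec)) => {
            // The client queue is full: free one quota slot by awaiting the
            // oldest in-flight delivery, then retry the same record.
            if let Some(oldest) = self.future_delivery_buffer.pop_front() {
                let _ = oldest.await;
            }
            record = rec;
        }
        // Any other error is surfaced immediately, so it can no longer be
        // overwritten by later bookkeeping such as a `success_flag` check.
        Err((e, _rec)) => return Err(e),
    }
}
```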