-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix IndexerWorkerClient#fetchChannelData when response has data and error. #15084
Conversation
…rror. When a channel data response from a worker includes some data and then some I/O error, then when the call is retried, we will re-read the set of data that was read by the previous connection and add it to the local channel again. This causes the local channel to become corrupted. The patch fixes this case by skipping data that has already been read.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left some comments.
if (backpressureFuture != null) { | ||
clientResponseObj.setBackpressureFuture(backpressureFuture); | ||
} | ||
clientResponseObj.addBytesRead(chunkSize); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: I would expect to add bytes read at the end before returning the clientResponseObject.
More of a readability thing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved to the line before the return
.
} | ||
catch (Exception e) { | ||
clientResponseObj.exceptionCaught(e); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: toSkip> chunk size since we have already have that data in the channel, so we basically do nothing. Can we add an explicit else and mention that in a comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The else
would have en empty body, so instead of adding that, I just added a comment next to the else if
.
if (backpressureFuture != null) { | ||
clientResponseObj.setBackpressureFuture(backpressureFuture); | ||
} | ||
clientResponseObj.addBytesRead(chunkSize); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the skip case we are not basically reading all the bytes. We are skipping to chunkSize - (int) toSkip]
. So does adding in all the bytes is valid?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is, since "bytes read" tracks how many bytes have been read from the response, not how many have been written to the channel. Its purpose is to let us know when to stop skipping (we keep incrementing it until it passes channel.getBytesAdded() - startOffset
). I added a comment about this.
Thanks @gianm for the clarifying comments. !! |
…rror. (apache#15084) * Fix IndexerWorkerClient#fetchChannelData when response has data and error. When a channel data response from a worker includes some data and then some I/O error, then when the call is retried, we will re-read the set of data that was read by the previous connection and add it to the local channel again. This causes the local channel to become corrupted. The patch fixes this case by skipping data that has already been read.
…rror. (apache#15084) * Fix IndexerWorkerClient#fetchChannelData when response has data and error. When a channel data response from a worker includes some data and then some I/O error, then when the call is retried, we will re-read the set of data that was read by the previous connection and add it to the local channel again. This causes the local channel to become corrupted. The patch fixes this case by skipping data that has already been read.
When a channel data response from a worker includes some data and then some I/O error, then when the call is retried, we will re-read the set of data that was read by the previous connection and add it to the local channel again. This causes the local channel to become corrupted. The patch fixes this case by skipping data that has already been read.