diff --git a/processing/src/main/java/org/apache/druid/frame/file/FrameFileHttpResponseHandler.java b/processing/src/main/java/org/apache/druid/frame/file/FrameFileHttpResponseHandler.java index 0f70fb3d6983..661ba351dfc2 100644 --- a/processing/src/main/java/org/apache/druid/frame/file/FrameFileHttpResponseHandler.java +++ b/processing/src/main/java/org/apache/druid/frame/file/FrameFileHttpResponseHandler.java @@ -21,6 +21,7 @@ import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.error.DruidException; import org.apache.druid.frame.channel.ReadableByteChunksFrameChannel; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.http.client.response.ClientResponse; @@ -49,11 +50,20 @@ public class FrameFileHttpResponseHandler implements HttpResponseHandler response( return ClientResponse.finished(clientResponseObj); } - final byte[] chunk = new byte[content.readableBytes()]; - content.getBytes(content.readerIndex(), chunk); + final byte[] chunk; + final int chunkSize = content.readableBytes(); - try { - final ListenableFuture backpressureFuture = channel.addChunk(chunk); + // Potentially skip some of this chunk, if the relevant bytes have already been read by the handler. This can + // happen if a request reads some data, then fails with a retryable I/O error, and then is retried. The retry + // will re-read some data that has already been added to the channel, so we need to skip it. 
+ final long readByThisHandler = channel.getBytesAdded() - startOffset; + final long readByThisRequest = clientResponseObj.getBytesRead(); // Prior to the current chunk + final long toSkip = readByThisHandler - readByThisRequest; - if (backpressureFuture != null) { - clientResponseObj.setBackpressureFuture(backpressureFuture); - } + if (toSkip < 0) { + throw DruidException.defensive("Expected toSkip[%d] to be nonnegative", toSkip); + } else if (toSkip < chunkSize) { // When toSkip >= chunkSize, we skip the entire chunk and do not touch the channel + chunk = new byte[chunkSize - (int) toSkip]; + content.getBytes(content.readerIndex() + (int) toSkip, chunk); - clientResponseObj.addBytesRead(chunk.length); - } - catch (Exception e) { - clientResponseObj.exceptionCaught(e); + try { + final ListenableFuture backpressureFuture = channel.addChunk(chunk); + + if (backpressureFuture != null) { + clientResponseObj.setBackpressureFuture(backpressureFuture); + } + } + catch (Exception e) { + clientResponseObj.exceptionCaught(e); + } } + // Call addBytesRead even if we skipped some or all of the chunk, because that lets us know when to stop skipping. + clientResponseObj.addBytesRead(chunkSize); return ClientResponse.unfinished(clientResponseObj); } } diff --git a/processing/src/main/java/org/apache/druid/frame/file/FrameFilePartialFetch.java b/processing/src/main/java/org/apache/druid/frame/file/FrameFilePartialFetch.java index 8c2056dcbe43..9e6b84c6bbf7 100644 --- a/processing/src/main/java/org/apache/druid/frame/file/FrameFilePartialFetch.java +++ b/processing/src/main/java/org/apache/druid/frame/file/FrameFilePartialFetch.java @@ -74,6 +74,14 @@ public boolean isExceptionCaught() return exceptionCaught != null; } + /** + * Number of bytes read so far by this request. + */ + public long getBytesRead() + { + return bytesRead; + } + /** * Future that resolves when it is a good time to request the next chunk of the frame file. 
* @@ -105,6 +113,9 @@ void exceptionCaught(final Throwable t) } } + /** + * Increment the value returned by {@link #getBytesRead()}. Called whenever a chunk of data is read from the response. + */ void addBytesRead(final long n) { bytesRead += n; diff --git a/processing/src/test/java/org/apache/druid/frame/file/FrameFileHttpResponseHandlerTest.java b/processing/src/test/java/org/apache/druid/frame/file/FrameFileHttpResponseHandlerTest.java index 4eeaaddbe892..06c160e68409 100644 --- a/processing/src/test/java/org/apache/druid/frame/file/FrameFileHttpResponseHandlerTest.java +++ b/processing/src/test/java/org/apache/druid/frame/file/FrameFileHttpResponseHandlerTest.java @@ -346,6 +346,86 @@ public void testCaughtExceptionDuringChunkedResponse() throws Exception ); } + @Test + public void testCaughtExceptionDuringChunkedResponseRetryWithSameHandler() throws Exception + { + // Split file into 12 chunks after the first 100 bytes. + final int firstPart = 100; + final int chunkSize = Ints.checkedCast(LongMath.divide(file.length() - firstPart, 12, RoundingMode.CEILING)); + final byte[] allBytes = Files.readAllBytes(file.toPath()); + + // Add firstPart and be done. + ClientResponse response = handler.done( + handler.handleResponse( + makeResponse(HttpResponseStatus.OK, byteSlice(allBytes, 0, firstPart)), + null + ) + ); + + Assert.assertEquals(firstPart, channel.getBytesAdded()); + Assert.assertTrue(response.isFinished()); + + // Add first quarter after firstPart using a new handler. + handler = new FrameFileHttpResponseHandler(channel); + response = handler.handleResponse( + makeResponse(HttpResponseStatus.OK, byteSlice(allBytes, firstPart, chunkSize * 3)), + null + ); + + // Set an exception. + handler.exceptionCaught(response, new ISE("Oh no!")); + + // Add another chunk after the exception is caught (this can happen in real life!). We expect it to be ignored. 
+ response = handler.handleChunk( + response, + makeChunk(byteSlice(allBytes, firstPart + chunkSize * 3, chunkSize * 3)), + 2 + ); + + // Verify that the exception handler was called. + Assert.assertTrue(response.getObj().isExceptionCaught()); + final Throwable e = response.getObj().getExceptionCaught(); + MatcherAssert.assertThat(e, CoreMatchers.instanceOf(IllegalStateException.class)); + MatcherAssert.assertThat(e, ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("Oh no!"))); + + // Retry connection with the same handler and same initial offset firstPart (don't recreate handler), but now use + // thirds instead of quarters as chunks. (ServiceClientImpl would retry from the same offset with the same handler + // if the exception is retryable.) + response = handler.handleResponse( + makeResponse(HttpResponseStatus.OK, byteSlice(allBytes, firstPart, chunkSize * 4)), + null + ); + + Assert.assertEquals(firstPart + chunkSize * 4L, channel.getBytesAdded()); + Assert.assertFalse(response.isFinished()); + + // Send the rest of the data. + response = handler.handleChunk( + response, + makeChunk(byteSlice(allBytes, firstPart + chunkSize * 4, chunkSize * 4)), + 1 + ); + Assert.assertEquals(firstPart + chunkSize * 8L, channel.getBytesAdded()); + + response = handler.handleChunk( + response, + makeChunk(byteSlice(allBytes, firstPart + chunkSize * 8, chunkSize * 4)), + 2 + ); + response = handler.done(response); + + Assert.assertTrue(response.isFinished()); + Assert.assertFalse(response.getObj().isExceptionCaught()); + + // Verify channel. 
+ Assert.assertEquals(allBytes.length, channel.getBytesAdded()); + channel.doneWriting(); + FrameTestUtil.assertRowsEqual( + FrameTestUtil.readRowsFromAdapter(adapter, null, false), + FrameTestUtil.readRowsFromFrameChannel(channel, FrameReader.create(adapter.getRowSignature())) + ); + } + private static HttpResponse makeResponse(final HttpResponseStatus status, final byte[] content) { final ByteBufferBackedChannelBuffer channelBuffer = new ByteBufferBackedChannelBuffer(ByteBuffer.wrap(content));