Skip to content

Commit

Permalink
Fix IndexerWorkerClient#fetchChannelData when response has data and e…
Browse files Browse the repository at this point in the history
…rror. (apache#15084)

* Fix IndexerWorkerClient#fetchChannelData when response has data and error.

When a channel data response from a worker contains some data followed by
an I/O error, the call is retried. Before this patch, the retry re-read
the data already read over the previous connection and added it to the
local channel a second time, which corrupted the local channel.
The patch fixes this case by skipping data that has already been read.
  • Loading branch information
gianm authored and ektravel committed Oct 16, 2023
1 parent 621b7a6 commit 183c70c
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.channel.ReadableByteChunksFrameChannel;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.http.client.response.ClientResponse;
Expand Down Expand Up @@ -49,11 +50,20 @@ public class FrameFileHttpResponseHandler implements HttpResponseHandler<FrameFi
public static final String HEADER_LAST_FETCH_NAME = "X-Druid-Frame-Last-Fetch";
public static final String HEADER_LAST_FETCH_VALUE = "yes";

/**
* Channel to write to.
*/
private final ReadableByteChunksFrameChannel channel;

/**
* Starting offset for this handler.
*/
private final long startOffset;

/**
 * Creates a handler that writes to the given channel.
 *
 * The number of bytes already added to the channel at construction time is captured as this handler's
 * starting offset.
 *
 * @param channel channel to write to; must not be null
 * @throws NullPointerException if {@code channel} is null
 */
public FrameFileHttpResponseHandler(final ReadableByteChunksFrameChannel channel)
{
  // Validate first, then assign; the offset is read from the same (validated) channel instance.
  Preconditions.checkNotNull(channel, "channel");
  this.channel = channel;
  this.startOffset = this.channel.getBytesAdded();
}

@Override
Expand Down Expand Up @@ -114,22 +124,36 @@ private ClientResponse<FrameFilePartialFetch> response(
return ClientResponse.finished(clientResponseObj);
}

final byte[] chunk = new byte[content.readableBytes()];
content.getBytes(content.readerIndex(), chunk);
final byte[] chunk;
final int chunkSize = content.readableBytes();

try {
final ListenableFuture<?> backpressureFuture = channel.addChunk(chunk);
// Potentially skip some of this chunk, if the relevant bytes have already been read by the handler. This can
// happen if a request reads some data, then fails with a retryable I/O error, and then is retried. The retry
// will re-read some data that has already been added to the channel, so we need to skip it.
final long readByThisHandler = channel.getBytesAdded() - startOffset;
final long readByThisRequest = clientResponseObj.getBytesRead(); // Prior to the current chunk
final long toSkip = readByThisHandler - readByThisRequest;

if (backpressureFuture != null) {
clientResponseObj.setBackpressureFuture(backpressureFuture);
}
if (toSkip < 0) {
throw DruidException.defensive("Expected toSkip[%d] to be nonnegative", toSkip);
} else if (toSkip < chunkSize) { // When toSkip >= chunkSize, we skip the entire chunk and do not touch the channel
chunk = new byte[chunkSize - (int) toSkip];
content.getBytes(content.readerIndex() + (int) toSkip, chunk);

clientResponseObj.addBytesRead(chunk.length);
}
catch (Exception e) {
clientResponseObj.exceptionCaught(e);
try {
final ListenableFuture<?> backpressureFuture = channel.addChunk(chunk);

if (backpressureFuture != null) {
clientResponseObj.setBackpressureFuture(backpressureFuture);
}
}
catch (Exception e) {
clientResponseObj.exceptionCaught(e);
}
}

// Call addBytesRead even if we skipped some or all of the chunk, because that lets us know when to stop skipping.
clientResponseObj.addBytesRead(chunkSize);
return ClientResponse.unfinished(clientResponseObj);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,14 @@ public boolean isExceptionCaught()
return exceptionCaught != null;
}

/**
 * Returns the running count of bytes read so far by this request.
 *
 * @return total bytes recorded for this request
 */
public long getBytesRead()
{
  return this.bytesRead;
}

/**
* Future that resolves when it is a good time to request the next chunk of the frame file.
*
Expand Down Expand Up @@ -105,6 +113,9 @@ void exceptionCaught(final Throwable t)
}
}

/**
* Increment the value returned by {@link #getBytesRead()}. Called whenever a chunk of data is read from the response.
*/
void addBytesRead(final long n)
{
bytesRead += n;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,86 @@ public void testCaughtExceptionDuringChunkedResponse() throws Exception
);
}

/**
 * Verifies that retrying a request through the same handler, after an exception was reported partway through
 * a chunked response, does not add duplicate bytes to the channel.
 *
 * Sequence exercised: the first {@code firstPart} bytes are delivered and finished through the existing handler;
 * a new handler is created on the same channel and receives three of the twelve chunks before an exception is
 * reported; a further chunk delivered after the exception is expected to be ignored; the same handler then
 * receives a fresh response that starts again at {@code firstPart} but is split into larger pieces. At the end
 * the channel must contain exactly as many bytes as the file, and its rows must match the adapter's rows.
 */
@Test
public void testCaughtExceptionDuringChunkedResponseRetryWithSameHandler() throws Exception
{
// Split file into 12 chunks after the first 100 bytes. The chunk size is rounded up, so 12 chunks always
// cover the remainder of the file.
final int firstPart = 100;
final int chunkSize = Ints.checkedCast(LongMath.divide(file.length() - firstPart, 12, RoundingMode.CEILING));
final byte[] allBytes = Files.readAllBytes(file.toPath());

// Add firstPart and be done: a complete request/response cycle on the original handler.
ClientResponse<FrameFilePartialFetch> response = handler.done(
handler.handleResponse(
makeResponse(HttpResponseStatus.OK, byteSlice(allBytes, 0, firstPart)),
null
)
);

Assert.assertEquals(firstPart, channel.getBytesAdded());
Assert.assertTrue(response.isFinished());

// Add first quarter (3 of 12 chunks) after firstPart using a new handler. The new handler is constructed
// while the channel already holds firstPart bytes.
handler = new FrameFileHttpResponseHandler(channel);
response = handler.handleResponse(
makeResponse(HttpResponseStatus.OK, byteSlice(allBytes, firstPart, chunkSize * 3)),
null
);

// Set an exception.
handler.exceptionCaught(response, new ISE("Oh no!"));

// Add another chunk after the exception is caught (this can happen in real life!). We expect it to be ignored.
response = handler.handleChunk(
response,
makeChunk(byteSlice(allBytes, firstPart + chunkSize * 3, chunkSize * 3)),
2
);

// Verify that the exception handler was called and that the original exception is what got recorded.
Assert.assertTrue(response.getObj().isExceptionCaught());
final Throwable e = response.getObj().getExceptionCaught();
MatcherAssert.assertThat(e, CoreMatchers.instanceOf(IllegalStateException.class));
MatcherAssert.assertThat(e, ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("Oh no!")));

// Retry connection with the same handler and same initial offset firstPart (don't recreate handler), but now use
// thirds instead of quarters as chunks. (ServiceClientImpl would retry from the same offset with the same handler
// if the exception is retryable.)
response = handler.handleResponse(
makeResponse(HttpResponseStatus.OK, byteSlice(allBytes, firstPart, chunkSize * 4)),
null
);

// The channel must hold exactly firstPart + 4 chunks: the bytes re-sent by the retry were not appended a second
// time, and the chunk delivered after the exception was not appended at all.
Assert.assertEquals(firstPart + chunkSize * 4L, channel.getBytesAdded());
Assert.assertFalse(response.isFinished());

// Send the rest of the data.
response = handler.handleChunk(
response,
makeChunk(byteSlice(allBytes, firstPart + chunkSize * 4, chunkSize * 4)),
1
);
Assert.assertEquals(firstPart + chunkSize * 8L, channel.getBytesAdded());

response = handler.handleChunk(
response,
makeChunk(byteSlice(allBytes, firstPart + chunkSize * 8, chunkSize * 4)),
2
);
response = handler.done(response);

// The retried request completes normally, with no exception recorded on its response object.
Assert.assertTrue(response.isFinished());
Assert.assertFalse(response.getObj().isExceptionCaught());

// Verify channel: the byte count equals the file size, and the rows read back match the source adapter.
Assert.assertEquals(allBytes.length, channel.getBytesAdded());
channel.doneWriting();
FrameTestUtil.assertRowsEqual(
FrameTestUtil.readRowsFromAdapter(adapter, null, false),
FrameTestUtil.readRowsFromFrameChannel(channel, FrameReader.create(adapter.getRowSignature()))
);
}

private static HttpResponse makeResponse(final HttpResponseStatus status, final byte[] content)
{
final ByteBufferBackedChannelBuffer channelBuffer = new ByteBufferBackedChannelBuffer(ByteBuffer.wrap(content));
Expand Down

0 comments on commit 183c70c

Please sign in to comment.