Skip to content

Commit

Permalink
reintroduce timeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanBratanov committed Nov 25, 2024
1 parent 463ed03 commit 8a95412
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -53,38 +54,52 @@ enum State {

private static final Logger LOG = LogManager.getLogger();

@VisibleForTesting static final Duration READ_COMPLETE_TIMEOUT = Duration.ofSeconds(10);
@VisibleForTesting static final Duration RESPONSE_CHUNK_ARRIVAL_TIMEOUT = Duration.ofSeconds(30);

private final AsyncRunner asyncRunner;
private final int maximumResponseChunks;
private final Eth2RpcResponseHandler<TResponse, ?> responseHandler;
private final ResponseStream<TResponse> responseStream;

private final AsyncRunner timeoutRunner;
private final AtomicInteger currentChunkCount = new AtomicInteger(0);
private final AtomicReference<State> state;
private final AtomicReference<AsyncResponseProcessor<TResponse>> responseProcessor =
new AtomicReference<>();

private final String protocolId;
private final RpcResponseDecoder<TResponse, ?> responseDecoder;
private final boolean shouldReceiveResponse;

public Eth2OutgoingRequestHandler(
final AsyncRunner asyncRunner,
final AsyncRunner timeoutRunner,
final String protocolId,
final RpcResponseDecoder<TResponse, ?> responseDecoder,
final boolean shouldReceiveResponse,
final TRequest request,
final Eth2RpcResponseHandler<TResponse, ?> responseHandler) {
this.asyncRunner = asyncRunner;
this.timeoutRunner = timeoutRunner;
this.maximumResponseChunks = request.getMaximumResponseChunks();
this.responseHandler = responseHandler;
responseStream = new ResponseStream<>(responseHandler);
this.responseDecoder = responseDecoder;
this.shouldReceiveResponse = shouldReceiveResponse;
this.protocolId = protocolId;
this.state = new AtomicReference<>(shouldReceiveResponse ? EXPECT_DATA : DATA_COMPLETED);
}

public void handleInitialPayloadSent(final RpcStream stream) {
// Close the write side of the stream
stream.closeWriteStream().ifExceptionGetsHereRaiseABug();
if (shouldReceiveResponse) {
// Setup initial chunk timeout
ensureNextResponseChunkArrivesInTime(stream, currentChunkCount.get(), currentChunkCount);
} else {
ensureReadCompleteArrivesInTime(stream);
}
}

@Override
Expand All @@ -111,9 +126,16 @@ public void processData(final NodeId nodeId, final RpcStream rpcStream, final By
for (TResponse maybeResponse : maybeResponses) {
getResponseProcessor(rpcStream).processResponse(maybeResponse);
}
if (chunksReceived >= maximumResponseChunks
&& !transferToState(DATA_COMPLETED, List.of(EXPECT_DATA))) {
abortRequest(rpcStream, new IllegalStateException("Unexpected state: " + state));
if (chunksReceived < maximumResponseChunks) {
if (!maybeResponses.isEmpty()) {
ensureNextResponseChunkArrivesInTime(rpcStream, chunksReceived, currentChunkCount);
}
} else {
if (!transferToState(DATA_COMPLETED, List.of(EXPECT_DATA))) {
abortRequest(rpcStream, new IllegalStateException("Unexpected state: " + state));
return;
}
ensureReadCompleteArrivesInTime(rpcStream);
}
} catch (final RpcException e) {
abortRequest(rpcStream, e);
Expand Down Expand Up @@ -228,6 +250,40 @@ private void abortRequest(final RpcStream rpcStream, final Throwable error, fina
}
}

private void ensureNextResponseChunkArrivesInTime(
final RpcStream stream,
final int previousResponseCount,
final AtomicInteger currentResponseCount) {
timeoutRunner
.getDelayedFuture(RESPONSE_CHUNK_ARRIVAL_TIMEOUT)
.thenAccept(
(__) -> {
if (previousResponseCount == currentResponseCount.get()) {
abortRequest(
stream,
new RpcTimeoutException(
"Timed out waiting for response chunk " + previousResponseCount,
RESPONSE_CHUNK_ARRIVAL_TIMEOUT));
}
})
.ifExceptionGetsHereRaiseABug();
}

private void ensureReadCompleteArrivesInTime(final RpcStream stream) {
timeoutRunner
.getDelayedFuture(READ_COMPLETE_TIMEOUT)
.thenAccept(
(__) -> {
if (!(state.get() == READ_COMPLETE || state.get() == CLOSED)) {
abortRequest(
stream,
new RpcTimeoutException(
"Timed out waiting for read channel close", READ_COMPLETE_TIMEOUT));
}
})
.ifExceptionGetsHereRaiseABug();
}

@VisibleForTesting
State getState() {
return state.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public Eth2OutgoingRequestHandler<TRequest, TResponse> createOutgoingRequestHand
final TRequest request,
final Eth2RpcResponseHandler<TResponse, ?> responseHandler) {
return new Eth2OutgoingRequestHandler<>(
asyncRunner,
asyncRunner,
protocolId,
createResponseDecoder(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static tech.pegasys.teku.networking.eth2.rpc.core.Eth2OutgoingRequestHandler.READ_COMPLETE_TIMEOUT;
import static tech.pegasys.teku.networking.eth2.rpc.core.Eth2OutgoingRequestHandler.RESPONSE_CHUNK_ARRIVAL_TIMEOUT;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -95,6 +97,7 @@ public void setup() {
getRpcEncoding(), spec.getGenesisSchemaDefinitions().getSignedBeaconBlockSchema());
return new Eth2OutgoingRequestHandler<>(
asyncRequestRunner,
timeoutRunner,
method.getIds().get(0),
responseDecoder,
method.shouldReceiveResponse(),
Expand Down Expand Up @@ -261,6 +264,61 @@ public void shouldWorkWhenSendAllChunksPlusEmptyExtraChunk() throws Exception {
verify(rpcStream, never()).closeAbruptly();
}

@Test
public void disconnectsIfFirstChunkIsNotReceivedInTime() {
sendInitialPayload();

// Run timeouts
timeProvider.advanceTimeByMillis(RESPONSE_CHUNK_ARRIVAL_TIMEOUT.toMillis());
timeoutRunner.executeDueActions();
verify(rpcStream).closeAbruptly();
}

@Test
public void doNotDisconnectsIfFirstChunkReceivedInTime() {
sendInitialPayload();

deliverChunk(0);

// Go past the time the first chunk would have timed out but not enough to trigger timeout on
// the second chunk and ensure the timeout never fires.
timeProvider.advanceTimeByMillis(RESPONSE_CHUNK_ARRIVAL_TIMEOUT.toMillis() - 1);
timeoutRunner.executeDueActions();
verify(rpcStream, never()).closeAbruptly();
}

@Test
public void disconnectsIfSecondChunkNotReceivedInTime() {
sendInitialPayload();

timeProvider.advanceTimeByMillis(100);
deliverChunk(0);
asyncRequestRunner.executeQueuedActions();
assertThat(blocks.size()).isEqualTo(1);

// Run timeouts
timeProvider.advanceTimeByMillis(RESPONSE_CHUNK_ARRIVAL_TIMEOUT.toMillis());
timeoutRunner.executeDueActions();
verify(rpcStream).closeAbruptly();
}

@Test
public void abortsWhenNoReadComplete() {
sendInitialPayload();

timeProvider.advanceTimeByMillis(100);
for (int i = 0; i < maxChunks; i++) {
deliverChunk(i);
}

asyncRequestRunner.executeQueuedActions();

// Run timeouts
timeProvider.advanceTimeByMillis(READ_COMPLETE_TIMEOUT.toMillis());
timeoutRunner.executeDueActions();
verify(rpcStream).closeAbruptly();
}

@Test
public void shouldCompleteExceptionallyWhenClosedWithTruncatedMessage() {
sendInitialPayload();
Expand All @@ -276,6 +334,29 @@ public void shouldCompleteExceptionallyWhenClosedWithTruncatedMessage() {
assertThat(finishedProcessingFuture).isCompletedExceptionally();
}

@Test
public void doNotDisconnectsIfSecondChunkReceivedInTime() {
sendInitialPayload();

timeProvider.advanceTimeByMillis(100);
deliverChunk(0);
asyncRequestRunner.executeQueuedActions();
assertThat(blocks.size()).isEqualTo(1);

// Second chunk is received just in time
timeProvider.advanceTimeByMillis(RESPONSE_CHUNK_ARRIVAL_TIMEOUT.toMillis() - 1);
timeoutRunner.executeDueActions();
deliverChunk(1);
asyncRequestRunner.executeQueuedActions();

// Go past the time the second chunk would have timed out but not enough to trigger timeout on
// the third chunk and ensure the timeout never fires.
timeProvider.advanceTimeByMillis(RESPONSE_CHUNK_ARRIVAL_TIMEOUT.toMillis() - 1);
timeoutRunner.executeDueActions();
verify(rpcStream, never()).closeAbruptly();
assertThat(blocks.size()).isEqualTo(2);
}

@Test
public void shouldWorkWhenInitialPayloadEventIsLate() throws Exception {
deliverChunk(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,10 @@ public class RpcHandler<
TRequest,
TRespHandler extends RpcResponseHandler<?>>
implements ProtocolBinding<Controller<TOutgoingHandler>> {
private static final Duration TIMEOUT = Duration.ofSeconds(5);
private static final Logger LOG = LogManager.getLogger();

private static final Duration STREAM_INITIALIZE_TIMEOUT = Duration.ofSeconds(5);

private final AsyncRunner asyncRunner;
private final RpcMethod<TOutgoingHandler, TRequest, TRespHandler> rpcMethod;
private final ThrottlingTaskQueue concurrentRequestsQueue;
Expand Down Expand Up @@ -97,7 +98,7 @@ public SafeFuture<RpcStreamController<TOutgoingHandler>> sendRequestInternal(
SafeFuture.createInterruptor(connection.closeFuture(), PeerDisconnectedException::new);
final Interruptor timeoutInterruptor =
SafeFuture.createInterruptor(
asyncRunner.getDelayedFuture(TIMEOUT),
asyncRunner.getDelayedFuture(STREAM_INITIALIZE_TIMEOUT),
() ->
new StreamTimeoutException(
"Timed out waiting to initialize stream for protocol(s): "
Expand Down

0 comments on commit 8a95412

Please sign in to comment.