diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2OutgoingRequestHandler.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2OutgoingRequestHandler.java index f2d36ae6467..320a5a3f943 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2OutgoingRequestHandler.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2OutgoingRequestHandler.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBuf; +import java.time.Duration; import java.util.Collection; import java.util.List; import java.util.Objects; @@ -53,11 +54,15 @@ enum State { private static final Logger LOG = LogManager.getLogger(); + @VisibleForTesting static final Duration READ_COMPLETE_TIMEOUT = Duration.ofSeconds(10); + @VisibleForTesting static final Duration RESPONSE_CHUNK_ARRIVAL_TIMEOUT = Duration.ofSeconds(30); + private final AsyncRunner asyncRunner; private final int maximumResponseChunks; private final Eth2RpcResponseHandler responseHandler; private final ResponseStream responseStream; + private final AsyncRunner timeoutRunner; private final AtomicInteger currentChunkCount = new AtomicInteger(0); private final AtomicReference state; private final AtomicReference> responseProcessor = @@ -65,19 +70,23 @@ enum State { private final String protocolId; private final RpcResponseDecoder responseDecoder; + private final boolean shouldReceiveResponse; public Eth2OutgoingRequestHandler( final AsyncRunner asyncRunner, + final AsyncRunner timeoutRunner, final String protocolId, final RpcResponseDecoder responseDecoder, final boolean shouldReceiveResponse, final TRequest request, final Eth2RpcResponseHandler responseHandler) { this.asyncRunner = asyncRunner; + this.timeoutRunner = timeoutRunner; this.maximumResponseChunks = request.getMaximumResponseChunks(); this.responseHandler = responseHandler; responseStream = new ResponseStream<>(responseHandler); this.responseDecoder = responseDecoder; + this.shouldReceiveResponse = shouldReceiveResponse; this.protocolId = protocolId; this.state = new AtomicReference<>(shouldReceiveResponse ? EXPECT_DATA : DATA_COMPLETED); } @@ -85,6 +94,12 @@ public Eth2OutgoingRequestHandler( public void handleInitialPayloadSent(final RpcStream stream) { // Close the write side of the stream stream.closeWriteStream().ifExceptionGetsHereRaiseABug(); + if (shouldReceiveResponse) { + // Setup initial chunk timeout + ensureNextResponseChunkArrivesInTime(stream, currentChunkCount.get(), currentChunkCount); + } else { + ensureReadCompleteArrivesInTime(stream); + } } @Override @@ -111,9 +126,16 @@ public void processData(final NodeId nodeId, final RpcStream rpcStream, final By for (TResponse maybeResponse : maybeResponses) { getResponseProcessor(rpcStream).processResponse(maybeResponse); } - if (chunksReceived >= maximumResponseChunks - && !transferToState(DATA_COMPLETED, List.of(EXPECT_DATA))) { - abortRequest(rpcStream, new IllegalStateException("Unexpected state: " + state)); + if (chunksReceived < maximumResponseChunks) { + if (!maybeResponses.isEmpty()) { + ensureNextResponseChunkArrivesInTime(rpcStream, chunksReceived, currentChunkCount); + } + } else { + if (!transferToState(DATA_COMPLETED, List.of(EXPECT_DATA))) { + abortRequest(rpcStream, new IllegalStateException("Unexpected state: " + state)); + return; + } + ensureReadCompleteArrivesInTime(rpcStream); } } catch (final RpcException e) { abortRequest(rpcStream, e); @@ -228,6 +250,40 @@ private void abortRequest(final RpcStream rpcStream, final Throwable error, fina } } + private void ensureNextResponseChunkArrivesInTime( + final RpcStream stream, + final int previousResponseCount, + final AtomicInteger currentResponseCount) { + timeoutRunner + .getDelayedFuture(RESPONSE_CHUNK_ARRIVAL_TIMEOUT) + .thenAccept( + (__) -> { + if (previousResponseCount == currentResponseCount.get()) { + abortRequest( + stream, + new RpcTimeoutException( + "Timed out waiting for response chunk " + previousResponseCount, + RESPONSE_CHUNK_ARRIVAL_TIMEOUT)); + } + }) + .ifExceptionGetsHereRaiseABug(); + } + + private void ensureReadCompleteArrivesInTime(final RpcStream stream) { + timeoutRunner + .getDelayedFuture(READ_COMPLETE_TIMEOUT) + .thenAccept( + (__) -> { + if (!(state.get() == READ_COMPLETE || state.get() == CLOSED)) { + abortRequest( + stream, + new RpcTimeoutException( + "Timed out waiting for read channel close", READ_COMPLETE_TIMEOUT)); + } + }) + .ifExceptionGetsHereRaiseABug(); + } + @VisibleForTesting State getState() { return state.get(); diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/methods/SingleProtocolEth2RpcMethod.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/methods/SingleProtocolEth2RpcMethod.java index fabbd95de46..1c25f683d43 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/methods/SingleProtocolEth2RpcMethod.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/methods/SingleProtocolEth2RpcMethod.java @@ -96,6 +96,7 @@ public Eth2OutgoingRequestHandler createOutgoingRequestHand final TRequest request, final Eth2RpcResponseHandler responseHandler) { return new Eth2OutgoingRequestHandler<>( + asyncRunner, asyncRunner, protocolId, createResponseDecoder(), diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2OutgoingRequestHandlerTest.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2OutgoingRequestHandlerTest.java index 9180942c32f..5dd08fe79f5 100644 --- a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2OutgoingRequestHandlerTest.java +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2OutgoingRequestHandlerTest.java @@ -18,6 +18,8 @@ import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; +import static tech.pegasys.teku.networking.eth2.rpc.core.Eth2OutgoingRequestHandler.READ_COMPLETE_TIMEOUT; +import static tech.pegasys.teku.networking.eth2.rpc.core.Eth2OutgoingRequestHandler.RESPONSE_CHUNK_ARRIVAL_TIMEOUT; import java.util.ArrayList; import java.util.List; @@ -95,6 +97,7 @@ public void setup() { getRpcEncoding(), spec.getGenesisSchemaDefinitions().getSignedBeaconBlockSchema()); return new Eth2OutgoingRequestHandler<>( asyncRequestRunner, + timeoutRunner, method.getIds().get(0), responseDecoder, method.shouldReceiveResponse(), @@ -261,6 +264,61 @@ public void shouldWorkWhenSendAllChunksPlusEmptyExtraChunk() throws Exception { verify(rpcStream, never()).closeAbruptly(); } + @Test + public void disconnectsIfFirstChunkIsNotReceivedInTime() { + sendInitialPayload(); + + // Run timeouts + timeProvider.advanceTimeByMillis(RESPONSE_CHUNK_ARRIVAL_TIMEOUT.toMillis()); + timeoutRunner.executeDueActions(); + verify(rpcStream).closeAbruptly(); + } + + @Test + public void doNotDisconnectsIfFirstChunkReceivedInTime() { + sendInitialPayload(); + + deliverChunk(0); + + // Go past the time the first chunk would have timed out but not enough to trigger timeout on + // the second chunk and ensure the timeout never fires. + timeProvider.advanceTimeByMillis(RESPONSE_CHUNK_ARRIVAL_TIMEOUT.toMillis() - 1); + timeoutRunner.executeDueActions(); + verify(rpcStream, never()).closeAbruptly(); + } + + @Test + public void disconnectsIfSecondChunkNotReceivedInTime() { + sendInitialPayload(); + + timeProvider.advanceTimeByMillis(100); + deliverChunk(0); + asyncRequestRunner.executeQueuedActions(); + assertThat(blocks.size()).isEqualTo(1); + + // Run timeouts + timeProvider.advanceTimeByMillis(RESPONSE_CHUNK_ARRIVAL_TIMEOUT.toMillis()); + timeoutRunner.executeDueActions(); + verify(rpcStream).closeAbruptly(); + } + + @Test + public void abortsWhenNoReadComplete() { + sendInitialPayload(); + + timeProvider.advanceTimeByMillis(100); + for (int i = 0; i < maxChunks; i++) { + deliverChunk(i); + } + + asyncRequestRunner.executeQueuedActions(); + + // Run timeouts + timeProvider.advanceTimeByMillis(READ_COMPLETE_TIMEOUT.toMillis()); + timeoutRunner.executeDueActions(); + verify(rpcStream).closeAbruptly(); + } + @Test public void shouldCompleteExceptionallyWhenClosedWithTruncatedMessage() { sendInitialPayload(); @@ -276,6 +334,29 @@ public void shouldCompleteExceptionallyWhenClosedWithTruncatedMessage() { assertThat(finishedProcessingFuture).isCompletedExceptionally(); } + @Test + public void doNotDisconnectsIfSecondChunkReceivedInTime() { + sendInitialPayload(); + + timeProvider.advanceTimeByMillis(100); + deliverChunk(0); + asyncRequestRunner.executeQueuedActions(); + assertThat(blocks.size()).isEqualTo(1); + + // Second chunk is received just in time + timeProvider.advanceTimeByMillis(RESPONSE_CHUNK_ARRIVAL_TIMEOUT.toMillis() - 1); + timeoutRunner.executeDueActions(); + deliverChunk(1); + asyncRequestRunner.executeQueuedActions(); + + // Go past the time the second chunk would have timed out but not enough to trigger timeout on + // the third chunk and ensure the timeout never fires. + timeProvider.advanceTimeByMillis(RESPONSE_CHUNK_ARRIVAL_TIMEOUT.toMillis() - 1); + timeoutRunner.executeDueActions(); + verify(rpcStream, never()).closeAbruptly(); + assertThat(blocks.size()).isEqualTo(2); + } + @Test public void shouldWorkWhenInitialPayloadEventIsLate() throws Exception { deliverChunk(0); diff --git a/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/rpc/RpcHandler.java b/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/rpc/RpcHandler.java index 4acf69d73df..f5f85af3c22 100644 --- a/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/rpc/RpcHandler.java +++ b/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/rpc/RpcHandler.java @@ -57,9 +57,10 @@ public class RpcHandler< TRequest, TRespHandler extends RpcResponseHandler> implements ProtocolBinding> { - private static final Duration TIMEOUT = Duration.ofSeconds(5); private static final Logger LOG = LogManager.getLogger(); + private static final Duration STREAM_INITIALIZE_TIMEOUT = Duration.ofSeconds(5); + private final AsyncRunner asyncRunner; private final RpcMethod rpcMethod; private final ThrottlingTaskQueue concurrentRequestsQueue; @@ -97,7 +98,7 @@ public SafeFuture> sendRequestInternal( SafeFuture.createInterruptor(connection.closeFuture(), PeerDisconnectedException::new); final Interruptor timeoutInterruptor = SafeFuture.createInterruptor( - asyncRunner.getDelayedFuture(TIMEOUT), + asyncRunner.getDelayedFuture(STREAM_INITIALIZE_TIMEOUT), () -> new StreamTimeoutException( "Timed out waiting to initialize stream for protocol(s): "