diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/constants/NetworkConstants.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/constants/NetworkConstants.java index ff7e0c1d390..2bbbf82f076 100644 --- a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/constants/NetworkConstants.java +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/constants/NetworkConstants.java @@ -20,7 +20,4 @@ public class NetworkConstants { public static final int DEFAULT_SAFE_SLOTS_TO_IMPORT_OPTIMISTICALLY = 128; public static final int NODE_ID_BITS = 256; - - // https://github.com/ethereum/consensus-specs/pull/3767 - public static final int MAX_CONCURRENT_REQUESTS = 2; } diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java index b63d8894097..e056f36c1d6 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java @@ -93,7 +93,7 @@ public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHis static final String GAUGE_BLOB_SIDECARS_TRACKERS_LABEL = "blob_sidecars_trackers"; static final UInt64 MAX_WAIT_RELATIVE_TO_ATT_DUE_MILLIS = UInt64.valueOf(1500); - static final UInt64 MIN_WAIT_MILLIS = UInt64.ZERO; + static final UInt64 MIN_WAIT_MILLIS = UInt64.valueOf(500); static final UInt64 TARGET_WAIT_MILLIS = UInt64.valueOf(1000); private final SettableLabelledGauge sizeGauge; diff --git a/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/ThrottlingTaskQueue.java b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/ThrottlingTaskQueue.java index 2d18705a41e..7927f27ec26 100644 --- a/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/ThrottlingTaskQueue.java +++ b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/ThrottlingTaskQueue.java @@ -28,10 +28,6 @@ public class ThrottlingTaskQueue { private int inflightTaskCount = 0; - public static ThrottlingTaskQueue create(final int maximumConcurrentTasks) { - return new ThrottlingTaskQueue(maximumConcurrentTasks); - } - public static ThrottlingTaskQueue create( final int maximumConcurrentTasks, final MetricsSystem metricsSystem, diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerManager.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerManager.java index 88e57f491a7..afa52355653 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerManager.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerManager.java @@ -41,6 +41,7 @@ import tech.pegasys.teku.networking.p2p.peer.Peer; import tech.pegasys.teku.networking.p2p.peer.PeerConnectedSubscriber; import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.config.SpecConfig; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.metadata.MetadataMessage; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.metadata.MetadataMessageSchema; import tech.pegasys.teku.spec.datastructures.state.Checkpoint; @@ -50,8 +51,6 @@ public class Eth2PeerManager implements PeerLookup, PeerHandler { private static final Logger LOG = LogManager.getLogger(); - private static final Duration STATUS_RECEIVED_TIMEOUT = Duration.ofSeconds(10); - private final AsyncRunner asyncRunner; private final RecentChainData recentChainData; private final Eth2PeerFactory eth2PeerFactory; @@ -67,6 +66,7 @@ public class Eth2PeerManager implements PeerLookup, PeerHandler { private final int eth2RpcOutstandingPingThreshold; private final Duration eth2StatusUpdateInterval; + private final SpecConfig specConfig; Eth2PeerManager( final Spec spec, @@ -99,6 +99,7 @@ public class Eth2PeerManager implements PeerLookup, PeerHandler { this.eth2RpcPingInterval = eth2RpcPingInterval; this.eth2RpcOutstandingPingThreshold = eth2RpcOutstandingPingThreshold; this.eth2StatusUpdateInterval = eth2StatusUpdateInterval; + this.specConfig = spec.getGenesisSpecConfig(); } public static Eth2PeerManager create( @@ -236,7 +237,7 @@ private void ensureStatusReceived(final Eth2Peer peer) { .ifExceptionGetsHereRaiseABug(); } }, - STATUS_RECEIVED_TIMEOUT) + Duration.ofSeconds(specConfig.getRespTimeout())) .finish( () -> {}, error -> { diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethods.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethods.java index 75a4feed619..63ba993db5f 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethods.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethods.java @@ -115,8 +115,8 @@ public static BeaconChainMethods create( final MetadataMessagesFactory metadataMessagesFactory, final RpcEncoding rpcEncoding) { return new BeaconChainMethods( - createStatus(asyncRunner, statusMessageFactory, peerLookup, rpcEncoding), - createGoodBye(asyncRunner, metricsSystem, peerLookup, rpcEncoding), + createStatus(spec, asyncRunner, statusMessageFactory, peerLookup, rpcEncoding), + createGoodBye(spec, asyncRunner, metricsSystem, peerLookup, rpcEncoding), createBeaconBlocksByRoot( spec, metricsSystem, asyncRunner, recentChainData, peerLookup, rpcEncoding), createBeaconBlocksByRange( @@ -144,10 +144,11 @@ public static BeaconChainMethods create( rpcEncoding, recentChainData), createMetadata(spec, asyncRunner, metadataMessagesFactory, peerLookup, rpcEncoding), - createPing(asyncRunner, metadataMessagesFactory, peerLookup, rpcEncoding)); + createPing(spec, asyncRunner, metadataMessagesFactory, peerLookup, rpcEncoding)); } private static Eth2RpcMethod createStatus( + final Spec spec, final AsyncRunner asyncRunner, final StatusMessageFactory statusMessageFactory, final PeerLookup peerLookup, @@ -164,10 +165,12 @@ private static Eth2RpcMethod createStatus( true, contextCodec, statusHandler, - peerLookup); + peerLookup, + spec.getNetworkingConfig()); } private static Eth2RpcMethod createGoodBye( + final Spec spec, final AsyncRunner asyncRunner, final MetricsSystem metricsSystem, final PeerLookup peerLookup, @@ -184,7 +187,8 @@ private static Eth2RpcMethod createGoodBye( false, contextCodec, goodbyeHandler, - peerLookup); + peerLookup, + spec.getNetworkingConfig()); } private static Eth2RpcMethod @@ -217,7 +221,8 @@ private static Eth2RpcMethod createGoodBye( expectResponseToRequest, forkDigestContextCodec, beaconBlocksByRootHandler, - peerLookup); + peerLookup, + spec.getNetworkingConfig()); return VersionedEth2RpcMethod.create( rpcEncoding, requestType, expectResponseToRequest, List.of(v2Method)); @@ -254,7 +259,8 @@ private static Eth2RpcMethod createGoodBye( expectResponseToRequest, forkDigestContextCodec, beaconBlocksByRangeHandler, - peerLookup); + peerLookup, + spec.getNetworkingConfig()); return VersionedEth2RpcMethod.create( rpcEncoding, requestType, expectResponseToRequest, List.of(v2Method)); @@ -293,7 +299,8 @@ private static Eth2RpcMethod createGoodBye( true, forkDigestContextCodec, blobSidecarsByRootHandler, - peerLookup)); + peerLookup, + spec.getNetworkingConfig())); } private static Optional> @@ -329,7 +336,8 @@ private static Eth2RpcMethod createGoodBye( true, forkDigestContextCodec, blobSidecarsByRangeHandler, - peerLookup)); + peerLookup, + spec.getNetworkingConfig())); } private static Eth2RpcMethod createMetadata( @@ -361,7 +369,8 @@ private static Eth2RpcMethod createMetadata( expectResponse, phase0ContextCodec, messageHandler, - peerLookup); + peerLookup, + spec.getNetworkingConfig()); if (spec.isMilestoneSupported(SpecMilestone.ALTAIR)) { final SszSchema altairMetadataSchema = @@ -383,7 +392,8 @@ private static Eth2RpcMethod createMetadata( expectResponse, altairContextCodec, messageHandler, - peerLookup); + peerLookup, + spec.getNetworkingConfig()); return VersionedEth2RpcMethod.create( rpcEncoding, requestType, expectResponse, List.of(v2Method, v1Method)); } else { @@ -392,6 +402,7 @@ private static Eth2RpcMethod createMetadata( } private static Eth2RpcMethod createPing( + final Spec spec, final AsyncRunner asyncRunner, final MetadataMessagesFactory metadataMessagesFactory, final PeerLookup peerLookup, @@ -408,7 +419,8 @@ private static Eth2RpcMethod createPing( true, contextCodec, statusHandler, - peerLookup); + peerLookup, + spec.getNetworkingConfig()); } public Collection> all() { diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2IncomingRequestHandler.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2IncomingRequestHandler.java index 2a1bce8e6f3..dc8dd90ed46 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2IncomingRequestHandler.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2IncomingRequestHandler.java @@ -28,13 +28,13 @@ import tech.pegasys.teku.networking.p2p.rpc.RpcRequestHandler; import tech.pegasys.teku.networking.p2p.rpc.RpcStream; import tech.pegasys.teku.networking.p2p.rpc.StreamClosedException; +import tech.pegasys.teku.spec.config.NetworkingSpecConfig; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.RpcRequest; public class Eth2IncomingRequestHandler< TRequest extends RpcRequest & SszData, TResponse extends SszData> implements RpcRequestHandler { private static final Logger LOG = LogManager.getLogger(); - private static final Duration RECEIVE_INCOMING_REQUEST_TIMEOUT = Duration.ofSeconds(10); private final PeerLookup peerLookup; private final LocalMessageHandler localMessageHandler; @@ -45,6 +45,7 @@ public class Eth2IncomingRequestHandler< private final String protocolId; private final AsyncRunner asyncRunner; private final AtomicBoolean requestHandled = new AtomicBoolean(false); + private final Duration respTimeout; public Eth2IncomingRequestHandler( final String protocolId, @@ -52,13 +53,15 @@ public Eth2IncomingRequestHandler( final RpcRequestDecoder requestDecoder, final AsyncRunner asyncRunner, final PeerLookup peerLookup, - final LocalMessageHandler localMessageHandler) { + final LocalMessageHandler localMessageHandler, + final NetworkingSpecConfig networkingConfig) { this.protocolId = protocolId; this.asyncRunner = asyncRunner; this.peerLookup = peerLookup; this.localMessageHandler = localMessageHandler; this.responseEncoder = responseEncoder; this.requestDecoder = requestDecoder; + this.respTimeout = Duration.ofSeconds(networkingConfig.getRespTimeout()); } @Override @@ -118,14 +121,15 @@ private void handleRequest( } private void ensureRequestReceivedWithinTimeLimit(final RpcStream stream) { + final Duration timeout = respTimeout; asyncRunner - .getDelayedFuture(RECEIVE_INCOMING_REQUEST_TIMEOUT) + .getDelayedFuture(timeout) .thenAccept( (__) -> { if (!requestHandled.get()) { LOG.debug( "Failed to receive incoming request data within {} sec for protocol {}. Close stream.", - RECEIVE_INCOMING_REQUEST_TIMEOUT.toSeconds(), + timeout.toSeconds(), protocolId); stream.closeAbruptly().ifExceptionGetsHereRaiseABug(); } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2OutgoingRequestHandler.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2OutgoingRequestHandler.java index 320a5a3f943..b39e2f70ad2 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2OutgoingRequestHandler.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2OutgoingRequestHandler.java @@ -22,9 +22,10 @@ import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBuf; import java.time.Duration; +import java.time.temporal.ChronoUnit; import java.util.Collection; import java.util.List; -import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.logging.log4j.LogManager; @@ -37,6 +38,7 @@ import tech.pegasys.teku.networking.p2p.peer.NodeId; import tech.pegasys.teku.networking.p2p.rpc.RpcRequestHandler; import tech.pegasys.teku.networking.p2p.rpc.RpcStream; +import tech.pegasys.teku.spec.config.NetworkingSpecConfig; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.RpcRequest; public class Eth2OutgoingRequestHandler< @@ -54,15 +56,13 @@ enum State { private static final Logger LOG = LogManager.getLogger(); - @VisibleForTesting static final Duration READ_COMPLETE_TIMEOUT = Duration.ofSeconds(10); - @VisibleForTesting static final Duration RESPONSE_CHUNK_ARRIVAL_TIMEOUT = Duration.ofSeconds(30); - private final AsyncRunner asyncRunner; private final int maximumResponseChunks; private final Eth2RpcResponseHandler responseHandler; private final ResponseStream responseStream; private final AsyncRunner timeoutRunner; + private final AtomicBoolean hasReceivedInitialBytes = new AtomicBoolean(false); private final AtomicInteger currentChunkCount = new AtomicInteger(0); private final AtomicReference state; private final AtomicReference> responseProcessor = @@ -71,6 +71,8 @@ enum State { private final String protocolId; private final RpcResponseDecoder responseDecoder; private final boolean shouldReceiveResponse; + private final Duration ttbfTimeout; + private final Duration respTimeout; public Eth2OutgoingRequestHandler( final AsyncRunner asyncRunner, @@ -79,24 +81,29 @@ public Eth2OutgoingRequestHandler( final RpcResponseDecoder responseDecoder, final boolean shouldReceiveResponse, final TRequest request, - final Eth2RpcResponseHandler responseHandler) { + final Eth2RpcResponseHandler responseHandler, + final NetworkingSpecConfig networkingConfig) { this.asyncRunner = asyncRunner; this.timeoutRunner = timeoutRunner; this.maximumResponseChunks = request.getMaximumResponseChunks(); + this.responseHandler = responseHandler; responseStream = new ResponseStream<>(responseHandler); this.responseDecoder = responseDecoder; this.shouldReceiveResponse = shouldReceiveResponse; this.protocolId = protocolId; + this.ttbfTimeout = Duration.of(networkingConfig.getTtfbTimeout(), ChronoUnit.SECONDS); + this.respTimeout = Duration.of(networkingConfig.getRespTimeout(), ChronoUnit.SECONDS); this.state = new AtomicReference<>(shouldReceiveResponse ? EXPECT_DATA : DATA_COMPLETED); } public void handleInitialPayloadSent(final RpcStream stream) { // Close the write side of the stream stream.closeWriteStream().ifExceptionGetsHereRaiseABug(); + if (shouldReceiveResponse) { - // Setup initial chunk timeout - ensureNextResponseChunkArrivesInTime(stream, currentChunkCount.get(), currentChunkCount); + // Start timer for first bytes + ensureFirstBytesArriveWithinTimeLimit(stream); } else { ensureReadCompleteArrivesInTime(stream); } @@ -116,6 +123,8 @@ public void processData(final NodeId nodeId, final RpcStream rpcStream, final By throw new RpcException.ExtraDataAppendedException(" extra data: " + bufToString(data)); } + onFirstByteReceived(rpcStream); + List maybeResponses = responseDecoder.decodeNextResponses(data); final int chunksReceived = currentChunkCount.addAndGet(maybeResponses.size()); @@ -128,7 +137,7 @@ public void processData(final NodeId nodeId, final RpcStream rpcStream, final By } if (chunksReceived < maximumResponseChunks) { if (!maybeResponses.isEmpty()) { - ensureNextResponseChunkArrivesInTime(rpcStream, chunksReceived, currentChunkCount); + ensureNextResponseArrivesInTime(rpcStream, chunksReceived, currentChunkCount); } } else { if (!transferToState(DATA_COMPLETED, List.of(EXPECT_DATA))) { @@ -147,14 +156,14 @@ public void processData(final NodeId nodeId, final RpcStream rpcStream, final By private AsyncResponseProcessor getResponseProcessor(final RpcStream rpcStream) { return responseProcessor.updateAndGet( - oldVal -> - Objects.requireNonNullElseGet( - oldVal, - () -> - new AsyncResponseProcessor<>( - asyncRunner, - responseStream, - throwable -> abortRequest(rpcStream, throwable)))); + oldVal -> { + if (oldVal == null) { + return new AsyncResponseProcessor<>( + asyncRunner, responseStream, throwable -> abortRequest(rpcStream, throwable)); + } else { + return oldVal; + } + }); } private String bufToString(final ByteBuf buf) { @@ -207,6 +216,13 @@ private boolean transferToState(final State toState, final Collection fro return false; } + private void onFirstByteReceived(final RpcStream rpcStream) { + if (hasReceivedInitialBytes.compareAndSet(false, true)) { + // Setup initial chunk timeout + ensureNextResponseArrivesInTime(rpcStream, currentChunkCount.get(), currentChunkCount); + } + } + private void completeRequest(final RpcStream rpcStream) { getResponseProcessor(rpcStream) .finishProcessing() @@ -250,35 +266,49 @@ private void abortRequest(final RpcStream rpcStream, final Throwable error, fina } } - private void ensureNextResponseChunkArrivesInTime( + private void ensureFirstBytesArriveWithinTimeLimit(final RpcStream stream) { + timeoutRunner + .getDelayedFuture(ttbfTimeout) + .thenAccept( + (__) -> { + if (!hasReceivedInitialBytes.get()) { + abortRequest( + stream, + new RpcTimeoutException("Timed out waiting for initial response", ttbfTimeout)); + } + }) + .ifExceptionGetsHereRaiseABug(); + } + + private void ensureNextResponseArrivesInTime( final RpcStream stream, final int previousResponseCount, final AtomicInteger currentResponseCount) { + final Duration timeout = respTimeout; timeoutRunner - .getDelayedFuture(RESPONSE_CHUNK_ARRIVAL_TIMEOUT) + .getDelayedFuture(timeout) .thenAccept( (__) -> { if (previousResponseCount == currentResponseCount.get()) { abortRequest( stream, new RpcTimeoutException( - "Timed out waiting for response chunk " + previousResponseCount, - RESPONSE_CHUNK_ARRIVAL_TIMEOUT)); + "Timed out waiting for response chunk " + previousResponseCount, timeout)); } }) .ifExceptionGetsHereRaiseABug(); } private void ensureReadCompleteArrivesInTime(final RpcStream stream) { + final Duration timeout = respTimeout; timeoutRunner - .getDelayedFuture(READ_COMPLETE_TIMEOUT) + .getDelayedFuture(timeout) .thenAccept( (__) -> { if (!(state.get() == READ_COMPLETE || state.get() == CLOSED)) { abortRequest( stream, - new RpcTimeoutException( - "Timed out waiting for read channel close", READ_COMPLETE_TIMEOUT)); + new RpcTimeoutException("Timed out waiting for read channel close", timeout)); } }) .ifExceptionGetsHereRaiseABug(); diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/RpcTimeouts.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/RpcTimeouts.java new file mode 100644 index 00000000000..0fce1e93fd1 --- /dev/null +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/RpcTimeouts.java @@ -0,0 +1,40 @@ +/* + * Copyright Consensys Software Inc., 2022 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.networking.eth2.rpc.core; + +import java.time.Duration; +import tech.pegasys.teku.networking.p2p.rpc.StreamTimeoutException; + +/** + * This class holds constants related to handling rpc request timeouts. See: + * https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#configuration + */ +public abstract class RpcTimeouts { + + // The maximum time to wait for first byte of request response (time-to-first-byte). + static final Duration TTFB_TIMEOUT = Duration.ofSeconds(5); + // The maximum time for complete response transfer. + public static final Duration RESP_TIMEOUT = Duration.ofSeconds(10); + + public static class RpcTimeoutException extends StreamTimeoutException { + + public RpcTimeoutException(final String message, final Duration timeout) { + super(generateMessage(message, timeout)); + } + + private static String generateMessage(final String message, final Duration timeout) { + return String.format("Rpc request timed out after %d sec: %s", timeout.toSeconds(), message); + } + } +} diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/methods/SingleProtocolEth2RpcMethod.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/methods/SingleProtocolEth2RpcMethod.java index 1c25f683d43..fd3224e848d 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/methods/SingleProtocolEth2RpcMethod.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/methods/SingleProtocolEth2RpcMethod.java @@ -29,6 +29,7 @@ import tech.pegasys.teku.networking.eth2.rpc.core.RpcResponseEncoder; import tech.pegasys.teku.networking.eth2.rpc.core.encodings.RpcEncoding; import tech.pegasys.teku.networking.eth2.rpc.core.encodings.context.RpcContextCodec; +import tech.pegasys.teku.spec.config.NetworkingSpecConfig; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.RpcRequest; public class SingleProtocolEth2RpcMethod< @@ -44,6 +45,7 @@ public class SingleProtocolEth2RpcMethod< private final LocalMessageHandler localMessageHandler; private final PeerLookup peerLookup; + private final NetworkingSpecConfig networkingConfig; public SingleProtocolEth2RpcMethod( final AsyncRunner asyncRunner, @@ -54,7 +56,8 @@ public SingleProtocolEth2RpcMethod( final boolean expectResponseToRequest, final RpcContextCodec contextCodec, final LocalMessageHandler localMessageHandler, - final PeerLookup peerLookup) { + final PeerLookup peerLookup, + final NetworkingSpecConfig networkingConfig) { super(encoding, requestType, expectResponseToRequest); this.asyncRunner = asyncRunner; this.contextCodec = contextCodec; @@ -63,6 +66,7 @@ public SingleProtocolEth2RpcMethod( this.protocolVersion = protocolVersion; this.localMessageHandler = localMessageHandler; this.peerLookup = peerLookup; + this.networkingConfig = networkingConfig; } @Override @@ -87,7 +91,8 @@ public Eth2IncomingRequestHandler createIncomingRequestHand createRequestDecoder(), asyncRunner, peerLookup, - localMessageHandler); + localMessageHandler, + networkingConfig); } @Override @@ -102,7 +107,8 @@ public Eth2OutgoingRequestHandler createOutgoingRequestHand createResponseDecoder(), expectResponseToRequest, request, - responseHandler); + responseHandler, + networkingConfig); } @Override diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2OutgoingRequestHandlerTest.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2OutgoingRequestHandlerTest.java index 5dd08fe79f5..ce83a72a55c 100644 --- a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2OutgoingRequestHandlerTest.java +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2OutgoingRequestHandlerTest.java @@ -18,9 +18,9 @@ import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; -import static tech.pegasys.teku.networking.eth2.rpc.core.Eth2OutgoingRequestHandler.READ_COMPLETE_TIMEOUT; -import static tech.pegasys.teku.networking.eth2.rpc.core.Eth2OutgoingRequestHandler.RESPONSE_CHUNK_ARRIVAL_TIMEOUT; +import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicReference; @@ -65,6 +65,10 @@ public class Eth2OutgoingRequestHandlerTest Eth2RpcResponseHandler.expectMultipleResponses(res -> responseListener.get().onResponse(res)); private final int maxChunks = 3; private final SafeFuture finishedProcessingFuture = responseHandler.getCompletedFuture(); + private final Duration ttbfTimeout = + Duration.ofSeconds(spec.getGenesisSpecConfig().getTtfbTimeout()); + private final Duration respTimeout = + Duration.ofSeconds(spec.getGenesisSpecConfig().getRespTimeout()); private RpcResponseEncoder responseEncoder; private List chunks; @@ -102,7 +106,8 @@ public void setup() { responseDecoder, method.shouldReceiveResponse(), request, - responseHandler); + responseHandler, + spec.getNetworkingConfig()); } @Override @@ -265,30 +270,63 @@ public void shouldWorkWhenSendAllChunksPlusEmptyExtraChunk() throws Exception { } @Test - public void disconnectsIfFirstChunkIsNotReceivedInTime() { + public void disconnectsIfInitialBytesAreNotReceivedInTime() { sendInitialPayload(); + verify(rpcStream).closeWriteStream(); + verify(rpcStream, never()).closeAbruptly(); + + // Run async tasks + timeProvider.advanceTimeByMillis(ttbfTimeout.toMillis()); + timeoutRunner.executeDueActions(); + verify(rpcStream).closeAbruptly(); + } + + @Test + public void doesNotDisconnectIfInitialBytesAreReceivedInTime() throws Exception { + sendInitialPayload(); + verify(rpcStream).closeWriteStream(); + verify(rpcStream, never()).closeAbruptly(); + + // Deliver some bytes just in time + timeProvider.advanceTimeByMillis(ttbfTimeout.toMillis() - 1); + timeoutRunner.executeDueActions(); + deliverInitialBytes(); + + // Go past the time the first bytes should have been received and check it doesn't timeout + timeProvider.advanceTimeByMillis(10); + timeoutRunner.executeDueActions(); + verify(rpcStream, never()).closeAbruptly(); + } + + @Test + public void disconnectsIfFirstChunkIsNotReceivedInTime() throws Exception { + sendInitialPayload(); + + deliverInitialBytes(); // Run timeouts - timeProvider.advanceTimeByMillis(RESPONSE_CHUNK_ARRIVAL_TIMEOUT.toMillis()); + timeProvider.advanceTimeByMillis(respTimeout.toMillis()); timeoutRunner.executeDueActions(); verify(rpcStream).closeAbruptly(); } @Test - public void doNotDisconnectsIfFirstChunkReceivedInTime() { + public void doNotDisconnectsIfFirstChunkReceivedInTime() throws Exception { sendInitialPayload(); + // First byte is received just in time + timeProvider.advanceTimeByMillis(ttbfTimeout.toMillis() - 1); deliverChunk(0); // Go past the time the first chunk would have timed out but not enough to trigger timeout on // the second chunk and ensure the timeout never fires. - timeProvider.advanceTimeByMillis(RESPONSE_CHUNK_ARRIVAL_TIMEOUT.toMillis() - 1); + timeProvider.advanceTimeByMillis(respTimeout.toMillis() - 1); timeoutRunner.executeDueActions(); verify(rpcStream, never()).closeAbruptly(); } @Test - public void disconnectsIfSecondChunkNotReceivedInTime() { + public void disconnectsIfSecondChunkNotReceivedInTime() throws Exception { sendInitialPayload(); timeProvider.advanceTimeByMillis(100); @@ -297,13 +335,13 @@ public void disconnectsIfSecondChunkNotReceivedInTime() { assertThat(blocks.size()).isEqualTo(1); // Run timeouts - timeProvider.advanceTimeByMillis(RESPONSE_CHUNK_ARRIVAL_TIMEOUT.toMillis()); + timeProvider.advanceTimeByMillis(respTimeout.toMillis()); timeoutRunner.executeDueActions(); verify(rpcStream).closeAbruptly(); } @Test - public void abortsWhenNoReadComplete() { + public void abortsWhenNoReadComplete() throws Exception { sendInitialPayload(); timeProvider.advanceTimeByMillis(100); @@ -314,7 +352,7 @@ public void abortsWhenNoReadComplete() { asyncRequestRunner.executeQueuedActions(); // Run timeouts - timeProvider.advanceTimeByMillis(READ_COMPLETE_TIMEOUT.toMillis()); + timeProvider.advanceTimeByMillis(respTimeout.toMillis()); timeoutRunner.executeDueActions(); verify(rpcStream).closeAbruptly(); } @@ -335,7 +373,7 @@ public void shouldCompleteExceptionallyWhenClosedWithTruncatedMessage() { } @Test - public void doNotDisconnectsIfSecondChunkReceivedInTime() { + public void doNotDisconnectsIfSecondChunkReceivedInTime() throws Exception { sendInitialPayload(); timeProvider.advanceTimeByMillis(100); @@ -344,14 +382,14 @@ public void doNotDisconnectsIfSecondChunkReceivedInTime() { assertThat(blocks.size()).isEqualTo(1); // Second chunk is received just in time - timeProvider.advanceTimeByMillis(RESPONSE_CHUNK_ARRIVAL_TIMEOUT.toMillis() - 1); + timeProvider.advanceTimeByMillis(respTimeout.toMillis() - 1); timeoutRunner.executeDueActions(); deliverChunk(1); asyncRequestRunner.executeQueuedActions(); // Go past the time the second chunk would have timed out but not enough to trigger timeout on // the third chunk and ensure the timeout never fires. - timeProvider.advanceTimeByMillis(RESPONSE_CHUNK_ARRIVAL_TIMEOUT.toMillis() - 1); + timeProvider.advanceTimeByMillis(respTimeout.toMillis() - 1); timeoutRunner.executeDueActions(); verify(rpcStream, never()).closeAbruptly(); assertThat(blocks.size()).isEqualTo(2); @@ -401,6 +439,11 @@ private void sendInitialPayload() { reqHandler.handleInitialPayloadSent(rpcStream); } + private void deliverInitialBytes() throws IOException { + final Bytes firstByte = chunks.get(0).slice(0, 1); + deliverBytes(firstByte); + } + private Bytes chunkBytes(final int chunk) { final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlock(chunk); return responseEncoder.encodeSuccessfulResponse(block); diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/core/RpcDecoderTestBase.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/core/RpcDecoderTestBase.java index 9f526c9c21b..30e54675f05 100644 --- a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/core/RpcDecoderTestBase.java +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/core/RpcDecoderTestBase.java @@ -78,7 +78,8 @@ public class RpcDecoderTestBase { false, contextEncoder, mock(LocalMessageHandler.class), - peerLookup); + peerLookup, + spec.getNetworkingConfig()); protected List> testByteBufSlices(final Bytes... bytes) { List> ret = Utils.generateTestSlices(bytes); diff --git a/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/LibP2PNetworkBuilder.java b/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/LibP2PNetworkBuilder.java index 733f92be5d2..5b809f77d14 100644 --- a/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/LibP2PNetworkBuilder.java +++ b/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/LibP2PNetworkBuilder.java @@ -15,7 +15,6 @@ import static tech.pegasys.teku.networking.p2p.libp2p.LibP2PNetwork.REMOTE_OPEN_STREAMS_RATE_LIMIT; import static tech.pegasys.teku.networking.p2p.libp2p.LibP2PNetwork.REMOTE_PARALLEL_OPEN_STREAMS_COUNT_LIMIT; -import static tech.pegasys.teku.spec.constants.NetworkConstants.MAX_CONCURRENT_REQUESTS; import com.google.common.base.Preconditions; import identify.pb.IdentifyOuterClass; @@ -153,9 +152,7 @@ public P2PNetwork build() { } protected List> createRpcHandlers() { - return rpcMethods.stream() - .map(m -> new RpcHandler<>(asyncRunner, m, MAX_CONCURRENT_REQUESTS)) - .toList(); + return rpcMethods.stream().map(m -> new RpcHandler<>(asyncRunner, m)).toList(); } protected LibP2PGossipNetwork createGossipNetwork() { diff --git a/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/rpc/RpcHandler.java b/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/rpc/RpcHandler.java index f5f85af3c22..9de93c7765b 100644 --- a/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/rpc/RpcHandler.java +++ b/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/rpc/RpcHandler.java @@ -38,7 +38,6 @@ import tech.pegasys.teku.infrastructure.async.AsyncRunner; import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.async.SafeFuture.Interruptor; -import tech.pegasys.teku.infrastructure.async.ThrottlingTaskQueue; import tech.pegasys.teku.infrastructure.exceptions.ExceptionUtil; import tech.pegasys.teku.networking.p2p.libp2p.LibP2PNodeId; import tech.pegasys.teku.networking.p2p.libp2p.rpc.RpcHandler.Controller; @@ -57,21 +56,17 @@ public class RpcHandler< TRequest, TRespHandler extends RpcResponseHandler> implements ProtocolBinding> { + private static final Duration TIMEOUT = Duration.ofSeconds(5); private static final Logger LOG = LogManager.getLogger(); - private static final Duration STREAM_INITIALIZE_TIMEOUT = Duration.ofSeconds(5); - - private final AsyncRunner asyncRunner; private final RpcMethod rpcMethod; - private final ThrottlingTaskQueue concurrentRequestsQueue; + private final AsyncRunner asyncRunner; public RpcHandler( final AsyncRunner asyncRunner, - final RpcMethod rpcMethod, - final int maxConcurrentRequests) { + final RpcMethod rpcMethod) { this.asyncRunner = asyncRunner; this.rpcMethod = rpcMethod; - concurrentRequestsQueue = ThrottlingTaskQueue.create(maxConcurrentRequests); } public RpcMethod getRpcMethod() { @@ -80,12 +75,6 @@ public RpcMethod getRpcMethod() { public SafeFuture> sendRequest( final Connection connection, final TRequest request, final TRespHandler responseHandler) { - return concurrentRequestsQueue.queueTask( - () -> sendRequestInternal(connection, request, responseHandler)); - } - - public SafeFuture> sendRequestInternal( - final Connection connection, final TRequest request, final TRespHandler responseHandler) { final Bytes initialPayload; try { @@ -94,11 +83,11 @@ public SafeFuture> sendRequestInternal( return SafeFuture.failedFuture(e); } - final Interruptor closeInterruptor = + Interruptor closeInterruptor = SafeFuture.createInterruptor(connection.closeFuture(), PeerDisconnectedException::new); - final Interruptor timeoutInterruptor = + Interruptor timeoutInterruptor = SafeFuture.createInterruptor( - asyncRunner.getDelayedFuture(STREAM_INITIALIZE_TIMEOUT), + asyncRunner.getDelayedFuture(TIMEOUT), () -> new StreamTimeoutException( "Timed out waiting to initialize stream for protocol(s): " diff --git a/networking/p2p/src/test/java/tech/pegasys/teku/networking/p2p/libp2p/rpc/RpcHandlerTest.java b/networking/p2p/src/test/java/tech/pegasys/teku/networking/p2p/libp2p/rpc/RpcHandlerTest.java index 4537560323e..844e144ffc0 100644 --- a/networking/p2p/src/test/java/tech/pegasys/teku/networking/p2p/libp2p/rpc/RpcHandlerTest.java +++ b/networking/p2p/src/test/java/tech/pegasys/teku/networking/p2p/libp2p/rpc/RpcHandlerTest.java @@ -30,7 +30,6 @@ import io.libp2p.core.mux.StreamMuxer.Session; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.IntStream; import kotlin.Unit; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -53,9 +52,8 @@ public class RpcHandlerTest { StubAsyncRunner asyncRunner = new StubAsyncRunner(); RpcMethod> rpcMethod = mock(RpcMethod.class); - int maxConcurrentRequests = 2; RpcHandler> rpcHandler = - new RpcHandler<>(asyncRunner, rpcMethod, maxConcurrentRequests); + new RpcHandler<>(asyncRunner, rpcMethod); Connection connection = mock(Connection.class); Session session = mock(Session.class); @@ -249,39 +247,6 @@ void sendRequest_interruptBeforeInitialPayloadWritten( verify(stream).close(); } - @Test - @SuppressWarnings("FutureReturnValueIgnored") - void requestIsThrottledIfQueueIsFull() { - // fill the queue - IntStream.range(0, maxConcurrentRequests) - .forEach(__ -> rpcHandler.sendRequest(connection, request, responseHandler)); - - final StreamPromise> streamPromise1 = - new StreamPromise<>(new CompletableFuture<>(), new CompletableFuture<>()); - when(session.createStream((ProtocolBinding>) any())) - .thenReturn(streamPromise1); - final Stream stream1 = mock(Stream.class); - streamPromise1.getStream().complete(stream1); - streamPromise1.getController().complete(controller); - final CompletableFuture protocolIdFuture1 = new CompletableFuture<>(); - when(stream1.getProtocol()).thenReturn(protocolIdFuture1); - protocolIdFuture1.complete("test"); - - final SafeFuture> throttledResult = - rpcHandler.sendRequest(connection, request, responseHandler); - - assertThat(throttledResult).isNotDone(); - - // empty the queue - streamPromise.getStream().complete(stream); - streamPromise.getController().complete(controller); - stream.getProtocol().complete("test"); - writeFuture.complete(null); - - // throttled request should have completed now - assertThat(throttledResult).isCompleted(); - } - @SuppressWarnings("UnnecessaryAsync") private Class executeInterrupts( final boolean closeStream, final boolean exceedTimeout) {