Skip to content

Commit

Permalink
Reintroduce timeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanBratanov committed Nov 23, 2024
1 parent 2a8ffa3 commit 4a95c1c
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ public class Constants {
public static final int ATTESTATION_SUBNET_COUNT = 64;

// Teku Networking Specific
// General RPC timeout for receiving a full response after initiating a request
public static final Duration RPC_REQUEST_TIMEOUT = Duration.ofSeconds(10);
// Global RPC timeout for reading an incoming request or for receiving a response chunk after
// initiating a request
public static final Duration RPC_TIMEOUT = Duration.ofSeconds(10);
public static final int VALID_BLOCK_SET_SIZE = 1000;
// Target holding two slots worth of aggregators (16 aggregators, 64 committees and 2 slots)
public static final int VALID_AGGREGATE_SET_SIZE = 16 * 64 * 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@

package tech.pegasys.teku.networking.eth2.rpc.core;

import static tech.pegasys.teku.spec.config.Constants.RPC_TIMEOUT;

import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.logging.log4j.LogManager;
Expand All @@ -28,7 +29,6 @@
import tech.pegasys.teku.networking.p2p.rpc.RpcRequestHandler;
import tech.pegasys.teku.networking.p2p.rpc.RpcStream;
import tech.pegasys.teku.networking.p2p.rpc.StreamClosedException;
import tech.pegasys.teku.spec.config.NetworkingSpecConfig;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.RpcRequest;

public class Eth2IncomingRequestHandler<
Expand All @@ -45,23 +45,20 @@ public class Eth2IncomingRequestHandler<
private final String protocolId;
private final AsyncRunner asyncRunner;
private final AtomicBoolean requestHandled = new AtomicBoolean(false);
private final Duration respTimeout;

public Eth2IncomingRequestHandler(
final String protocolId,
final RpcResponseEncoder<TResponse, ?> responseEncoder,
final RpcRequestDecoder<TRequest> requestDecoder,
final AsyncRunner asyncRunner,
final PeerLookup peerLookup,
final LocalMessageHandler<TRequest, TResponse> localMessageHandler,
final NetworkingSpecConfig networkingConfig) {
final LocalMessageHandler<TRequest, TResponse> localMessageHandler) {
this.protocolId = protocolId;
this.asyncRunner = asyncRunner;
this.peerLookup = peerLookup;
this.localMessageHandler = localMessageHandler;
this.responseEncoder = responseEncoder;
this.requestDecoder = requestDecoder;
this.respTimeout = Duration.ofSeconds(networkingConfig.getRespTimeout());
}

@Override
Expand Down Expand Up @@ -121,15 +118,14 @@ private void handleRequest(
}

private void ensureRequestReceivedWithinTimeLimit(final RpcStream stream) {
final Duration timeout = respTimeout;
asyncRunner
.getDelayedFuture(timeout)
.getDelayedFuture(RPC_TIMEOUT)
.thenAccept(
(__) -> {
if (!requestHandled.get()) {
LOG.debug(
"Failed to receive incoming request data within {} sec for protocol {}. Close stream.",
timeout.toSeconds(),
RPC_TIMEOUT.toSeconds(),
protocolId);
stream.closeAbruptly().ifExceptionGetsHereRaiseABug();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
import static tech.pegasys.teku.networking.eth2.rpc.core.Eth2OutgoingRequestHandler.State.DATA_COMPLETED;
import static tech.pegasys.teku.networking.eth2.rpc.core.Eth2OutgoingRequestHandler.State.EXPECT_DATA;
import static tech.pegasys.teku.networking.eth2.rpc.core.Eth2OutgoingRequestHandler.State.READ_COMPLETE;
import static tech.pegasys.teku.spec.config.Constants.RPC_TIMEOUT;

import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -53,37 +55,46 @@ enum State {
private static final Logger LOG = LogManager.getLogger();

private final AsyncRunner asyncRunner;
private final AsyncRunner timeoutRunner;
private final int maximumResponseChunks;
private final Eth2RpcResponseHandler<TResponse, ?> responseHandler;
private final ResponseStream<TResponse> responseStream;

private final AtomicBoolean hasReceivedInitialBytes = new AtomicBoolean(false);
private final AtomicInteger currentChunkCount = new AtomicInteger(0);
private final AtomicReference<State> state;
private final AtomicReference<AsyncResponseProcessor<TResponse>> responseProcessor =
new AtomicReference<>();

private final String protocolId;
private final RpcResponseDecoder<TResponse, ?> responseDecoder;
private final boolean shouldReceiveResponse;

public Eth2OutgoingRequestHandler(
final AsyncRunner asyncRunner,
final AsyncRunner timeoutRunner,
final String protocolId,
final RpcResponseDecoder<TResponse, ?> responseDecoder,
final boolean shouldReceiveResponse,
final TRequest request,
final Eth2RpcResponseHandler<TResponse, ?> responseHandler) {
this.asyncRunner = asyncRunner;
this.timeoutRunner = timeoutRunner;
this.maximumResponseChunks = request.getMaximumResponseChunks();
this.responseHandler = responseHandler;
responseStream = new ResponseStream<>(responseHandler);
this.responseDecoder = responseDecoder;
this.shouldReceiveResponse = shouldReceiveResponse;
this.protocolId = protocolId;
this.state = new AtomicReference<>(shouldReceiveResponse ? EXPECT_DATA : DATA_COMPLETED);
}

public void handleInitialPayloadSent(final RpcStream stream) {
// Close the write side of the stream
stream.closeWriteStream().ifExceptionGetsHereRaiseABug();
if (!shouldReceiveResponse) {
ensureReadCompleteArrivesInTime(stream);
}
}

@Override
Expand All @@ -100,6 +111,8 @@ public void processData(final NodeId nodeId, final RpcStream rpcStream, final By
throw new RpcException.ExtraDataAppendedException(" extra data: " + bufToString(data));
}

onFirstByteReceived(rpcStream);

List<TResponse> maybeResponses = responseDecoder.decodeNextResponses(data);
final int chunksReceived = currentChunkCount.addAndGet(maybeResponses.size());

Expand All @@ -110,9 +123,16 @@ public void processData(final NodeId nodeId, final RpcStream rpcStream, final By
for (TResponse maybeResponse : maybeResponses) {
getResponseProcessor(rpcStream).processResponse(maybeResponse);
}
if (chunksReceived >= maximumResponseChunks
&& !transferToState(DATA_COMPLETED, List.of(EXPECT_DATA))) {
abortRequest(rpcStream, new IllegalStateException("Unexpected state: " + state));
if (chunksReceived < maximumResponseChunks) {
if (!maybeResponses.isEmpty()) {
ensureNextResponseChunkArrivesInTime(rpcStream, chunksReceived, currentChunkCount);
}
} else {
if (!transferToState(DATA_COMPLETED, List.of(EXPECT_DATA))) {
abortRequest(rpcStream, new IllegalStateException("Unexpected state: " + state));
return;
}
ensureReadCompleteArrivesInTime(rpcStream);
}
} catch (final RpcException e) {
abortRequest(rpcStream, e);
Expand Down Expand Up @@ -184,6 +204,13 @@ private boolean transferToState(final State toState, final Collection<State> fro
return false;
}

private void onFirstByteReceived(final RpcStream rpcStream) {
if (hasReceivedInitialBytes.compareAndSet(false, true)) {
// Setup initial chunk timeout
ensureNextResponseChunkArrivesInTime(rpcStream, currentChunkCount.get(), currentChunkCount);
}
}

private void completeRequest(final RpcStream rpcStream) {
getResponseProcessor(rpcStream)
.finishProcessing()
Expand Down Expand Up @@ -227,6 +254,40 @@ private void abortRequest(final RpcStream rpcStream, final Throwable error, fina
}
}

private void ensureNextResponseChunkArrivesInTime(
final RpcStream stream,
final int previousResponseCount,
final AtomicInteger currentResponseCount) {
timeoutRunner
.getDelayedFuture(RPC_TIMEOUT)
.thenAccept(
(__) -> {
if (previousResponseCount == currentResponseCount.get()) {
abortRequest(
stream,
new RpcTimeoutException(
"Timed out waiting for response chunk " + previousResponseCount,
RPC_TIMEOUT));
}
})
.ifExceptionGetsHereRaiseABug();
}

private void ensureReadCompleteArrivesInTime(final RpcStream stream) {
timeoutRunner
.getDelayedFuture(RPC_TIMEOUT)
.thenAccept(
(__) -> {
if (!(state.get() == READ_COMPLETE || state.get() == CLOSED)) {
abortRequest(
stream,
new RpcTimeoutException(
"Timed out waiting for read channel close", RPC_TIMEOUT));
}
})
.ifExceptionGetsHereRaiseABug();
}

@VisibleForTesting
State getState() {
return state.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,7 @@ public Eth2IncomingRequestHandler<TRequest, TResponse> createIncomingRequestHand
createRequestDecoder(),
asyncRunner,
peerLookup,
localMessageHandler,
networkingConfig);
localMessageHandler);
}

@Override
Expand All @@ -101,6 +100,7 @@ public Eth2OutgoingRequestHandler<TRequest, TResponse> createOutgoingRequestHand
final TRequest request,
final Eth2RpcResponseHandler<TResponse, ?> responseHandler) {
return new Eth2OutgoingRequestHandler<>(
asyncRunner,
asyncRunner,
protocolId,
createResponseDecoder(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static tech.pegasys.teku.spec.config.Constants.RPC_TIMEOUT;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -95,6 +96,7 @@ public void setup() {
getRpcEncoding(), spec.getGenesisSchemaDefinitions().getSignedBeaconBlockSchema());
return new Eth2OutgoingRequestHandler<>(
asyncRequestRunner,
timeoutRunner,
method.getIds().get(0),
responseDecoder,
method.shouldReceiveResponse(),
Expand Down Expand Up @@ -261,6 +263,33 @@ public void shouldWorkWhenSendAllChunksPlusEmptyExtraChunk() throws Exception {
verify(rpcStream, never()).closeAbruptly();
}

@Test
public void disconnectsIfFirstChunkIsNotReceivedInTime() {
sendInitialPayload();

deliverInitialBytes();

// Run timeouts
timeProvider.advanceTimeByMillis(RPC_TIMEOUT.toMillis());
timeoutRunner.executeDueActions();
verify(rpcStream).closeAbruptly();
}

@Test
public void disconnectsIfSecondChunkNotReceivedInTime() {
sendInitialPayload();

timeProvider.advanceTimeByMillis(100);
deliverChunk(0);
asyncRequestRunner.executeQueuedActions();
assertThat(blocks.size()).isEqualTo(1);

// Run timeouts
timeProvider.advanceTimeByMillis(RPC_TIMEOUT.toMillis());
timeoutRunner.executeDueActions();
verify(rpcStream).closeAbruptly();
}

@Test
public void shouldCompleteExceptionallyWhenClosedWithTruncatedMessage() {
sendInitialPayload();
Expand All @@ -276,6 +305,29 @@ public void shouldCompleteExceptionallyWhenClosedWithTruncatedMessage() {
assertThat(finishedProcessingFuture).isCompletedExceptionally();
}

@Test
public void doNotDisconnectsIfSecondChunkReceivedInTime() {
sendInitialPayload();

timeProvider.advanceTimeByMillis(100);
deliverChunk(0);
asyncRequestRunner.executeQueuedActions();
assertThat(blocks.size()).isEqualTo(1);

// Second chunk is received just in time
timeProvider.advanceTimeByMillis(RPC_TIMEOUT.toMillis() - 1);
timeoutRunner.executeDueActions();
deliverChunk(1);
asyncRequestRunner.executeQueuedActions();

// Go past the time the second chunk would have timed out but not enough to trigger timeout on
// the third chunk and ensure the timeout never fires.
timeProvider.advanceTimeByMillis(RPC_TIMEOUT.toMillis() - 1);
timeoutRunner.executeDueActions();
verify(rpcStream, never()).closeAbruptly();
assertThat(blocks.size()).isEqualTo(2);
}

@Test
public void shouldWorkWhenInitialPayloadEventIsLate() throws Exception {
deliverChunk(0);
Expand Down Expand Up @@ -320,6 +372,11 @@ private void sendInitialPayload() {
reqHandler.handleInitialPayloadSent(rpcStream);
}

private void deliverInitialBytes() {
final Bytes firstByte = chunks.get(0).slice(0, 1);
deliverBytes(firstByte);
}

private Bytes chunkBytes(final int chunk) {
final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlock(chunk);
return responseEncoder.encodeSuccessfulResponse(block);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,26 +256,26 @@ void requestIsThrottledIfQueueIsFull() {
IntStream.range(0, maxConcurrentRequests)
.forEach(__ -> rpcHandler.sendRequest(connection, request, responseHandler));

final StreamPromise<Controller<RpcRequestHandler>> streamPromise =
final StreamPromise<Controller<RpcRequestHandler>> streamPromise1 =
new StreamPromise<>(new CompletableFuture<>(), new CompletableFuture<>());
when(session.createStream((ProtocolBinding<Controller<RpcRequestHandler>>) any()))
.thenReturn(streamPromise);
final Stream stream = mock(Stream.class);
streamPromise.getStream().complete(stream);
streamPromise.getController().complete(controller);
CompletableFuture<String> protocolIdFuture = new CompletableFuture<>();
when(stream.getProtocol()).thenReturn(protocolIdFuture);
protocolIdFuture.complete("test");
.thenReturn(streamPromise1);
final Stream stream1 = mock(Stream.class);
streamPromise1.getStream().complete(stream1);
streamPromise1.getController().complete(controller);
final CompletableFuture<String> protocolIdFuture1 = new CompletableFuture<>();
when(stream1.getProtocol()).thenReturn(protocolIdFuture1);
protocolIdFuture1.complete("test");

final SafeFuture<RpcStreamController<RpcRequestHandler>> throttledResult =
rpcHandler.sendRequest(connection, request, responseHandler);

assertThat(throttledResult).isNotDone();

// empty the queue
this.streamPromise.getStream().complete(stream);
this.streamPromise.getController().complete(controller);
this.stream.getProtocol().complete("test");
streamPromise.getStream().complete(stream);
streamPromise.getController().complete(controller);
stream.getProtocol().complete("test");
writeFuture.complete(null);

// throttled request should have completed now
Expand Down

0 comments on commit 4a95c1c

Please sign in to comment.