From ad9e26f4c8598ab4c30e8ce50082bd7a3a9688b0 Mon Sep 17 00:00:00 2001 From: Stefan Bratanov Date: Wed, 8 Jan 2025 22:33:09 +0200 Subject: [PATCH] fix assemble + rename --- .../teku/networking/p2p/libp2p/LibP2PPeer.java | 10 +++++----- ...rRpcHandler.java => ThrottlingRpcHandler.java} | 8 ++++---- ...lerTest.java => ThrottlingRpcHandlerTest.java} | 15 ++++++++------- 3 files changed, 17 insertions(+), 16 deletions(-) rename networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/rpc/{PeerRpcHandler.java => ThrottlingRpcHandler.java} (87%) rename networking/p2p/src/test/java/tech/pegasys/teku/networking/p2p/libp2p/rpc/{PeerRpcHandlerTest.java => ThrottlingRpcHandlerTest.java} (85%) diff --git a/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/LibP2PPeer.java b/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/LibP2PPeer.java index d5b8a250265..0dc0f5455c9 100644 --- a/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/LibP2PPeer.java +++ b/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/LibP2PPeer.java @@ -29,8 +29,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import tech.pegasys.teku.infrastructure.async.SafeFuture; -import tech.pegasys.teku.networking.p2p.libp2p.rpc.PeerRpcHandler; import tech.pegasys.teku.networking.p2p.libp2p.rpc.RpcHandler; +import tech.pegasys.teku.networking.p2p.libp2p.rpc.ThrottlingRpcHandler; import tech.pegasys.teku.networking.p2p.network.PeerAddress; import tech.pegasys.teku.networking.p2p.peer.DisconnectReason; import tech.pegasys.teku.networking.p2p.peer.DisconnectRequestHandler; @@ -47,7 +47,7 @@ public class LibP2PPeer implements Peer { private static final Logger LOG = LogManager.getLogger(); - private final Map, PeerRpcHandler> rpcHandlers; + private final Map, ThrottlingRpcHandler> rpcHandlers; private final ReputationManager reputationManager; private final Function peerScoreFunction; private final Connection connection; @@ -74,7 +74,7 @@ public LibP2PPeer( this.connection = connection; this.rpcHandlers = rpcHandlers.stream() - .collect(Collectors.toMap(RpcHandler::getRpcMethod, PeerRpcHandler::new)); + .collect(Collectors.toMap(RpcHandler::getRpcMethod, ThrottlingRpcHandler::new)); this.reputationManager = reputationManager; this.peerScoreFunction = peerScoreFunction; this.peerId = connection.secureSession().getRemoteId(); @@ -207,8 +207,8 @@ SafeFuture> sendRequest( final TRequest request, final RespHandler responseHandler) { @SuppressWarnings("unchecked") - final PeerRpcHandler rpcHandler = - (PeerRpcHandler) rpcHandlers.get(rpcMethod); + final ThrottlingRpcHandler rpcHandler = + (ThrottlingRpcHandler) rpcHandlers.get(rpcMethod); if (rpcHandler == null) { throw new IllegalArgumentException( "Unknown rpc method invoked: " + String.join(",", rpcMethod.getIds())); diff --git a/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/rpc/PeerRpcHandler.java b/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/rpc/ThrottlingRpcHandler.java similarity index 87% rename from networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/rpc/PeerRpcHandler.java rename to networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/rpc/ThrottlingRpcHandler.java index 8ec54aedc86..4e4e5eb7674 100644 --- a/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/rpc/PeerRpcHandler.java +++ b/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/rpc/ThrottlingRpcHandler.java @@ -21,22 +21,22 @@ import tech.pegasys.teku.networking.p2p.rpc.RpcStreamController; import tech.pegasys.teku.spec.constants.NetworkConstants; -public class PeerRpcHandler< +public class ThrottlingRpcHandler< TOutgoingHandler extends RpcRequestHandler, TRequest, TRespHandler extends RpcResponseHandler> { private final RpcHandler delegate; - private final ThrottlingTaskQueue outgoingQueue = + private final ThrottlingTaskQueue requestsQueue = ThrottlingTaskQueue.create(NetworkConstants.MAX_CONCURRENT_REQUESTS); - public PeerRpcHandler(final RpcHandler delegate) { + public ThrottlingRpcHandler(final RpcHandler delegate) { this.delegate = delegate; } public SafeFuture> sendRequest( final Connection connection, final TRequest request, final TRespHandler responseHandler) { - return outgoingQueue.queueTask( + return requestsQueue.queueTask( () -> delegate.sendRequest(connection, request, responseHandler)); } } diff --git a/networking/p2p/src/test/java/tech/pegasys/teku/networking/p2p/libp2p/rpc/PeerRpcHandlerTest.java b/networking/p2p/src/test/java/tech/pegasys/teku/networking/p2p/libp2p/rpc/ThrottlingRpcHandlerTest.java similarity index 85% rename from networking/p2p/src/test/java/tech/pegasys/teku/networking/p2p/libp2p/rpc/PeerRpcHandlerTest.java rename to networking/p2p/src/test/java/tech/pegasys/teku/networking/p2p/libp2p/rpc/ThrottlingRpcHandlerTest.java index 8c0386cae10..d1242b3d726 100644 --- a/networking/p2p/src/test/java/tech/pegasys/teku/networking/p2p/libp2p/rpc/PeerRpcHandlerTest.java +++ b/networking/p2p/src/test/java/tech/pegasys/teku/networking/p2p/libp2p/rpc/ThrottlingRpcHandlerTest.java @@ -29,7 +29,7 @@ import tech.pegasys.teku.networking.p2p.rpc.RpcStreamController; import tech.pegasys.teku.spec.constants.NetworkConstants; -public class PeerRpcHandlerTest { +public class ThrottlingRpcHandlerTest { private final Connection connection = mock(Connection.class); @@ -41,15 +41,16 @@ public class PeerRpcHandlerTest { private final RpcMethod> rpcMethod = mock(RpcMethod.class); - private PeerRpcHandler> peerRpcHandler; + private ThrottlingRpcHandler> + throttlingRpcHandler; @BeforeEach public void init() { when(delegate.getRpcMethod()).thenReturn(rpcMethod); - peerRpcHandler = new PeerRpcHandler<>(delegate); + throttlingRpcHandler = new ThrottlingRpcHandler<>(delegate); } - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "FutureReturnValueIgnored"}) @Test public void sendRequest_throttlesRequests() { @@ -61,16 +62,16 @@ public void sendRequest_throttlesRequests() { final SafeFuture> future = new SafeFuture<>(); when(delegate.sendRequest(connection, null, null)).thenReturn(future); - peerRpcHandler.sendRequest(connection, null, null); + throttlingRpcHandler.sendRequest(connection, null, null); return future; }) .toList(); - when(peerRpcHandler.sendRequest(connection, null, null)) + when(throttlingRpcHandler.sendRequest(connection, null, null)) .thenReturn(SafeFuture.completedFuture(mock(RpcStreamController.class))); final SafeFuture> throttledRequest = - peerRpcHandler.sendRequest(connection, null, null); + throttlingRpcHandler.sendRequest(connection, null, null); // completed request should be throttled assertThat(throttledRequest).isNotDone();