Skip to content

Commit

Permalink
fix assemble + rename
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanBratanov committed Jan 8, 2025
1 parent c433948 commit ecb7356
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.networking.p2p.libp2p.rpc.PeerRpcHandler;
import tech.pegasys.teku.networking.p2p.libp2p.rpc.RpcHandler;
import tech.pegasys.teku.networking.p2p.libp2p.rpc.ThrottlingRpcHandler;
import tech.pegasys.teku.networking.p2p.network.PeerAddress;
import tech.pegasys.teku.networking.p2p.peer.DisconnectReason;
import tech.pegasys.teku.networking.p2p.peer.DisconnectRequestHandler;
Expand All @@ -47,7 +47,7 @@
public class LibP2PPeer implements Peer {
private static final Logger LOG = LogManager.getLogger();

private final Map<RpcMethod<?, ?, ?>, PeerRpcHandler<?, ?, ?>> rpcHandlers;
private final Map<RpcMethod<?, ?, ?>, ThrottlingRpcHandler<?, ?, ?>> rpcHandlers;
private final ReputationManager reputationManager;
private final Function<PeerId, Double> peerScoreFunction;
private final Connection connection;
Expand All @@ -74,7 +74,7 @@ public LibP2PPeer(
this.connection = connection;
this.rpcHandlers =
rpcHandlers.stream()
.collect(Collectors.toMap(RpcHandler::getRpcMethod, PeerRpcHandler::new));
.collect(Collectors.toMap(RpcHandler::getRpcMethod, ThrottlingRpcHandler::new));
this.reputationManager = reputationManager;
this.peerScoreFunction = peerScoreFunction;
this.peerId = connection.secureSession().getRemoteId();
Expand Down Expand Up @@ -207,8 +207,8 @@ SafeFuture<RpcStreamController<TOutgoingHandler>> sendRequest(
final TRequest request,
final RespHandler responseHandler) {
@SuppressWarnings("unchecked")
final PeerRpcHandler<TOutgoingHandler, TRequest, RespHandler> rpcHandler =
(PeerRpcHandler<TOutgoingHandler, TRequest, RespHandler>) rpcHandlers.get(rpcMethod);
final ThrottlingRpcHandler<TOutgoingHandler, TRequest, RespHandler> rpcHandler =
(ThrottlingRpcHandler<TOutgoingHandler, TRequest, RespHandler>) rpcHandlers.get(rpcMethod);
if (rpcHandler == null) {
throw new IllegalArgumentException(
"Unknown rpc method invoked: " + String.join(",", rpcMethod.getIds()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,22 @@
import tech.pegasys.teku.networking.p2p.rpc.RpcStreamController;
import tech.pegasys.teku.spec.constants.NetworkConstants;

public class PeerRpcHandler<
public class ThrottlingRpcHandler<
TOutgoingHandler extends RpcRequestHandler,
TRequest,
TRespHandler extends RpcResponseHandler<?>> {

private final RpcHandler<TOutgoingHandler, TRequest, TRespHandler> delegate;
private final ThrottlingTaskQueue outgoingQueue =
private final ThrottlingTaskQueue requestsQueue =
ThrottlingTaskQueue.create(NetworkConstants.MAX_CONCURRENT_REQUESTS);

public PeerRpcHandler(final RpcHandler<TOutgoingHandler, TRequest, TRespHandler> delegate) {
public ThrottlingRpcHandler(final RpcHandler<TOutgoingHandler, TRequest, TRespHandler> delegate) {
this.delegate = delegate;
}

public SafeFuture<RpcStreamController<TOutgoingHandler>> sendRequest(
final Connection connection, final TRequest request, final TRespHandler responseHandler) {
return outgoingQueue.queueTask(
return requestsQueue.queueTask(
() -> delegate.sendRequest(connection, request, responseHandler));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import tech.pegasys.teku.networking.p2p.rpc.RpcStreamController;
import tech.pegasys.teku.spec.constants.NetworkConstants;

public class PeerRpcHandlerTest {
public class ThrottlingRpcHandlerTest {

private final Connection connection = mock(Connection.class);

Expand All @@ -41,15 +41,16 @@ public class PeerRpcHandlerTest {
private final RpcMethod<RpcRequestHandler, Object, RpcResponseHandler<Void>> rpcMethod =
mock(RpcMethod.class);

private PeerRpcHandler<RpcRequestHandler, Object, RpcResponseHandler<Void>> peerRpcHandler;
private ThrottlingRpcHandler<RpcRequestHandler, Object, RpcResponseHandler<Void>>
throttlingRpcHandler;

@BeforeEach
public void init() {
when(delegate.getRpcMethod()).thenReturn(rpcMethod);
peerRpcHandler = new PeerRpcHandler<>(delegate);
throttlingRpcHandler = new ThrottlingRpcHandler<>(delegate);
}

@SuppressWarnings("unchecked")
@SuppressWarnings({"unchecked", "FutureReturnValueIgnored"})
@Test
public void sendRequest_throttlesRequests() {

Expand All @@ -61,16 +62,16 @@ public void sendRequest_throttlesRequests() {
final SafeFuture<RpcStreamController<RpcRequestHandler>> future =
new SafeFuture<>();
when(delegate.sendRequest(connection, null, null)).thenReturn(future);
peerRpcHandler.sendRequest(connection, null, null);
throttlingRpcHandler.sendRequest(connection, null, null);
return future;
})
.toList();

when(peerRpcHandler.sendRequest(connection, null, null))
when(throttlingRpcHandler.sendRequest(connection, null, null))
.thenReturn(SafeFuture.completedFuture(mock(RpcStreamController.class)));

final SafeFuture<RpcStreamController<RpcRequestHandler>> throttledRequest =
peerRpcHandler.sendRequest(connection, null, null);
throttlingRpcHandler.sendRequest(connection, null, null);

// completed request should be throttled
assertThat(throttledRequest).isNotDone();
Expand Down

0 comments on commit ecb7356

Please sign in to comment.