Skip to content

Commit

Permalink
Attempt to fix CPU spikes issue (#4867)
Browse files Browse the repository at this point in the history
* Modify reattemptPendingPeerRequests implementation to not iterate on pending request. This should avoid a lot of unnecessary computation

Signed-off-by: Ameziane H <[email protected]>

* Fix failed unit tests.

Signed-off-by: Ameziane H <[email protected]>

* Fix failed unit tests

Iterate over pending requests only if there is a peer with available request capacity

Signed-off-by: Ameziane H <[email protected]>

* Fix stream implementation (use the same stream several times)

Signed-off-by: Ameziane H <[email protected]>

* Spotless

Signed-off-by: Ameziane H <[email protected]>

* Undo reducing method visibility as it is no longer used

Signed-off-by: Ameziane H <[email protected]>

* use one buffered for each peer and wait to have capacity before sending request

Signed-off-by: Karim TAAM <[email protected]>

* fix tests

Signed-off-by: Karim TAAM <[email protected]>

Signed-off-by: Ameziane H <[email protected]>
Signed-off-by: Karim TAAM <[email protected]>
Co-authored-by: Karim TAAM <[email protected]>
  • Loading branch information
ahamlat and matkt authored Jan 11, 2023
1 parent 2a30dfb commit 192dcd0
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public EthPeer(
peerValidators.forEach(peerValidator -> validationStatus.put(peerValidator, false));
fullyValidated.set(peerValidators.isEmpty());

this.requestManagers = new HashMap<>();
this.requestManagers = new ConcurrentHashMap<>();

initEthRequestManagers();
initSnapRequestManagers();
Expand Down Expand Up @@ -375,22 +375,23 @@ public boolean validateReceivedMessage(final EthMessage message, final String pr
* @param ethMessage the Eth message to dispatch
* @param protocolName Specific protocol name if needed
*/
void dispatch(final EthMessage ethMessage, final String protocolName) {
Optional<RequestManager> dispatch(final EthMessage ethMessage, final String protocolName) {
checkArgument(
ethMessage.getPeer().equals(this), "Mismatched Eth message sent to peer for dispatch");
final int messageCode = ethMessage.getData().getCode();
reputation.resetTimeoutCount(messageCode);

getRequestManager(protocolName, messageCode)
.ifPresentOrElse(
requestManager -> requestManager.dispatchResponse(ethMessage),
() -> {
LOG.trace(
"Message {} not expected has just been received for protocol {}, peer {} ",
messageCode,
protocolName,
this);
});
Optional<RequestManager> requestManager = getRequestManager(protocolName, messageCode);
requestManager.ifPresentOrElse(
localRequestManager -> localRequestManager.dispatchResponse(ethMessage),
() -> {
LOG.trace(
"Message {} not expected has just been received for protocol {}, peer {} ",
messageCode,
protocolName,
this);
});
return requestManager;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ public PendingPeerRequest executePeerRequest(

public void dispatchMessage(
final EthPeer peer, final EthMessage ethMessage, final String protocolName) {
peer.dispatch(ethMessage, protocolName);
if (peer.hasAvailableRequestCapacity()) {
Optional<RequestManager> maybeRequestManager = peer.dispatch(ethMessage, protocolName);
if (maybeRequestManager.isPresent() && peer.hasAvailableRequestCapacity()) {
reattemptPendingPeerRequests();
}
}
Expand All @@ -173,8 +173,9 @@ public void dispatchMessage(final EthPeer peer, final EthMessage ethMessage) {
@VisibleForTesting
void reattemptPendingPeerRequests() {
synchronized (this) {
final List<EthPeer> peers = streamAvailablePeers().collect(Collectors.toList());
final Iterator<PendingPeerRequest> iterator = pendingRequests.iterator();
while (iterator.hasNext()) {
while (iterator.hasNext() && peers.stream().anyMatch(EthPeer::hasAvailableRequestCapacity)) {
final PendingPeerRequest request = iterator.next();
if (request.attemptExecution()) {
pendingRequests.remove(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ScheduledFuture;

import com.google.common.collect.EvictingQueue;
import com.google.common.collect.Queues;
Expand All @@ -48,17 +49,20 @@ public class BufferedGetPooledTransactionsFromPeerFetcher {
private final PeerTransactionTracker transactionTracker;
private final EthContext ethContext;
private final MetricsSystem metricsSystem;
private final ScheduledFuture<?> scheduledFuture;
private final EthPeer peer;
private final Queue<Hash> txAnnounces;
private final Counter alreadySeenTransactionsCounter;

public BufferedGetPooledTransactionsFromPeerFetcher(
final EthContext ethContext,
final ScheduledFuture<?> scheduledFuture,
final EthPeer peer,
final TransactionPool transactionPool,
final PeerTransactionTracker transactionTracker,
final MetricsSystem metricsSystem) {
this.ethContext = ethContext;
this.scheduledFuture = scheduledFuture;
this.peer = peer;
this.transactionPool = transactionPool;
this.transactionTracker = transactionTracker;
Expand All @@ -75,6 +79,10 @@ public BufferedGetPooledTransactionsFromPeerFetcher(
.labels(HASHES);
}

public ScheduledFuture<?> getScheduledFuture() {
return scheduledFuture;
}

public void requestTransactions() {
List<Hash> txHashesAnnounced;
while (!(txHashesAnnounced = getTxHashesAnnounced()).isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.time.Instant;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.stream.Collectors;

import org.slf4j.Logger;
Expand Down Expand Up @@ -110,14 +111,21 @@ private void processNewPooledTransactionHashesMessage(
scheduledTasks.computeIfAbsent(
peer,
ethPeer -> {
ethContext
.getScheduler()
.scheduleFutureTask(
new FetcherCreatorTask(peer),
transactionPoolConfiguration.getEth65TrxAnnouncedBufferingPeriod());
final ScheduledFuture<?> scheduledFuture =
ethContext
.getScheduler()
.scheduleFutureTaskWithFixedDelay(
new FetcherCreatorTask(peer),
transactionPoolConfiguration.getEth65TrxAnnouncedBufferingPeriod(),
transactionPoolConfiguration.getEth65TrxAnnouncedBufferingPeriod());

return new BufferedGetPooledTransactionsFromPeerFetcher(
ethContext, peer, transactionPool, transactionTracker, metricsSystem);
ethContext,
scheduledFuture,
peer,
transactionPool,
transactionTracker,
metricsSystem);
});

bufferedTask.addHashes(
Expand Down Expand Up @@ -145,9 +153,10 @@ public FetcherCreatorTask(final EthPeer peer) {
@Override
public void run() {
if (peer != null) {
final BufferedGetPooledTransactionsFromPeerFetcher fetcher = scheduledTasks.remove(peer);
if (!peer.isDisconnected()) {
fetcher.requestTransactions();
if (peer.isDisconnected()) {
scheduledTasks.remove(peer).getScheduledFuture().cancel(true);
} else if (peer.hasAvailableRequestCapacity()) {
scheduledTasks.get(peer).requestTransactions();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand All @@ -38,6 +39,7 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import io.netty.util.concurrent.ScheduledFuture;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -63,10 +65,10 @@ public void setup() {
metricsSystem = new StubMetricsSystem();
transactionTracker = new PeerTransactionTracker();
when(ethContext.getScheduler()).thenReturn(ethScheduler);

ScheduledFuture<?> mock = mock(ScheduledFuture.class);
fetcher =
new BufferedGetPooledTransactionsFromPeerFetcher(
ethContext, ethPeer, transactionPool, transactionTracker, metricsSystem);
ethContext, mock, ethPeer, transactionPool, transactionTracker, metricsSystem);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ public void shouldNotAddReceivedTransactionsToTransactionPoolIfExpired() {
}

@Test
public void shouldScheduleGetPooledTransactionsTaskWhenNewTransactionAdded() {
public void
shouldScheduleGetPooledTransactionsTaskWhenNewTransactionAddedFromPeerForTheFirstTime() {

final EthScheduler ethScheduler = mock(EthScheduler.class);
when(ethContext.getScheduler()).thenReturn(ethScheduler);
Expand All @@ -183,7 +184,8 @@ public void shouldScheduleGetPooledTransactionsTaskWhenNewTransactionAdded() {
ofMinutes(1));

verify(ethScheduler, times(1))
.scheduleFutureTask(any(FetcherCreatorTask.class), any(Duration.class));
.scheduleFutureTaskWithFixedDelay(
any(FetcherCreatorTask.class), any(Duration.class), any(Duration.class));
}

@Test
Expand All @@ -204,7 +206,8 @@ public void shouldNotScheduleGetPooledTransactionsTaskTwice() {
ofMinutes(1));

verify(ethScheduler, times(1))
.scheduleFutureTask(any(FetcherCreatorTask.class), any(Duration.class));
.scheduleFutureTaskWithFixedDelay(
any(FetcherCreatorTask.class), any(Duration.class), any(Duration.class));
}

@Test
Expand Down

0 comments on commit 192dcd0

Please sign in to comment.