diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java index 702ca6076b0..bbdecfb0178 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java @@ -136,7 +136,7 @@ public EthPeer( peerValidators.forEach(peerValidator -> validationStatus.put(peerValidator, false)); fullyValidated.set(peerValidators.isEmpty()); - this.requestManagers = new HashMap<>(); + this.requestManagers = new ConcurrentHashMap<>(); initEthRequestManagers(); initSnapRequestManagers(); @@ -375,22 +375,23 @@ public boolean validateReceivedMessage(final EthMessage message, final String pr * @param ethMessage the Eth message to dispatch * @param protocolName Specific protocol name if needed */ - void dispatch(final EthMessage ethMessage, final String protocolName) { + Optional dispatch(final EthMessage ethMessage, final String protocolName) { checkArgument( ethMessage.getPeer().equals(this), "Mismatched Eth message sent to peer for dispatch"); final int messageCode = ethMessage.getData().getCode(); reputation.resetTimeoutCount(messageCode); - getRequestManager(protocolName, messageCode) - .ifPresentOrElse( - requestManager -> requestManager.dispatchResponse(ethMessage), - () -> { - LOG.trace( - "Message {} not expected has just been received for protocol {}, peer {} ", - messageCode, - protocolName, - this); - }); + Optional requestManager = getRequestManager(protocolName, messageCode); + requestManager.ifPresentOrElse( + localRequestManager -> localRequestManager.dispatchResponse(ethMessage), + () -> { + LOG.trace( + "Message {} not expected has just been received for protocol {}, peer {} ", + messageCode, + protocolName, + this); + }); + return requestManager; } /** diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java index 9206ba6af7b..5ae869830ec 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java @@ -160,8 +160,8 @@ public PendingPeerRequest executePeerRequest( public void dispatchMessage( final EthPeer peer, final EthMessage ethMessage, final String protocolName) { - peer.dispatch(ethMessage, protocolName); - if (peer.hasAvailableRequestCapacity()) { + Optional maybeRequestManager = peer.dispatch(ethMessage, protocolName); + if (maybeRequestManager.isPresent() && peer.hasAvailableRequestCapacity()) { reattemptPendingPeerRequests(); } } @@ -173,8 +173,9 @@ public void dispatchMessage(final EthPeer peer, final EthMessage ethMessage) { @VisibleForTesting void reattemptPendingPeerRequests() { synchronized (this) { + final List peers = streamAvailablePeers().collect(Collectors.toList()); final Iterator iterator = pendingRequests.iterator(); - while (iterator.hasNext()) { + while (iterator.hasNext() && peers.stream().anyMatch(EthPeer::hasAvailableRequestCapacity)) { final PendingPeerRequest request = iterator.next(); if (request.attemptExecution()) { pendingRequests.remove(request); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/BufferedGetPooledTransactionsFromPeerFetcher.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/BufferedGetPooledTransactionsFromPeerFetcher.java index 85dc979229d..ea9265d26c6 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/BufferedGetPooledTransactionsFromPeerFetcher.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/BufferedGetPooledTransactionsFromPeerFetcher.java @@ -31,6 +31,7 @@ import java.util.Collection; import java.util.List; import java.util.Queue; +import java.util.concurrent.ScheduledFuture; import com.google.common.collect.EvictingQueue; import com.google.common.collect.Queues; @@ -48,17 +49,20 @@ public class BufferedGetPooledTransactionsFromPeerFetcher { private final PeerTransactionTracker transactionTracker; private final EthContext ethContext; private final MetricsSystem metricsSystem; + private final ScheduledFuture scheduledFuture; private final EthPeer peer; private final Queue txAnnounces; private final Counter alreadySeenTransactionsCounter; public BufferedGetPooledTransactionsFromPeerFetcher( final EthContext ethContext, + final ScheduledFuture scheduledFuture, final EthPeer peer, final TransactionPool transactionPool, final PeerTransactionTracker transactionTracker, final MetricsSystem metricsSystem) { this.ethContext = ethContext; + this.scheduledFuture = scheduledFuture; this.peer = peer; this.transactionPool = transactionPool; this.transactionTracker = transactionTracker; @@ -75,6 +79,10 @@ public BufferedGetPooledTransactionsFromPeerFetcher( .labels(HASHES); } + public ScheduledFuture getScheduledFuture() { + return scheduledFuture; + } + public void requestTransactions() { List txHashesAnnounced; while (!(txHashesAnnounced = getTxHashesAnnounced()).isEmpty()) { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessor.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessor.java index 7cae1de41cb..c05a81ce351 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessor.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessor.java @@ -33,6 +33,7 @@ import java.time.Instant; import java.util.List; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledFuture; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -110,14 +111,21 @@ private void processNewPooledTransactionHashesMessage( scheduledTasks.computeIfAbsent( peer, ethPeer -> { - ethContext - .getScheduler() - .scheduleFutureTask( - new FetcherCreatorTask(peer), - transactionPoolConfiguration.getEth65TrxAnnouncedBufferingPeriod()); + final ScheduledFuture scheduledFuture = + ethContext + .getScheduler() + .scheduleFutureTaskWithFixedDelay( + new FetcherCreatorTask(peer), + transactionPoolConfiguration.getEth65TrxAnnouncedBufferingPeriod(), + transactionPoolConfiguration.getEth65TrxAnnouncedBufferingPeriod()); return new BufferedGetPooledTransactionsFromPeerFetcher( - ethContext, peer, transactionPool, transactionTracker, metricsSystem); + ethContext, + scheduledFuture, + peer, + transactionPool, + transactionTracker, + metricsSystem); }); bufferedTask.addHashes( @@ -145,9 +153,10 @@ public FetcherCreatorTask(final EthPeer peer) { @Override public void run() { if (peer != null) { - final BufferedGetPooledTransactionsFromPeerFetcher fetcher = scheduledTasks.remove(peer); - if (!peer.isDisconnected()) { - fetcher.requestTransactions(); + if (peer.isDisconnected()) { + scheduledTasks.remove(peer).getScheduledFuture().cancel(true); + } else if (peer.hasAvailableRequestCapacity()) { + scheduledTasks.get(peer).requestTransactions(); } } } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/BufferedGetPooledTransactionsFromPeerFetcherTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/BufferedGetPooledTransactionsFromPeerFetcherTest.java index d02d4f0d6e0..c9df3cc782c 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/BufferedGetPooledTransactionsFromPeerFetcherTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/BufferedGetPooledTransactionsFromPeerFetcherTest.java @@ -16,6 +16,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -38,6 +39,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; +import io.netty.util.concurrent.ScheduledFuture; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -63,10 +65,10 @@ public void setup() { metricsSystem = new StubMetricsSystem(); transactionTracker = new PeerTransactionTracker(); when(ethContext.getScheduler()).thenReturn(ethScheduler); - + ScheduledFuture mock = mock(ScheduledFuture.class); fetcher = new BufferedGetPooledTransactionsFromPeerFetcher( - ethContext, ethPeer, transactionPool, transactionTracker, metricsSystem); + ethContext, mock, ethPeer, transactionPool, transactionTracker, metricsSystem); } @Test diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessorTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessorTest.java index 24217984e38..dfbf1d4deea 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessorTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessorTest.java @@ -170,7 +170,8 @@ public void shouldNotAddReceivedTransactionsToTransactionPoolIfExpired() { } @Test - public void shouldScheduleGetPooledTransactionsTaskWhenNewTransactionAdded() { + public void + shouldScheduleGetPooledTransactionsTaskWhenNewTransactionAddedFromPeerForTheFirstTime() { final EthScheduler ethScheduler = mock(EthScheduler.class); when(ethContext.getScheduler()).thenReturn(ethScheduler); @@ -183,7 +184,8 @@ public void shouldScheduleGetPooledTransactionsTaskWhenNewTransactionAdded() { ofMinutes(1)); verify(ethScheduler, times(1)) - .scheduleFutureTask(any(FetcherCreatorTask.class), any(Duration.class)); + .scheduleFutureTaskWithFixedDelay( + any(FetcherCreatorTask.class), any(Duration.class), any(Duration.class)); } @Test @@ -204,7 +206,8 @@ public void shouldNotScheduleGetPooledTransactionsTaskTwice() { ofMinutes(1)); verify(ethScheduler, times(1)) - .scheduleFutureTask(any(FetcherCreatorTask.class), any(Duration.class)); + .scheduleFutureTaskWithFixedDelay( + any(FetcherCreatorTask.class), any(Duration.class), any(Duration.class)); } @Test