diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingSwitchingPeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingSwitchingPeerTask.java index a1021289779..90464e400fb 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingSwitchingPeerTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingSwitchingPeerTask.java @@ -95,7 +95,6 @@ protected CompletableFuture executePeerTask(final Optional assignedP .addArgument(peerToUse) .addArgument(this::getRetryCount) .log(); - result.complete(peerResult); return peerResult; }); } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/RetryingGetNodeDataFromPeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/RetryingGetNodeDataFromPeerTask.java index 16040e963ae..1e770ac950e 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/RetryingGetNodeDataFromPeerTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/RetryingGetNodeDataFromPeerTask.java @@ -22,13 +22,13 @@ import java.util.Collection; import java.util.HashSet; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import org.apache.tuweni.bytes.Bytes; -public class RetryingGetNodeDataFromPeerTask extends AbstractRetryingPeerTask> { +public class RetryingGetNodeDataFromPeerTask + extends AbstractRetryingSwitchingPeerTask> { private final EthContext ethContext; private final Set hashes; @@ -40,7 +40,7 @@ private RetryingGetNodeDataFromPeerTask( final Collection hashes, final long pivotBlockNumber, final MetricsSystem metricsSystem) { - super(ethContext, 4, data -> false, metricsSystem); + super(ethContext, metricsSystem, data -> false, 4); this.ethContext = ethContext; this.hashes = new HashSet<>(hashes); this.pivotBlockNumber = pivotBlockNumber; @@ -56,11 +56,10 @@ public static RetryingGetNodeDataFromPeerTask forHashes( } @Override - protected CompletableFuture> executePeerTask( - final Optional assignedPeer) { + protected CompletableFuture> executeTaskOnCurrentPeer(final EthPeer peer) { final GetNodeDataFromPeerTask task = GetNodeDataFromPeerTask.forHashes(ethContext, hashes, pivotBlockNumber, metricsSystem); - assignedPeer.ifPresent(task::assignPeer); + task.assignPeer(peer); return executeSubTask(task::run) .thenApply( peerResult -> { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/CompleteBlocksTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/CompleteBlocksTask.java index 524851af5f6..0bc7b110b3f 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/CompleteBlocksTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/CompleteBlocksTask.java @@ -26,7 +26,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult; -import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask; +import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingSwitchingPeerTask; import org.hyperledger.besu.ethereum.eth.manager.task.GetBodiesFromPeerTask; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.plugin.services.MetricsSystem; @@ -47,7 +47,7 @@ * Given a set of headers, "completes" them by repeatedly requesting additional data (bodies) needed * to create the blocks that correspond to the supplied headers. */ -public class CompleteBlocksTask extends AbstractRetryingPeerTask> { +public class CompleteBlocksTask extends AbstractRetryingSwitchingPeerTask> { private static final Logger LOG = LoggerFactory.getLogger(CompleteBlocksTask.class); private static final int MIN_SIZE_INCOMPLETE_LIST = 1; @@ -66,8 +66,8 @@ private CompleteBlocksTask( final List headers, final int maxRetries, final MetricsSystem metricsSystem) { - super(ethContext, maxRetries, Collection::isEmpty, metricsSystem); - checkArgument(headers.size() > 0, "Must supply a non-empty headers list"); + super(ethContext, metricsSystem, Collection::isEmpty, maxRetries); + checkArgument(!headers.isEmpty(), "Must supply a non-empty headers list"); this.protocolSchedule = protocolSchedule; this.ethContext = ethContext; this.metricsSystem = metricsSystem; @@ -130,11 +130,11 @@ public static CompleteBlocksTask forHeaders( } @Override - protected CompletableFuture> executePeerTask(final Optional assignedPeer) { - return requestBodies(assignedPeer).thenCompose(this::processBodiesResult); + protected CompletableFuture> executeTaskOnCurrentPeer(final EthPeer peer) { + return requestBodies(peer).thenCompose(this::processBodiesResult); } - private CompletableFuture> requestBodies(final Optional assignedPeer) { + private CompletableFuture> requestBodies(final EthPeer assignedPeer) { final List incompleteHeaders = incompleteHeaders(); if (incompleteHeaders.isEmpty()) { return completedFuture(emptyList()); @@ -148,7 +148,7 @@ private CompletableFuture> requestBodies(final Optional ass final GetBodiesFromPeerTask task = GetBodiesFromPeerTask.forHeaders( protocolSchedule, ethContext, incompleteHeaders, metricsSystem); - assignedPeer.ifPresent(task::assignPeer); + task.assignPeer(assignedPeer); return task.run().thenApply(PeerTaskResult::getResult); }); } @@ -157,8 +157,7 @@ private CompletableFuture> processBodiesResult(final List blo blocksResult.forEach((block) -> blocks.put(block.getHeader().getNumber(), block)); if (incompleteHeaders().isEmpty()) { - result.complete( - headers.stream().map(h -> blocks.get(h.getNumber())).collect(Collectors.toList())); + result.complete(headers.stream().map(h -> blocks.get(h.getNumber())).toList()); } return completedFuture(blocksResult); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java index 8ca376902e4..e56b95d6e9c 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java @@ -27,7 +27,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.manager.task.AbstractGetHeadersFromPeerTask; import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult; -import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask; +import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingSwitchingPeerTask; import org.hyperledger.besu.ethereum.eth.manager.task.GetBodiesFromPeerTask; import org.hyperledger.besu.ethereum.eth.manager.task.GetHeadersFromPeerByHashTask; import org.hyperledger.besu.ethereum.eth.sync.ValidationPolicy; @@ -54,7 +54,8 @@ * Retrieves a sequence of headers, sending out requests repeatedly until all headers are fulfilled. * Validates headers as they are received. */ -public class DownloadHeaderSequenceTask extends AbstractRetryingPeerTask> { +public class DownloadHeaderSequenceTask + extends AbstractRetryingSwitchingPeerTask> { private static final Logger LOG = LoggerFactory.getLogger(DownloadHeaderSequenceTask.class); private static final int DEFAULT_RETRIES = 5; @@ -80,7 +81,7 @@ private DownloadHeaderSequenceTask( final int maxRetries, final ValidationPolicy validationPolicy, final MetricsSystem metricsSystem) { - super(ethContext, maxRetries, Collection::isEmpty, metricsSystem); + super(ethContext, metricsSystem, Collection::isEmpty, maxRetries); this.protocolSchedule = protocolSchedule; this.protocolContext = protocolContext; this.ethContext = ethContext; @@ -135,12 +136,11 @@ public static DownloadHeaderSequenceTask endingAtHeader( } @Override - protected CompletableFuture> executePeerTask( - final Optional assignedPeer) { + protected CompletableFuture> executeTaskOnCurrentPeer(final EthPeer peer) { LOG.debug( "Downloading headers from {} to {}.", startingBlockNumber, referenceHeader.getNumber()); final CompletableFuture> task = - downloadHeaders(assignedPeer).thenCompose(this::processHeaders); + downloadHeaders(peer).thenCompose(this::processHeaders); return task.whenComplete( (r, t) -> { // We're done if we've filled all requested headers @@ -155,7 +155,7 @@ protected CompletableFuture> executePeerTask( } private CompletableFuture>> downloadHeaders( - final Optional assignedPeer) { + final EthPeer assignedPeer) { // Figure out parameters for our headers request final boolean partiallyFilled = lastFilledHeaderIndex < segmentLength; final BlockHeader referenceHeaderForNextRequest = @@ -174,7 +174,7 @@ private CompletableFuture>> downloadHeaders( referenceHeaderForNextRequest.getNumber(), count + 1, metricsSystem); - assignedPeer.ifPresent(headersTask::assignPeer); + headersTask.assignPeer(assignedPeer); return headersTask.run(); }); } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/GetReceiptsForHeadersTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/GetReceiptsForHeadersTask.java index 58c4d3a7afa..017bb31a73a 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/GetReceiptsForHeadersTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/GetReceiptsForHeadersTask.java @@ -24,14 +24,13 @@ import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult; -import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask; +import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingSwitchingPeerTask; import org.hyperledger.besu.ethereum.eth.manager.task.GetReceiptsFromPeerTask; import org.hyperledger.besu.plugin.services.MetricsSystem; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; @@ -40,7 +39,7 @@ /** Given a set of headers, repeatedly requests the receipts for those blocks. */ public class GetReceiptsForHeadersTask - extends AbstractRetryingPeerTask>> { + extends AbstractRetryingSwitchingPeerTask>> { private static final Logger LOG = LoggerFactory.getLogger(GetReceiptsForHeadersTask.class); private static final int DEFAULT_RETRIES = 5; @@ -55,8 +54,8 @@ private GetReceiptsForHeadersTask( final List headers, final int maxRetries, final MetricsSystem metricsSystem) { - super(ethContext, maxRetries, Map::isEmpty, metricsSystem); - checkArgument(headers.size() > 0, "Must supply a non-empty headers list"); + super(ethContext, metricsSystem, Map::isEmpty, maxRetries); + checkArgument(!headers.isEmpty(), "Must supply a non-empty headers list"); this.ethContext = ethContext; this.metricsSystem = metricsSystem; @@ -87,13 +86,13 @@ private void completeEmptyReceipts(final List headers) { } @Override - protected CompletableFuture>> executePeerTask( - final Optional assignedPeer) { - return requestReceipts(assignedPeer).thenCompose(this::processResponse); + protected CompletableFuture>> executeTaskOnCurrentPeer( + final EthPeer peer) { + return requestReceipts(peer).thenCompose(this::processResponse); } private CompletableFuture>> requestReceipts( - final Optional assignedPeer) { + final EthPeer assignedPeer) { final List incompleteHeaders = incompleteHeaders(); if (incompleteHeaders.isEmpty()) { return CompletableFuture.completedFuture(emptyMap()); @@ -106,7 +105,7 @@ private CompletableFuture>> requestRec () -> { final GetReceiptsFromPeerTask task = GetReceiptsFromPeerTask.forHeaders(ethContext, incompleteHeaders, metricsSystem); - assignedPeer.ifPresent(task::assignPeer); + task.assignPeer(assignedPeer); return task.run().thenApply(PeerTaskResult::getResult); }); } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/tasks/CompleteBlocksTaskTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/tasks/CompleteBlocksTaskTest.java index e8049af736e..c12638205d3 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/tasks/CompleteBlocksTaskTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/tasks/CompleteBlocksTaskTest.java @@ -82,6 +82,9 @@ protected CompleteBlocksTask createTask(final List requestedData) { @Test public void shouldCompleteWithoutPeersWhenAllBlocksAreEmpty() { + // just needs any peer added, as it is not used! + RespondingEthPeer.builder().ethProtocolManager(ethProtocolManager).build(); + final BlockHeader header1 = new BlockHeaderTestFixture().number(1).buildHeader(); final BlockHeader header2 = new BlockHeaderTestFixture().number(2).buildHeader(); final BlockHeader header3 = new BlockHeaderTestFixture().number(3).buildHeader(); @@ -113,6 +116,9 @@ public void shouldCreateWithdrawalsAwareEmptyBlock_whenWithdrawalsAreEnabled() { when(mockShanghaiSpec.getWithdrawalsProcessor()) .thenReturn(Optional.of(mockWithdrawalsProcessor)); + // just needs any peer added, as it is not used! + RespondingEthPeer.builder().ethProtocolManager(ethProtocolManager).build(); + final Block block1 = new Block( header1, diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/tasks/GetReceiptsForHeadersTaskTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/tasks/GetReceiptsForHeadersTaskTest.java index 2b28e8a5c98..e8ed3ab5538 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/tasks/GetReceiptsForHeadersTaskTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/tasks/GetReceiptsForHeadersTaskTest.java @@ -21,7 +21,8 @@ import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture; import org.hyperledger.besu.ethereum.core.TransactionReceipt; -import org.hyperledger.besu.ethereum.eth.manager.ethtaskutils.RetryingMessageTaskTest; +import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer; +import org.hyperledger.besu.ethereum.eth.manager.ethtaskutils.RetryingSwitchingPeerMessageTaskTest; import org.hyperledger.besu.ethereum.eth.manager.task.EthTask; import java.util.ArrayList; @@ -33,7 +34,7 @@ import org.junit.jupiter.api.Test; public class GetReceiptsForHeadersTaskTest - extends RetryingMessageTaskTest>> { + extends RetryingSwitchingPeerMessageTaskTest>> { @Override protected Map> generateDataToBeRequested() { @@ -66,6 +67,9 @@ public void shouldBeCompleteWhenAllReceiptsAreEmpty() { final Map> expected = ImmutableMap.of(header1, emptyList(), header2, emptyList(), header3, emptyList()); + // just needs any peer added, as it is not used! + RespondingEthPeer.builder().ethProtocolManager(ethProtocolManager).build(); + assertThat(createTask(expected).run()).isCompletedWithValue(expected); } }