Skip to content

Commit

Permalink
make almost all retrying tasks switching
Browse files Browse the repository at this point in the history
  • Loading branch information
pinges committed Jul 2, 2024
1 parent 4a4c7a2 commit ce6c51c
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ protected CompletableFuture<T> executePeerTask(final Optional<EthPeer> assignedP
.addArgument(peerToUse)
.addArgument(this::getRetryCount)
.log();
result.complete(peerResult);
return peerResult;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import org.apache.tuweni.bytes.Bytes;

public class RetryingGetNodeDataFromPeerTask extends AbstractRetryingPeerTask<Map<Hash, Bytes>> {
public class RetryingGetNodeDataFromPeerTask
extends AbstractRetryingSwitchingPeerTask<Map<Hash, Bytes>> {

private final EthContext ethContext;
private final Set<Hash> hashes;
Expand All @@ -40,7 +40,7 @@ private RetryingGetNodeDataFromPeerTask(
final Collection<Hash> hashes,
final long pivotBlockNumber,
final MetricsSystem metricsSystem) {
super(ethContext, 4, data -> false, metricsSystem);
super(ethContext, metricsSystem, data -> false, 4);
this.ethContext = ethContext;
this.hashes = new HashSet<>(hashes);
this.pivotBlockNumber = pivotBlockNumber;
Expand All @@ -56,11 +56,10 @@ public static RetryingGetNodeDataFromPeerTask forHashes(
}

@Override
protected CompletableFuture<Map<Hash, Bytes>> executePeerTask(
final Optional<EthPeer> assignedPeer) {
protected CompletableFuture<Map<Hash, Bytes>> executeTaskOnCurrentPeer(final EthPeer peer) {
final GetNodeDataFromPeerTask task =
GetNodeDataFromPeerTask.forHashes(ethContext, hashes, pivotBlockNumber, metricsSystem);
assignedPeer.ifPresent(task::assignPeer);
task.assignPeer(peer);
return executeSubTask(task::run)
.thenApply(
peerResult -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingSwitchingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.GetBodiesFromPeerTask;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.plugin.services.MetricsSystem;
Expand All @@ -47,7 +47,7 @@
* Given a set of headers, "completes" them by repeatedly requesting additional data (bodies) needed
* to create the blocks that correspond to the supplied headers.
*/
public class CompleteBlocksTask extends AbstractRetryingPeerTask<List<Block>> {
public class CompleteBlocksTask extends AbstractRetryingSwitchingPeerTask<List<Block>> {
private static final Logger LOG = LoggerFactory.getLogger(CompleteBlocksTask.class);

private static final int MIN_SIZE_INCOMPLETE_LIST = 1;
Expand All @@ -66,8 +66,8 @@ private CompleteBlocksTask(
final List<BlockHeader> headers,
final int maxRetries,
final MetricsSystem metricsSystem) {
super(ethContext, maxRetries, Collection::isEmpty, metricsSystem);
checkArgument(headers.size() > 0, "Must supply a non-empty headers list");
super(ethContext, metricsSystem, Collection::isEmpty, maxRetries);
checkArgument(!headers.isEmpty(), "Must supply a non-empty headers list");
this.protocolSchedule = protocolSchedule;
this.ethContext = ethContext;
this.metricsSystem = metricsSystem;
Expand Down Expand Up @@ -130,11 +130,11 @@ public static CompleteBlocksTask forHeaders(
}

@Override
protected CompletableFuture<List<Block>> executePeerTask(final Optional<EthPeer> assignedPeer) {
return requestBodies(assignedPeer).thenCompose(this::processBodiesResult);
protected CompletableFuture<List<Block>> executeTaskOnCurrentPeer(final EthPeer peer) {
return requestBodies(peer).thenCompose(this::processBodiesResult);
}

private CompletableFuture<List<Block>> requestBodies(final Optional<EthPeer> assignedPeer) {
private CompletableFuture<List<Block>> requestBodies(final EthPeer assignedPeer) {
final List<BlockHeader> incompleteHeaders = incompleteHeaders();
if (incompleteHeaders.isEmpty()) {
return completedFuture(emptyList());
Expand All @@ -148,7 +148,7 @@ private CompletableFuture<List<Block>> requestBodies(final Optional<EthPeer> ass
final GetBodiesFromPeerTask task =
GetBodiesFromPeerTask.forHeaders(
protocolSchedule, ethContext, incompleteHeaders, metricsSystem);
assignedPeer.ifPresent(task::assignPeer);
task.assignPeer(assignedPeer);
return task.run().thenApply(PeerTaskResult::getResult);
});
}
Expand All @@ -157,8 +157,7 @@ private CompletableFuture<List<Block>> processBodiesResult(final List<Block> blo
blocksResult.forEach((block) -> blocks.put(block.getHeader().getNumber(), block));

if (incompleteHeaders().isEmpty()) {
result.complete(
headers.stream().map(h -> blocks.get(h.getNumber())).collect(Collectors.toList()));
result.complete(headers.stream().map(h -> blocks.get(h.getNumber())).toList());
}

return completedFuture(blocksResult);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractGetHeadersFromPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingSwitchingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.GetBodiesFromPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.GetHeadersFromPeerByHashTask;
import org.hyperledger.besu.ethereum.eth.sync.ValidationPolicy;
Expand All @@ -54,7 +54,8 @@
* Retrieves a sequence of headers, sending out requests repeatedly until all headers are fulfilled.
* Validates headers as they are received.
*/
public class DownloadHeaderSequenceTask extends AbstractRetryingPeerTask<List<BlockHeader>> {
public class DownloadHeaderSequenceTask
extends AbstractRetryingSwitchingPeerTask<List<BlockHeader>> {
private static final Logger LOG = LoggerFactory.getLogger(DownloadHeaderSequenceTask.class);
private static final int DEFAULT_RETRIES = 5;

Expand All @@ -80,7 +81,7 @@ private DownloadHeaderSequenceTask(
final int maxRetries,
final ValidationPolicy validationPolicy,
final MetricsSystem metricsSystem) {
super(ethContext, maxRetries, Collection::isEmpty, metricsSystem);
super(ethContext, metricsSystem, Collection::isEmpty, maxRetries);
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.ethContext = ethContext;
Expand Down Expand Up @@ -135,12 +136,11 @@ public static DownloadHeaderSequenceTask endingAtHeader(
}

@Override
protected CompletableFuture<List<BlockHeader>> executePeerTask(
final Optional<EthPeer> assignedPeer) {
protected CompletableFuture<List<BlockHeader>> executeTaskOnCurrentPeer(final EthPeer peer) {
LOG.debug(
"Downloading headers from {} to {}.", startingBlockNumber, referenceHeader.getNumber());
final CompletableFuture<List<BlockHeader>> task =
downloadHeaders(assignedPeer).thenCompose(this::processHeaders);
downloadHeaders(peer).thenCompose(this::processHeaders);
return task.whenComplete(
(r, t) -> {
// We're done if we've filled all requested headers
Expand All @@ -155,7 +155,7 @@ protected CompletableFuture<List<BlockHeader>> executePeerTask(
}

private CompletableFuture<PeerTaskResult<List<BlockHeader>>> downloadHeaders(
final Optional<EthPeer> assignedPeer) {
final EthPeer assignedPeer) {
// Figure out parameters for our headers request
final boolean partiallyFilled = lastFilledHeaderIndex < segmentLength;
final BlockHeader referenceHeaderForNextRequest =
Expand All @@ -174,7 +174,7 @@ private CompletableFuture<PeerTaskResult<List<BlockHeader>>> downloadHeaders(
referenceHeaderForNextRequest.getNumber(),
count + 1,
metricsSystem);
assignedPeer.ifPresent(headersTask::assignPeer);
headersTask.assignPeer(assignedPeer);
return headersTask.run();
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,13 @@
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingSwitchingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.GetReceiptsFromPeerTask;
import org.hyperledger.besu.plugin.services.MetricsSystem;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

Expand All @@ -40,7 +39,7 @@

/** Given a set of headers, repeatedly requests the receipts for those blocks. */
public class GetReceiptsForHeadersTask
extends AbstractRetryingPeerTask<Map<BlockHeader, List<TransactionReceipt>>> {
extends AbstractRetryingSwitchingPeerTask<Map<BlockHeader, List<TransactionReceipt>>> {
private static final Logger LOG = LoggerFactory.getLogger(GetReceiptsForHeadersTask.class);
private static final int DEFAULT_RETRIES = 5;

Expand All @@ -55,8 +54,8 @@ private GetReceiptsForHeadersTask(
final List<BlockHeader> headers,
final int maxRetries,
final MetricsSystem metricsSystem) {
super(ethContext, maxRetries, Map::isEmpty, metricsSystem);
checkArgument(headers.size() > 0, "Must supply a non-empty headers list");
super(ethContext, metricsSystem, Map::isEmpty, maxRetries);
checkArgument(!headers.isEmpty(), "Must supply a non-empty headers list");
this.ethContext = ethContext;
this.metricsSystem = metricsSystem;

Expand Down Expand Up @@ -87,13 +86,13 @@ private void completeEmptyReceipts(final List<BlockHeader> headers) {
}

@Override
protected CompletableFuture<Map<BlockHeader, List<TransactionReceipt>>> executePeerTask(
final Optional<EthPeer> assignedPeer) {
return requestReceipts(assignedPeer).thenCompose(this::processResponse);
protected CompletableFuture<Map<BlockHeader, List<TransactionReceipt>>> executeTaskOnCurrentPeer(
final EthPeer peer) {
return requestReceipts(peer).thenCompose(this::processResponse);
}

private CompletableFuture<Map<BlockHeader, List<TransactionReceipt>>> requestReceipts(
final Optional<EthPeer> assignedPeer) {
final EthPeer assignedPeer) {
final List<BlockHeader> incompleteHeaders = incompleteHeaders();
if (incompleteHeaders.isEmpty()) {
return CompletableFuture.completedFuture(emptyMap());
Expand All @@ -106,7 +105,7 @@ private CompletableFuture<Map<BlockHeader, List<TransactionReceipt>>> requestRec
() -> {
final GetReceiptsFromPeerTask task =
GetReceiptsFromPeerTask.forHeaders(ethContext, incompleteHeaders, metricsSystem);
assignedPeer.ifPresent(task::assignPeer);
task.assignPeer(assignedPeer);
return task.run().thenApply(PeerTaskResult::getResult);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ protected CompleteBlocksTask createTask(final List<Block> requestedData) {

@Test
public void shouldCompleteWithoutPeersWhenAllBlocksAreEmpty() {
// just needs any peer added, as it is not used!
RespondingEthPeer.builder().ethProtocolManager(ethProtocolManager).build();

final BlockHeader header1 = new BlockHeaderTestFixture().number(1).buildHeader();
final BlockHeader header2 = new BlockHeaderTestFixture().number(2).buildHeader();
final BlockHeader header3 = new BlockHeaderTestFixture().number(3).buildHeader();
Expand Down Expand Up @@ -113,6 +116,9 @@ public void shouldCreateWithdrawalsAwareEmptyBlock_whenWithdrawalsAreEnabled() {
when(mockShanghaiSpec.getWithdrawalsProcessor())
.thenReturn(Optional.of(mockWithdrawalsProcessor));

// just needs any peer added, as it is not used!
RespondingEthPeer.builder().ethProtocolManager(ethProtocolManager).build();

final Block block1 =
new Block(
header1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture;
import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.ethereum.eth.manager.ethtaskutils.RetryingMessageTaskTest;
import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer;
import org.hyperledger.besu.ethereum.eth.manager.ethtaskutils.RetryingSwitchingPeerMessageTaskTest;
import org.hyperledger.besu.ethereum.eth.manager.task.EthTask;

import java.util.ArrayList;
Expand All @@ -33,7 +34,7 @@
import org.junit.jupiter.api.Test;

public class GetReceiptsForHeadersTaskTest
extends RetryingMessageTaskTest<Map<BlockHeader, List<TransactionReceipt>>> {
extends RetryingSwitchingPeerMessageTaskTest<Map<BlockHeader, List<TransactionReceipt>>> {

@Override
protected Map<BlockHeader, List<TransactionReceipt>> generateDataToBeRequested() {
Expand Down Expand Up @@ -66,6 +67,9 @@ public void shouldBeCompleteWhenAllReceiptsAreEmpty() {
final Map<BlockHeader, List<TransactionReceipt>> expected =
ImmutableMap.of(header1, emptyList(), header2, emptyList(), header3, emptyList());

// just needs any peer added, as it is not used!
RespondingEthPeer.builder().ethProtocolManager(ethProtocolManager).build();

assertThat(createTask(expected).run()).isCompletedWithValue(expected);
}
}

0 comments on commit ce6c51c

Please sign in to comment.