Skip to content

Commit

Permalink
convert snap tasks to switching
Browse files Browse the repository at this point in the history
  • Loading branch information
pinges committed Jun 30, 2024
1 parent 626af31 commit 4a4c7a2
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,17 @@
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingSwitchingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.EthTask;
import org.hyperledger.besu.ethereum.eth.messages.snap.AccountRangeMessage;
import org.hyperledger.besu.plugin.services.MetricsSystem;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import org.apache.tuweni.bytes.Bytes32;

public class RetryingGetAccountRangeFromPeerTask
extends AbstractRetryingPeerTask<AccountRangeMessage.AccountRangeData> {
extends AbstractRetryingSwitchingPeerTask<AccountRangeMessage.AccountRangeData> {

public static final int MAX_RETRIES = 4;

Expand All @@ -46,9 +45,9 @@ private RetryingGetAccountRangeFromPeerTask(
final MetricsSystem metricsSystem) {
super(
ethContext,
MAX_RETRIES,
metricsSystem,
data -> data.accounts().isEmpty() && data.proofs().isEmpty(),
metricsSystem);
MAX_RETRIES);
this.ethContext = ethContext;
this.startKeyHash = startKeyHash;
this.endKeyHash = endKeyHash;
Expand All @@ -67,17 +66,19 @@ public static EthTask<AccountRangeMessage.AccountRangeData> forAccountRange(
}

@Override
protected CompletableFuture<AccountRangeMessage.AccountRangeData> executePeerTask(
final Optional<EthPeer> assignedPeer) {
protected CompletableFuture<AccountRangeMessage.AccountRangeData> executeTaskOnCurrentPeer(
final EthPeer peer) {
final GetAccountRangeFromPeerTask task =
GetAccountRangeFromPeerTask.forAccountRange(
ethContext, startKeyHash, endKeyHash, blockHeader, metricsSystem);
assignedPeer.ifPresent(task::assignPeer);
task.assignPeer(peer);
return executeSubTask(task::run)
.thenApply(
peerResult -> {
result.complete(peerResult.getResult());
return peerResult.getResult();
final AccountRangeMessage.AccountRangeData pr = peerResult.getResult();
// TODO: record a useless response, if this is empty?
result.complete(pr);
return pr;
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,19 @@
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingSwitchingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.EthTask;
import org.hyperledger.besu.plugin.services.MetricsSystem;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;

public class RetryingGetBytecodeFromPeerTask extends AbstractRetryingPeerTask<Map<Bytes32, Bytes>> {
public class RetryingGetBytecodeFromPeerTask
extends AbstractRetryingSwitchingPeerTask<Map<Bytes32, Bytes>> {

private final EthContext ethContext;
private final List<Bytes32> codeHashes;
Expand All @@ -41,7 +41,7 @@ private RetryingGetBytecodeFromPeerTask(
final List<Bytes32> codeHashes,
final BlockHeader blockHeader,
final MetricsSystem metricsSystem) {
super(ethContext, 4, Map::isEmpty, metricsSystem);
super(ethContext, metricsSystem, Map::isEmpty, 4);
this.ethContext = ethContext;
this.codeHashes = codeHashes;
this.blockHeader = blockHeader;
Expand All @@ -57,16 +57,17 @@ public static EthTask<Map<Bytes32, Bytes>> forByteCode(
}

@Override
protected CompletableFuture<Map<Bytes32, Bytes>> executePeerTask(
final Optional<EthPeer> assignedPeer) {
protected CompletableFuture<Map<Bytes32, Bytes>> executeTaskOnCurrentPeer(final EthPeer peer) {
final GetBytecodeFromPeerTask task =
GetBytecodeFromPeerTask.forBytecode(ethContext, codeHashes, blockHeader, metricsSystem);
assignedPeer.ifPresent(task::assignPeer);
task.assignPeer(peer);
return executeSubTask(task::run)
.thenApply(
peerResult -> {
result.complete(peerResult.getResult());
return peerResult.getResult();
final Map<Bytes32, Bytes> pr = peerResult.getResult();
// TODO: report useless response
result.complete(pr);
return pr;
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,18 @@
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingSwitchingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.EthTask;
import org.hyperledger.besu.ethereum.eth.messages.snap.StorageRangeMessage;
import org.hyperledger.besu.plugin.services.MetricsSystem;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import org.apache.tuweni.bytes.Bytes32;

public class RetryingGetStorageRangeFromPeerTask
extends AbstractRetryingPeerTask<StorageRangeMessage.SlotRangeData> {
extends AbstractRetryingSwitchingPeerTask<StorageRangeMessage.SlotRangeData> {

private final EthContext ethContext;
private final List<Bytes32> accountHashes;
Expand All @@ -45,7 +44,7 @@ private RetryingGetStorageRangeFromPeerTask(
final Bytes32 endKeyHash,
final BlockHeader blockHeader,
final MetricsSystem metricsSystem) {
super(ethContext, 4, data -> data.proofs().isEmpty() && data.slots().isEmpty(), metricsSystem);
super(ethContext, metricsSystem, data -> data.proofs().isEmpty() && data.slots().isEmpty(), 4);
this.ethContext = ethContext;
this.accountHashes = accountHashes;
this.startKeyHash = startKeyHash;
Expand All @@ -66,17 +65,19 @@ public static EthTask<StorageRangeMessage.SlotRangeData> forStorageRange(
}

@Override
protected CompletableFuture<StorageRangeMessage.SlotRangeData> executePeerTask(
final Optional<EthPeer> assignedPeer) {
protected CompletableFuture<StorageRangeMessage.SlotRangeData> executeTaskOnCurrentPeer(
final EthPeer peer) {
final GetStorageRangeFromPeerTask task =
GetStorageRangeFromPeerTask.forStorageRange(
ethContext, accountHashes, startKeyHash, endKeyHash, blockHeader, metricsSystem);
assignedPeer.ifPresent(task::assignPeer);
task.assignPeer(peer);
return executeSubTask(task::run)
.thenApply(
peerResult -> {
result.complete(peerResult.getResult());
return peerResult.getResult();
final StorageRangeMessage.SlotRangeData pr = peerResult.getResult();
// TODO: report useless response
result.complete(pr);
return pr;
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingSwitchingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.EthTask;
import org.hyperledger.besu.plugin.services.MetricsSystem;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import org.apache.tuweni.bytes.Bytes;

public class RetryingGetTrieNodeFromPeerTask extends AbstractRetryingPeerTask<Map<Bytes, Bytes>> {
public class RetryingGetTrieNodeFromPeerTask
extends AbstractRetryingSwitchingPeerTask<Map<Bytes, Bytes>> {

private final EthContext ethContext;
private final Map<Bytes, List<Bytes>> paths;
Expand All @@ -40,7 +40,7 @@ private RetryingGetTrieNodeFromPeerTask(
final Map<Bytes, List<Bytes>> paths,
final BlockHeader blockHeader,
final MetricsSystem metricsSystem) {
super(ethContext, 4, Map::isEmpty, metricsSystem);
super(ethContext, metricsSystem, Map::isEmpty, 4);
this.ethContext = ethContext;
this.paths = paths;
this.blockHeader = blockHeader;
Expand All @@ -56,16 +56,17 @@ public static EthTask<Map<Bytes, Bytes>> forTrieNodes(
}

@Override
protected CompletableFuture<Map<Bytes, Bytes>> executePeerTask(
final Optional<EthPeer> assignedPeer) {
protected CompletableFuture<Map<Bytes, Bytes>> executeTaskOnCurrentPeer(final EthPeer peer) {
final GetTrieNodeFromPeerTask task =
GetTrieNodeFromPeerTask.forTrieNodes(ethContext, paths, blockHeader, metricsSystem);
assignedPeer.ifPresent(task::assignPeer);
task.assignPeer(peer);
return executeSubTask(task::run)
.thenApply(
peerResult -> {
result.complete(peerResult.getResult());
return peerResult.getResult();
final Map<Bytes, Bytes> pr = peerResult.getResult();
// TODO: report useless response?
result.complete(pr);
return pr;
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

public abstract class AbstractPeerRequestTask<R> extends AbstractPeerTask<R> {
private static final Logger LOG = LoggerFactory.getLogger(AbstractPeerRequestTask.class);
private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(5);
private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(6);

private Duration timeout = DEFAULT_TIMEOUT;
private final int requestCode;
Expand Down

0 comments on commit 4a4c7a2

Please sign in to comment.