Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use retry switching peer for world state download tasks #5508

Closed
wants to merge 34 commits into from
Closed
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
8b76085
Use AbstractRetryingSwitchingPeerTask whenever it makes sense
fab-10 May 25, 2023
784b08f
Introduce a PoS friendly fast sync target manager
fab-10 May 25, 2023
589f89f
Require that best peers are fully validated
fab-10 May 25, 2023
a924542
Move PoS sync targe in another branch
fab-10 May 29, 2023
1378afc
Record empty responses when retrying a peer task
fab-10 May 29, 2023
a20c5ed
Review and consolidate the way result is set in retrying tasks
fab-10 May 29, 2023
bd71628
Merge branch 'demote-peer-with-empty-responses-when-retrying' into us…
fab-10 May 29, 2023
82051cb
Merge branch 'main' into demote-peer-with-empty-responses-when-retrying
fab-10 May 30, 2023
a80a73f
Merge branch 'demote-peer-with-empty-responses-when-retrying' into us…
fab-10 May 30, 2023
c695c32
Fix: always use the root cause to check the errors
fab-10 May 30, 2023
69b94d5
Merge branch 'demote-peer-with-empty-responses-when-retrying' into us…
fab-10 May 30, 2023
ed2def6
Adapt unit tests
fab-10 May 30, 2023
ea4a469
Merge branch 'main' into demote-peer-with-empty-responses-when-retrying
fab-10 Jun 1, 2023
1929945
Merge branch 'demote-peer-with-empty-responses-when-retrying' into us…
fab-10 Jun 1, 2023
d590fbf
merge main
pinges Dec 11, 2023
419e9be
fix annotation
pinges Dec 11, 2023
0dde76d
Merge branch 'main' of github.com:hyperledger/besu into use-retry-swi…
pinges Dec 12, 2023
5ee3d48
fix peer lookup to include unfinished connections
pinges Dec 14, 2023
ec897ab
Merge branch 'main' of github.com:hyperledger/besu into use-retry-swi…
pinges Dec 14, 2023
b62c7a2
check can exceed in all cases
pinges Dec 14, 2023
a337253
add debug log for disconnecting an established peer
pinges Dec 14, 2023
bb7cbd1
fix compile
pinges Dec 14, 2023
cbca445
fix unit test
pinges Dec 14, 2023
24a96a5
print message when not expected
pinges Dec 15, 2023
c26fea6
Merge branch 'main' of github.com:hyperledger/besu into use-retry-swi…
pinges Dec 15, 2023
addcfcd
really print the data
pinges Dec 15, 2023
93b772a
do not report useless for got storage range
pinges Dec 18, 2023
b9265e2
two more tasks should not report useless responses
pinges Dec 19, 2023
2e2e48f
one more task
pinges Dec 19, 2023
e98213c
add java doc to new method
pinges Dec 19, 2023
2b05f8b
check whether an empty response needs to be reported
pinges Jan 16, 2024
c22e28d
merge main
pinges Jan 16, 2024
b2ab6d7
make sure peers support snap
pinges Jan 17, 2024
3209a37
make sure peers support snap
pinges Jan 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,9 @@ public Stream<EthPeer> streamAvailablePeers() {
}

public Stream<EthPeer> streamBestPeers() {
return streamAvailablePeers().sorted(getBestChainComparator().reversed());
return streamAvailablePeers()
.filter(EthPeer::isFullyValidated)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this required?

.sorted(getBestChainComparator().reversed());
}

public Optional<EthPeer> bestPeer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingSwitchingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.EthTask;
import org.hyperledger.besu.ethereum.eth.messages.snap.AccountRangeMessage;
import org.hyperledger.besu.plugin.services.MetricsSystem;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import org.apache.tuweni.bytes.Bytes32;

public class RetryingGetAccountRangeFromPeerTask
extends AbstractRetryingPeerTask<AccountRangeMessage.AccountRangeData> {
extends AbstractRetryingSwitchingPeerTask<AccountRangeMessage.AccountRangeData> {

private final EthContext ethContext;
private final Bytes32 startKeyHash;
Expand All @@ -41,9 +41,9 @@ private RetryingGetAccountRangeFromPeerTask(
final Bytes32 startKeyHash,
final Bytes32 endKeyHash,
final BlockHeader blockHeader,
final MetricsSystem metricsSystem) {
super(
ethContext, 4, data -> data.accounts().isEmpty() && data.proofs().isEmpty(), metricsSystem);
final MetricsSystem metricsSystem,
final int maxRetries) {
super(ethContext, metricsSystem, maxRetries);
this.ethContext = ethContext;
this.startKeyHash = startKeyHash;
this.endKeyHash = endKeyHash;
Expand All @@ -56,23 +56,29 @@ public static EthTask<AccountRangeMessage.AccountRangeData> forAccountRange(
final Bytes32 startKeyHash,
final Bytes32 endKeyHash,
final BlockHeader blockHeader,
final MetricsSystem metricsSystem) {
final MetricsSystem metricsSystem,
final int maxRetries) {
return new RetryingGetAccountRangeFromPeerTask(
ethContext, startKeyHash, endKeyHash, blockHeader, metricsSystem);
ethContext, startKeyHash, endKeyHash, blockHeader, metricsSystem, maxRetries);
}

@Override
protected CompletableFuture<AccountRangeMessage.AccountRangeData> executePeerTask(
final Optional<EthPeer> assignedPeer) {
protected CompletableFuture<AccountRangeMessage.AccountRangeData> executeTaskOnCurrentPeer(
final EthPeer assignedPeer) {
final GetAccountRangeFromPeerTask task =
GetAccountRangeFromPeerTask.forAccountRange(
ethContext, startKeyHash, endKeyHash, blockHeader, metricsSystem);
assignedPeer.ifPresent(task::assignPeer);
return executeSubTask(task::run)
.thenApply(
peerResult -> {
result.complete(peerResult.getResult());
return peerResult.getResult();
});
task.assignPeer(assignedPeer);
return executeSubTask(task::run).thenApply(PeerTaskResult::getResult);
}

@Override
protected boolean emptyResult(final AccountRangeMessage.AccountRangeData data) {
return data.accounts().isEmpty() && data.proofs().isEmpty();
}

@Override
protected boolean successfulResult(final AccountRangeMessage.AccountRangeData peerResult) {
return !emptyResult(peerResult);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,20 @@
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingSwitchingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.EthTask;
import org.hyperledger.besu.plugin.services.MetricsSystem;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;

public class RetryingGetBytecodeFromPeerTask extends AbstractRetryingPeerTask<Map<Bytes32, Bytes>> {
public class RetryingGetBytecodeFromPeerTask
extends AbstractRetryingSwitchingPeerTask<Map<Bytes32, Bytes>> {

private final EthContext ethContext;
private final List<Bytes32> codeHashes;
Expand All @@ -40,8 +41,9 @@ private RetryingGetBytecodeFromPeerTask(
final EthContext ethContext,
final List<Bytes32> codeHashes,
final BlockHeader blockHeader,
final MetricsSystem metricsSystem) {
super(ethContext, 4, Map::isEmpty, metricsSystem);
final MetricsSystem metricsSystem,
final int maxRetries) {
super(ethContext, metricsSystem, maxRetries);
this.ethContext = ethContext;
this.codeHashes = codeHashes;
this.blockHeader = blockHeader;
Expand All @@ -52,21 +54,27 @@ public static EthTask<Map<Bytes32, Bytes>> forByteCode(
final EthContext ethContext,
final List<Bytes32> codeHashes,
final BlockHeader blockHeader,
final MetricsSystem metricsSystem) {
return new RetryingGetBytecodeFromPeerTask(ethContext, codeHashes, blockHeader, metricsSystem);
final MetricsSystem metricsSystem,
final int maxRetries) {
return new RetryingGetBytecodeFromPeerTask(
ethContext, codeHashes, blockHeader, metricsSystem, maxRetries);
}

@Override
protected CompletableFuture<Map<Bytes32, Bytes>> executePeerTask(
final Optional<EthPeer> assignedPeer) {
protected CompletableFuture<Map<Bytes32, Bytes>> executeTaskOnCurrentPeer(final EthPeer peer) {
final GetBytecodeFromPeerTask task =
GetBytecodeFromPeerTask.forBytecode(ethContext, codeHashes, blockHeader, metricsSystem);
assignedPeer.ifPresent(task::assignPeer);
return executeSubTask(task::run)
.thenApply(
peerResult -> {
result.complete(peerResult.getResult());
return peerResult.getResult();
});
task.assignPeer(peer);
return executeSubTask(task::run).thenApply(PeerTaskResult::getResult);
}

@Override
protected boolean emptyResult(final Map<Bytes32, Bytes> peerResult) {
return peerResult.isEmpty();
}

@Override
protected boolean successfulResult(final Map<Bytes32, Bytes> peerResult) {
return !emptyResult(peerResult);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,19 @@
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingSwitchingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.EthTask;
import org.hyperledger.besu.ethereum.eth.messages.snap.StorageRangeMessage;
import org.hyperledger.besu.plugin.services.MetricsSystem;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import org.apache.tuweni.bytes.Bytes32;

public class RetryingGetStorageRangeFromPeerTask
extends AbstractRetryingPeerTask<StorageRangeMessage.SlotRangeData> {
extends AbstractRetryingSwitchingPeerTask<StorageRangeMessage.SlotRangeData> {

private final EthContext ethContext;
private final List<Bytes32> accountHashes;
Expand All @@ -44,8 +44,9 @@ private RetryingGetStorageRangeFromPeerTask(
final Bytes32 startKeyHash,
final Bytes32 endKeyHash,
final BlockHeader blockHeader,
final MetricsSystem metricsSystem) {
super(ethContext, 4, data -> data.proofs().isEmpty() && data.slots().isEmpty(), metricsSystem);
final MetricsSystem metricsSystem,
final int maxRetries) {
super(ethContext, metricsSystem, maxRetries);
this.ethContext = ethContext;
this.accountHashes = accountHashes;
this.startKeyHash = startKeyHash;
Expand All @@ -60,23 +61,35 @@ public static EthTask<StorageRangeMessage.SlotRangeData> forStorageRange(
final Bytes32 startKeyHash,
final Bytes32 endKeyHash,
final BlockHeader blockHeader,
final MetricsSystem metricsSystem) {
final MetricsSystem metricsSystem,
final int maxRetries) {
return new RetryingGetStorageRangeFromPeerTask(
ethContext, accountHashes, startKeyHash, endKeyHash, blockHeader, metricsSystem);
ethContext,
accountHashes,
startKeyHash,
endKeyHash,
blockHeader,
metricsSystem,
maxRetries);
}

@Override
protected CompletableFuture<StorageRangeMessage.SlotRangeData> executePeerTask(
final Optional<EthPeer> assignedPeer) {
protected CompletableFuture<StorageRangeMessage.SlotRangeData> executeTaskOnCurrentPeer(
final EthPeer peer) {
final GetStorageRangeFromPeerTask task =
GetStorageRangeFromPeerTask.forStorageRange(
ethContext, accountHashes, startKeyHash, endKeyHash, blockHeader, metricsSystem);
assignedPeer.ifPresent(task::assignPeer);
return executeSubTask(task::run)
.thenApply(
peerResult -> {
result.complete(peerResult.getResult());
return peerResult.getResult();
});
task.assignPeer(peer);
return executeSubTask(task::run).thenApply(PeerTaskResult::getResult);
}

@Override
protected boolean emptyResult(final StorageRangeMessage.SlotRangeData peerResult) {
return peerResult.proofs().isEmpty() && peerResult.slots().isEmpty();
}

@Override
protected boolean successfulResult(final StorageRangeMessage.SlotRangeData peerResult) {
return !emptyResult(peerResult);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,19 @@
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingSwitchingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.EthTask;
import org.hyperledger.besu.plugin.services.MetricsSystem;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import org.apache.tuweni.bytes.Bytes;

public class RetryingGetTrieNodeFromPeerTask extends AbstractRetryingPeerTask<Map<Bytes, Bytes>> {
public class RetryingGetTrieNodeFromPeerTask
extends AbstractRetryingSwitchingPeerTask<Map<Bytes, Bytes>> {

private final EthContext ethContext;
private final Map<Bytes, List<Bytes>> paths;
Expand All @@ -39,8 +40,9 @@ private RetryingGetTrieNodeFromPeerTask(
final EthContext ethContext,
final Map<Bytes, List<Bytes>> paths,
final BlockHeader blockHeader,
final MetricsSystem metricsSystem) {
super(ethContext, 4, Map::isEmpty, metricsSystem);
final MetricsSystem metricsSystem,
final int maxRetries) {
super(ethContext, metricsSystem, maxRetries);
this.ethContext = ethContext;
this.paths = paths;
this.blockHeader = blockHeader;
Expand All @@ -51,21 +53,27 @@ public static EthTask<Map<Bytes, Bytes>> forTrieNodes(
final EthContext ethContext,
final Map<Bytes, List<Bytes>> paths,
final BlockHeader blockHeader,
final MetricsSystem metricsSystem) {
return new RetryingGetTrieNodeFromPeerTask(ethContext, paths, blockHeader, metricsSystem);
final MetricsSystem metricsSystem,
final int maxRetries) {
return new RetryingGetTrieNodeFromPeerTask(
ethContext, paths, blockHeader, metricsSystem, maxRetries);
}

@Override
protected CompletableFuture<Map<Bytes, Bytes>> executePeerTask(
final Optional<EthPeer> assignedPeer) {
protected CompletableFuture<Map<Bytes, Bytes>> executeTaskOnCurrentPeer(final EthPeer peer) {
final GetTrieNodeFromPeerTask task =
GetTrieNodeFromPeerTask.forTrieNodes(ethContext, paths, blockHeader, metricsSystem);
assignedPeer.ifPresent(task::assignPeer);
return executeSubTask(task::run)
.thenApply(
peerResult -> {
result.complete(peerResult.getResult());
return peerResult.getResult();
});
task.assignPeer(peer);
return executeSubTask(task::run).thenApply(PeerTaskResult::getResult);
}

@Override
protected boolean emptyResult(final Map<Bytes, Bytes> peerResult) {
return peerResult.isEmpty();
}

@Override
protected boolean successfulResult(final Map<Bytes, Bytes> peerResult) {
return !emptyResult(peerResult);
}
}
Loading