Skip to content

Commit

Permalink
Merge branch 'main' into eof/subcontainer_checks
Browse files Browse the repository at this point in the history
  • Loading branch information
jflo authored Jun 20, 2024
2 parents 8d82334 + f5e5ad5 commit 8a81d4a
Show file tree
Hide file tree
Showing 17 changed files with 130 additions and 65 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
### Bug fixes
- Validation errors ignored in accounts-allowlist and empty list [#7138](https://github.com/hyperledger/besu/issues/7138)
- Fix "Invalid block detected" for BFT chains using Bonsai DB [#7204](https://github.com/hyperledger/besu/pull/7204)
- Fix "Could not confirm best peer had pivot block" [#7109](https://github.com/hyperledger/besu/issues/7109)
- Fix "Chain Download Halt" [#6884](https://github.com/hyperledger/besu/issues/6884)

## 24.6.0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ protected EthProtocolManager createEthProtocolManager(
var mergeBestPeerComparator =
new TransitionBestPeerComparator(
genesisConfigOptions.getTerminalTotalDifficulty().map(Difficulty::of).orElseThrow());
ethPeers.setBestChainComparator(mergeBestPeerComparator);
ethPeers.setBestPeerComparator(mergeBestPeerComparator);
mergeContext.observeNewIsPostMergeState(mergeBestPeerComparator);

Optional<MergePeerFilter> filterToUse = Optional.of(new MergePeerFilter());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,12 @@ private boolean registerDisconnect(final EthPeer peer, final PeerConnection conn
peer.handleDisconnect();
abortPendingRequestsAssignedToDisconnectedPeers();
if (peer.getReputation().getScore() > USEFULL_PEER_SCORE_THRESHOLD) {
LOG.debug("Disconnected USEFULL peer {}", peer);
LOG.atDebug().setMessage("Disconnected USEFULL peer {}").addArgument(peer).log();
} else {
LOG.debug("Disconnected EthPeer {}", peer.getLoggableId());
LOG.atDebug()
.setMessage("Disconnected EthPeer {}")
.addArgument(peer.getLoggableId())
.log();
}
}
}
Expand Down Expand Up @@ -318,11 +321,11 @@ public Stream<EthPeer> streamAvailablePeers() {
public Stream<EthPeer> streamBestPeers() {
return streamAvailablePeers()
.filter(EthPeer::isFullyValidated)
.sorted(getBestChainComparator().reversed());
.sorted(getBestPeerComparator().reversed());
}

public Optional<EthPeer> bestPeer() {
return streamAvailablePeers().max(getBestChainComparator());
return streamAvailablePeers().max(getBestPeerComparator());
}

public Optional<EthPeer> bestPeerWithHeightEstimate() {
Expand All @@ -331,15 +334,15 @@ public Optional<EthPeer> bestPeerWithHeightEstimate() {
}

public Optional<EthPeer> bestPeerMatchingCriteria(final Predicate<EthPeer> matchesCriteria) {
return streamAvailablePeers().filter(matchesCriteria).max(getBestChainComparator());
return streamAvailablePeers().filter(matchesCriteria).max(getBestPeerComparator());
}

public void setBestChainComparator(final Comparator<EthPeer> comparator) {
public void setBestPeerComparator(final Comparator<EthPeer> comparator) {
LOG.info("Updating the default best peer comparator");
bestPeerComparator = comparator;
}

public Comparator<EthPeer> getBestChainComparator() {
public Comparator<EthPeer> getBestPeerComparator() {
return bestPeerComparator;
}

Expand Down Expand Up @@ -394,8 +397,7 @@ public boolean shouldConnect(final Peer peer, final boolean inbound) {

public void disconnectWorstUselessPeer() {
streamAvailablePeers()
.sorted(getBestChainComparator())
.findFirst()
.min(getBestPeerComparator())
.ifPresent(
peer -> {
LOG.atDebug()
Expand Down Expand Up @@ -551,29 +553,40 @@ private boolean addPeerToEthPeers(final EthPeer peer) {
if (!randomPeerPriority) {
// Disconnect if too many peers
if (!canExceedPeerLimits(id) && peerCount() >= peerUpperBound) {
LOG.trace(
"Too many peers. Disconnect connection: {}, max connections {}",
connection,
peerUpperBound);
LOG.atTrace()
.setMessage("Too many peers. Disconnect connection: {}, max connections {}")
.addArgument(connection)
.addArgument(peerUpperBound)
.log();
connection.disconnect(DisconnectMessage.DisconnectReason.TOO_MANY_PEERS);
return false;
}
// Disconnect if too many remotely-initiated connections
if (connection.inboundInitiated()
&& !canExceedPeerLimits(id)
&& remoteConnectionLimitReached()) {
LOG.trace(
"Too many remotely-initiated connections. Disconnect incoming connection: {}, maxRemote={}",
connection,
maxRemotelyInitiatedConnections);
LOG.atTrace()
.setMessage(
"Too many remotely-initiated connections. Disconnect incoming connection: {}, maxRemote={}")
.addArgument(connection)
.addArgument(maxRemotelyInitiatedConnections)
.log();
connection.disconnect(DisconnectMessage.DisconnectReason.TOO_MANY_PEERS);
return false;
}
final boolean added = (completeConnections.putIfAbsent(id, peer) == null);
if (added) {
LOG.trace("Added peer {} with connection {} to completeConnections", id, connection);
LOG.atTrace()
.setMessage("Added peer {} with connection {} to completeConnections")
.addArgument(id)
.addArgument(connection)
.log();
} else {
LOG.trace("Did not add peer {} with connection {} to completeConnections", id, connection);
LOG.atTrace()
.setMessage("Did not add peer {} with connection {} to completeConnections")
.addArgument(id)
.addArgument(connection)
.log();
}
return added;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
public class EthScheduler {
private static final Logger LOG = LoggerFactory.getLogger(EthScheduler.class);

private final Duration defaultTimeout = Duration.ofSeconds(5);
private final AtomicBoolean stopped = new AtomicBoolean(false);
private final CountDownLatch shutdown = new CountDownLatch(1);
private static final int TX_WORKER_CAPACITY = 1_000;
Expand Down Expand Up @@ -219,10 +218,6 @@ public CompletableFuture<Void> scheduleBlockCreationTask(final Runnable task) {
return CompletableFuture.runAsync(task, blockCreationExecutor);
}

public <T> CompletableFuture<T> timeout(final EthTask<T> task) {
return timeout(task, defaultTimeout);
}

public <T> CompletableFuture<T> timeout(final EthTask<T> task, final Duration timeout) {
final CompletableFuture<T> future = task.run();
final CompletableFuture<T> result = timeout(future, timeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
public class RetryingGetAccountRangeFromPeerTask
extends AbstractRetryingPeerTask<AccountRangeMessage.AccountRangeData> {

public static final int MAX_RETRIES = 4;
private final EthContext ethContext;
private final Bytes32 startKeyHash;
private final Bytes32 endKeyHash;
Expand All @@ -43,7 +44,10 @@ private RetryingGetAccountRangeFromPeerTask(
final BlockHeader blockHeader,
final MetricsSystem metricsSystem) {
super(
ethContext, 4, data -> data.accounts().isEmpty() && data.proofs().isEmpty(), metricsSystem);
ethContext,
MAX_RETRIES,
data -> data.accounts().isEmpty() && data.proofs().isEmpty(),
metricsSystem);
this.ethContext = ethContext;
this.startKeyHash = startKeyHash;
this.endKeyHash = endKeyHash;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,18 @@ protected void handleTaskError(final Throwable error) {
() ->
ethContext
.getScheduler()
// wait for a new peer for up to 5 seconds
.timeout(waitTask, Duration.ofSeconds(5))
// execute the task again
.whenComplete((r, t) -> executeTaskTimed()));
return;
}

LOG.debug(
"Retrying after recoverable failure from peer task {}: {}",
this.getClass().getSimpleName(),
cause.getMessage());
LOG.atDebug()
.setMessage("Retrying after recoverable failure from peer task {}: {}")
.addArgument(this.getClass().getSimpleName())
.addArgument(cause.getMessage())
.log();
// Wait before retrying on failure
executeSubTask(
() ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
public class PivotBlockRetriever {

private static final Logger LOG = LoggerFactory.getLogger(PivotBlockRetriever.class);
public static final int MAX_QUERY_RETRIES_PER_PEER = 4;
public static final int MAX_QUERY_RETRIES_PER_PEER = 5;
private static final int DEFAULT_MAX_PIVOT_BLOCK_RESETS = 250;
private static final int SUSPICIOUS_NUMBER_OF_RETRIES = 5;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordinator;
import org.hyperledger.besu.plugin.services.MetricsSystem;

import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand All @@ -41,6 +42,7 @@

public class SyncTargetManager extends AbstractSyncTargetManager {
private static final Logger LOG = LoggerFactory.getLogger(SyncTargetManager.class);
private static final int SECONDS_PER_REQUEST = 6; // 5s per request + 1s wait between retries

private final WorldStateStorageCoordinator worldStateStorageCoordinator;
private final ProtocolSchedule protocolSchedule;
Expand Down Expand Up @@ -93,7 +95,9 @@ protected CompletableFuture<Optional<EthPeer>> selectBestAvailableSyncTarget() {
return completedFuture(Optional.empty());
} else {
final EthPeer bestPeer = maybeBestPeer.get();
if (bestPeer.chainState().getEstimatedHeight() < pivotBlockHeader.getNumber()) {
// Do not check the best peers estimated height if we are doing PoS
if (!protocolSchedule.getByBlockHeader(pivotBlockHeader).isPoS()
&& bestPeer.chainState().getEstimatedHeight() < pivotBlockHeader.getNumber()) {
LOG.info(
"Best peer {} has chain height {} below pivotBlock height {}. Waiting for better peers. Current {} of max {}",
maybeBestPeer.map(EthPeer::getLoggableId).orElse("none"),
Expand Down Expand Up @@ -121,7 +125,8 @@ private CompletableFuture<Optional<EthPeer>> confirmPivotBlockHeader(final EthPe
task.assignPeer(bestPeer);
return ethContext
.getScheduler()
.timeout(task)
// Task is a retrying task. Make sure that the timeout is long enough to allow for retries.
.timeout(task, Duration.ofSeconds(MAX_QUERY_RETRIES_PER_PEER * SECONDS_PER_REQUEST + 2))
.thenCompose(
result -> {
if (peerHasDifferentPivotBlock(result)) {
Expand All @@ -147,11 +152,13 @@ private CompletableFuture<Optional<EthPeer>> confirmPivotBlockHeader(final EthPe
})
.exceptionally(
error -> {
LOG.debug(
"Could not confirm best peer {} had pivot block {}",
bestPeer.getLoggableId(),
pivotBlockHeader.getNumber(),
error);
LOG.atDebug()
.setMessage("Could not confirm best peer {} had pivot block {}, {}")
.addArgument(bestPeer.getLoggableId())
.addArgument(pivotBlockHeader.getNumber())
.addArgument(error)
.log();
bestPeer.disconnect(DisconnectReason.USELESS_PEER_CANNOT_CONFIRM_PIVOT_BLOCK);
return Optional.empty();
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public boolean shouldSwitchSyncTarget(final EthPeer currentSyncTarget) {
return maybeBestPeer
.map(
bestPeer -> {
if (ethPeers.getBestChainComparator().compare(bestPeer, currentSyncTarget) <= 0) {
if (ethPeers.getBestPeerComparator().compare(bestPeer, currentSyncTarget) <= 0) {
// Our current target is better or equal to the best peer
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ public RangeHeadersFetcher(

public CompletableFuture<List<BlockHeader>> getNextRangeHeaders(
final EthPeer peer, final BlockHeader previousRangeHeader) {
LOG.atTrace()
.setMessage("Requesting next range headers from peer {}")
.addArgument(peer.getLoggableId())
.log();
final int skip = syncConfig.getDownloaderChainSegmentSize() - 1;
final int maximumHeaderRequestSize = syncConfig.getDownloaderHeaderRequestSize();
final long previousRangeNumber = previousRangeHeader.getNumber();
Expand All @@ -78,11 +82,20 @@ public CompletableFuture<List<BlockHeader>> getNextRangeHeaders(
final BlockHeader targetHeader = finalRangeHeader.get();
final long blocksUntilTarget = targetHeader.getNumber() - previousRangeNumber;
if (blocksUntilTarget <= 0) {
LOG.atTrace()
.setMessage("Requesting next range headers: no blocks until target: {}")
.addArgument(blocksUntilTarget)
.log();
return completedFuture(emptyList());
}
final long maxHeadersToRequest = blocksUntilTarget / (skip + 1);
additionalHeaderCount = (int) Math.min(maxHeadersToRequest, maximumHeaderRequestSize);
if (additionalHeaderCount == 0) {
LOG.atTrace()
.setMessage(
"Requesting next range headers: additional header count is 0, blocks until target: {}")
.addArgument(blocksUntilTarget)
.log();
return completedFuture(singletonList(targetHeader));
}
} else {
Expand All @@ -97,11 +110,12 @@ private CompletableFuture<List<BlockHeader>> requestHeaders(
final BlockHeader referenceHeader,
final int headerCount,
final int skip) {
LOG.trace(
"Requesting {} range headers, starting from {}, {} blocks apart",
headerCount,
referenceHeader.getNumber(),
skip);
LOG.atTrace()
.setMessage("Requesting {} range headers, starting from {}, {} blocks apart")
.addArgument(headerCount)
.addArgument(referenceHeader.getNumber())
.addArgument(skip)
.log();
return GetHeadersFromPeerByHashTask.startingAtHash(
protocolSchedule,
ethContext,
Expand All @@ -114,7 +128,19 @@ private CompletableFuture<List<BlockHeader>> requestHeaders(
.assignPeer(peer)
.run()
.thenApply(PeerTaskResult::getResult)
.thenApply(headers -> stripExistingRangeHeaders(referenceHeader, headers));
.thenApply(
headers -> {
if (headers.size() < headerCount) {
LOG.atTrace()
.setMessage(
"Peer {} returned fewer headers than requested. Expected: {}, Actual: {}")
.addArgument(peer)
.addArgument(headerCount)
.addArgument(headers.size())
.log();
}
return stripExistingRangeHeaders(referenceHeader, headers);
});
}

private List<BlockHeader> stripExistingRangeHeaders(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
public class SyncTargetRangeSource implements Iterator<SyncTargetRange> {
private static final Logger LOG = LoggerFactory.getLogger(SyncTargetRangeSource.class);
private static final Duration RETRY_DELAY_DURATION = Duration.ofSeconds(2);
public static final int DEFAULT_TIME_TO_WAIT_IN_SECONDS = 6;

private final RangeHeadersFetcher fetcher;
private final SyncTargetChecker syncTargetChecker;
Expand Down Expand Up @@ -70,7 +71,7 @@ public SyncTargetRangeSource(
peer,
commonAncestor,
retriesPermitted,
Duration.ofSeconds(5),
Duration.ofSeconds(DEFAULT_TIME_TO_WAIT_IN_SECONDS),
terminationCondition);
}

Expand Down Expand Up @@ -153,7 +154,7 @@ private SyncTargetRange getRangeFromPendingRequest() {
if (retryCount >= retriesPermitted) {
LOG.atDebug()
.setMessage(
"Disconnecting target peer for providing useless or empty range header: {}.")
"Disconnecting target peer {} for providing useless or empty range headers.")
.addArgument(peer)
.log();
peer.disconnect(DisconnectMessage.DisconnectReason.USELESS_PEER_USELESS_RESPONSES);
Expand All @@ -169,12 +170,20 @@ private SyncTargetRange getRangeFromPendingRequest() {
} catch (final InterruptedException e) {
LOG.trace("Interrupted while waiting for new range headers", e);
return null;
} catch (final ExecutionException e) {
LOG.debug("Failed to retrieve new range headers", e);
this.pendingRequests = Optional.empty();
} catch (final ExecutionException | TimeoutException e) {
if (e instanceof ExecutionException) {
this.pendingRequests = Optional.empty();
}
retryCount++;
return null;
} catch (final TimeoutException e) {
if (retryCount >= retriesPermitted) {
LOG.atDebug()
.setMessage(
"Disconnecting target peer {} for not providing useful range headers: Exception: {}.")
.addArgument(peer)
.addArgument(e)
.log();
peer.disconnect(DisconnectMessage.DisconnectReason.USELESS_PEER_USELESS_RESPONSES);
}
return null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class CompleteBlocksTask extends AbstractRetryingPeerTask<List<Block>> {
private static final Logger LOG = LoggerFactory.getLogger(CompleteBlocksTask.class);

private static final int MIN_SIZE_INCOMPLETE_LIST = 1;
private static final int DEFAULT_RETRIES = 4;
private static final int DEFAULT_RETRIES = 5;

private final EthContext ethContext;
private final ProtocolSchedule protocolSchedule;
Expand Down
Loading

0 comments on commit 8a81d4a

Please sign in to comment.