Skip to content

Commit

Permalink
Refactor and fix retrying get block switching peer (#4256)
Browse files Browse the repository at this point in the history
* Refactor retrying peer task switching peers at every try

RetryingGetBlockFromPeersTask had a problem that prevented to complete
when all the peers were tried without success, and that also had the
consequence to not removing the failed requested block for the internal
caches in BlockPropagationManager, that could cause a stall since that
block will to be tried to be retrieved again.

Signed-off-by: Fabio Di Fabio <[email protected]>
  • Loading branch information
fab-10 authored and jflo committed Aug 18, 2022
1 parent 58290f7 commit 10e22b5
Show file tree
Hide file tree
Showing 11 changed files with 469 additions and 79 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
### Bug Fixes
- Fixes off-by-one error for mainnet TTD fallback [#4223](https://github.com/hyperledger/besu/pull/4223)
- Fix off-by-one error in AbstractRetryingPeerTask [#4254](https://github.com/hyperledger/besu/pull/4254)
- Refactor and fix retrying get block switching peer [#4256](https://github.com/hyperledger/besu/pull/4256)
- Fix encoding of key (short hex) in eth_getProof [#4261](https://github.com/hyperledger/besu/pull/4261)
- Fix for post-merge networks fast-sync [#4224](https://github.com/hyperledger/besu/pull/4224), [#4276](https://github.com/hyperledger/besu/pull/4276)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,13 @@ protected void handleTaskError(final Throwable error) {
}

protected boolean isRetryableError(final Throwable error) {
final boolean isPeerError =
error instanceof PeerBreachedProtocolException
|| error instanceof PeerDisconnectedException
|| error instanceof NoAvailablePeersException;
return error instanceof TimeoutException || (!assignedPeer.isPresent() && isPeerFailure(error));
}

return error instanceof TimeoutException || (!assignedPeer.isPresent() && isPeerError);
protected boolean isPeerFailure(final Throwable error) {
return error instanceof PeerBreachedProtocolException
|| error instanceof PeerDisconnectedException
|| error instanceof NoAvailablePeersException;
}

protected EthContext getEthContext() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.task;

import static org.hyperledger.besu.util.Slf4jLambdaHelper.debugLambda;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.traceLambda;

import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.exceptions.NoAvailablePeersException;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import org.hyperledger.besu.plugin.services.MetricsSystem;

import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.stream.Stream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractRetryingSwitchingPeerTask<T> extends AbstractRetryingPeerTask<T> {

private static final Logger LOG =
LoggerFactory.getLogger(AbstractRetryingSwitchingPeerTask.class);

private final Set<EthPeer> triedPeers = new HashSet<>();
private final Set<EthPeer> failedPeers = new HashSet<>();

protected AbstractRetryingSwitchingPeerTask(
final EthContext ethContext,
final MetricsSystem metricsSystem,
final Predicate<T> isEmptyResponse,
final int maxRetries) {
super(ethContext, maxRetries, isEmptyResponse, metricsSystem);
}

@Override
public void assignPeer(final EthPeer peer) {
super.assignPeer(peer);
triedPeers.add(peer);
}

protected abstract CompletableFuture<T> executeTaskOnCurrentPeer(final EthPeer peer);

@Override
protected CompletableFuture<T> executePeerTask(final Optional<EthPeer> assignedPeer) {

final Optional<EthPeer> maybePeer =
assignedPeer
.filter(u -> getRetryCount() == 1) // first try with the assigned peer if present
.map(Optional::of)
.orElseGet(this::selectNextPeer); // otherwise select a new one from the pool

if (maybePeer.isEmpty()) {
traceLambda(
LOG,
"No peer found to try to execute task at attempt {}, tried peers {}",
this::getRetryCount,
triedPeers::toString);
final var ex = new NoAvailablePeersException();
return CompletableFuture.failedFuture(ex);
}

final EthPeer peerToUse = maybePeer.get();
assignPeer(peerToUse);

traceLambda(
LOG,
"Trying to execute task on peer {}, attempt {}",
this::getAssignedPeer,
this::getRetryCount);

return executeTaskOnCurrentPeer(peerToUse)
.thenApply(
peerResult -> {
traceLambda(
LOG,
"Got result {} from peer {}, attempt {}",
peerResult::toString,
peerToUse::toString,
this::getRetryCount);
result.complete(peerResult);
return peerResult;
});
}

@Override
protected void handleTaskError(final Throwable error) {
if (isPeerFailure(error)) {
getAssignedPeer().ifPresent(peer -> failedPeers.add(peer));
}
super.handleTaskError(error);
}

@Override
protected boolean isRetryableError(final Throwable error) {
return error instanceof TimeoutException || isPeerFailure(error);
}

private Optional<EthPeer> selectNextPeer() {
final Optional<EthPeer> maybeNextPeer = remainingPeersToTry().findFirst();

if (maybeNextPeer.isEmpty()) {
// tried all the peers, restart from the best one but excluding the failed ones
refreshPeers();
triedPeers.retainAll(failedPeers);
return remainingPeersToTry().findFirst();
}

return maybeNextPeer;
}

private Stream<EthPeer> remainingPeersToTry() {
return getEthContext()
.getEthPeers()
.streamBestPeers()
.filter(peer -> !peer.isDisconnected() && !triedPeers.contains(peer));
}

private void refreshPeers() {
final EthPeers peers = getEthContext().getEthPeers();
// If we are at max connections, then refresh peers disconnecting one of the failed peers,
// or the least useful
if (peers.peerCount() >= peers.getMaxPeers()) {
failedPeers.stream()
.filter(peer -> !peer.isDisconnected())
.findAny()
.or(() -> peers.streamAvailablePeers().sorted(peers.getBestChainComparator()).findFirst())
.ifPresent(
peer -> {
debugLambda(LOG, "Refresh peers disconnecting peer {}", peer::toString);
peer.disconnect(DisconnectReason.USELESS_PEER);
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,135 +14,101 @@
*/
package org.hyperledger.besu.ethereum.eth.manager.task;

import static org.hyperledger.besu.util.Slf4jLambdaHelper.debugLambda;

import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.exceptions.IncompleteResultsException;
import org.hyperledger.besu.ethereum.eth.manager.exceptions.NoAvailablePeersException;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.plugin.services.MetricsSystem;

import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RetryingGetBlockFromPeersTask
extends AbstractRetryingPeerTask<AbstractPeerTask.PeerTaskResult<Block>> {
extends AbstractRetryingSwitchingPeerTask<AbstractPeerTask.PeerTaskResult<Block>> {

private static final Logger LOG = LoggerFactory.getLogger(RetryingGetBlockFromPeersTask.class);

private final ProtocolContext protocolContext;
private final ProtocolSchedule protocolSchedule;
private final Optional<Hash> blockHash;
private final Optional<Hash> maybeBlockHash;
private final long blockNumber;
private final Set<EthPeer> triedPeers = new HashSet<>();

protected RetryingGetBlockFromPeersTask(
final ProtocolContext protocolContext,
final EthContext ethContext,
final ProtocolSchedule protocolSchedule,
final MetricsSystem metricsSystem,
final int maxRetries,
final Optional<Hash> blockHash,
final Optional<Hash> maybeBlockHash,
final long blockNumber) {
super(ethContext, maxRetries, Objects::isNull, metricsSystem);
this.protocolContext = protocolContext;
super(ethContext, metricsSystem, Objects::isNull, maxRetries);
this.protocolSchedule = protocolSchedule;
this.blockHash = blockHash;
this.maybeBlockHash = maybeBlockHash;
this.blockNumber = blockNumber;
}

public static RetryingGetBlockFromPeersTask create(
final ProtocolContext protocolContext,
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
final MetricsSystem metricsSystem,
final int maxRetries,
final Optional<Hash> hash,
final Optional<Hash> maybeHash,
final long blockNumber) {
return new RetryingGetBlockFromPeersTask(
protocolContext,
ethContext,
protocolSchedule,
metricsSystem,
maxRetries,
hash,
blockNumber);
}

@Override
public void assignPeer(final EthPeer peer) {
super.assignPeer(peer);
triedPeers.add(peer);
ethContext, protocolSchedule, metricsSystem, maxRetries, maybeHash, blockNumber);
}

@Override
protected CompletableFuture<AbstractPeerTask.PeerTaskResult<Block>> executePeerTask(
final Optional<EthPeer> assignedPeer) {

protected CompletableFuture<PeerTaskResult<Block>> executeTaskOnCurrentPeer(
final EthPeer currentPeer) {
final GetBlockFromPeerTask getBlockTask =
GetBlockFromPeerTask.create(
protocolSchedule, getEthContext(), blockHash, blockNumber, getMetricsSystem());

getBlockTask.assignPeer(
assignedPeer
.filter(unused -> getRetryCount() == 1) // first try with the assigned preferred peer
.orElseGet( // then selecting a new one from the pool
() -> {
assignPeer(selectNextPeer());
return getAssignedPeer().get();
}));

LOG.debug(
"Getting block {} ({}) from peer {}, attempt {}",
blockNumber,
blockHash,
getAssignedPeer(),
getRetryCount());
protocolSchedule, getEthContext(), maybeBlockHash, blockNumber, getMetricsSystem());
getBlockTask.assignPeer(currentPeer);

return executeSubTask(getBlockTask::run)
.thenApply(
peerResult -> {
debugLambda(
LOG,
"Got block {} from peer {}, attempt {}",
peerResult.getResult()::toLogString,
peerResult.getPeer()::toString,
this::getRetryCount);
result.complete(peerResult);
return peerResult;
});
}

private EthPeer selectNextPeer() {
return getEthContext()
.getEthPeers()
.streamBestPeers()
.filter(peer -> !triedPeers.contains(peer))
.findFirst()
.orElseThrow(NoAvailablePeersException::new);
}

@Override
protected boolean isRetryableError(final Throwable error) {
return (blockNumber > protocolContext.getBlockchain().getChainHeadBlockNumber())
&& (super.isRetryableError(error) || error instanceof IncompleteResultsException);
return super.isRetryableError(error) || error instanceof IncompleteResultsException;
}

@Override
protected void handleTaskError(final Throwable error) {
if (getRetryCount() < getMaxRetries()) {
LOG.debug(
"Failed to get block {} ({}) from peer {}, attempt {}, retrying later",
blockNumber,
blockHash,
getAssignedPeer(),
getRetryCount());
debugLambda(
LOG,
"Failed to get block {} from peer {}, attempt {}, retrying later",
this::logBlockNumberMaybeHash,
this::getAssignedPeer,
this::getRetryCount);
} else {
LOG.warn(
"Failed to get block {} ({}) after {} retries", blockNumber, blockHash, getRetryCount());
"Failed to get block {} after {} retries", logBlockNumberMaybeHash(), getRetryCount());
}
super.handleTaskError(error);
}

private String logBlockNumberMaybeHash() {
return blockNumber + maybeBlockHash.map(h -> " (" + h.toHexString() + ")").orElse("");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,6 @@ private CompletableFuture<Block> getBlockFromPeers(
final Optional<Hash> blockHash) {
final RetryingGetBlockFromPeersTask getBlockTask =
RetryingGetBlockFromPeersTask.create(
protocolContext,
protocolSchedule,
ethContext,
metricsSystem,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ public CompletableFuture<Void> executeAsync(final Hash hash) {
private CompletableFuture<Block> requestBlock(final Hash targetHash) {
final RetryingGetBlockFromPeersTask getBlockTask =
RetryingGetBlockFromPeersTask.create(
context.getProtocolContext(),
context.getProtocolSchedule(),
context.getEthContext(),
context.getMetricsSystem(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
* @param <R> The type of data returned from the network
*/
public abstract class AbstractMessageTaskTest<T, R> {
protected static final int MAX_PEERS = 5;
protected static Blockchain blockchain;
protected static ProtocolSchedule protocolSchedule;
protected static ProtocolContext protocolContext;
Expand All @@ -77,7 +78,6 @@ public static void setup() {
blockchain = blockchainSetupUtil.getBlockchain();
protocolSchedule = blockchainSetupUtil.getProtocolSchedule();
protocolContext = blockchainSetupUtil.getProtocolContext();

assertThat(blockchainSetupUtil.getMaxBlockNumber()).isGreaterThanOrEqualTo(20L);
}

Expand All @@ -91,7 +91,7 @@ public void setupTest() {
EthProtocol.NAME,
TestClock.fixed(),
metricsSystem,
25,
MAX_PEERS,
EthProtocolConfiguration.DEFAULT_MAX_MESSAGE_SIZE));
final EthMessages ethMessages = new EthMessages();
final EthScheduler ethScheduler =
Expand Down
Loading

0 comments on commit 10e22b5

Please sign in to comment.