Skip to content

Commit

Permalink
Merge branch 'main' into chore/removeDeprecatedHostWhitelistRemove
Browse files Browse the repository at this point in the history
  • Loading branch information
macfarla authored Dec 11, 2024
2 parents cd63563 + c62cd21 commit 1996166
Show file tree
Hide file tree
Showing 76 changed files with 2,825 additions and 545 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -690,9 +690,10 @@ public BesuController build() {
.build());
}

final EthContext ethContext = new EthContext(ethPeers, ethMessages, snapMessages, scheduler);
final PeerTaskExecutor peerTaskExecutor =
new PeerTaskExecutor(ethPeers, new PeerTaskRequestSender(), metricsSystem);
final EthContext ethContext =
new EthContext(ethPeers, ethMessages, snapMessages, scheduler, peerTaskExecutor);
final boolean fullSyncDisabled = !SyncMode.isFullSync(syncConfig.getSyncMode());
final SyncState syncState = new SyncState(blockchain, ethPeers, fullSyncDisabled, checkpoint);

Expand All @@ -718,7 +719,8 @@ public BesuController build() {
besuComponent.map(BesuComponent::getBlobCache).orElse(new BlobCache()),
miningConfiguration);

final List<PeerValidator> peerValidators = createPeerValidators(protocolSchedule);
final List<PeerValidator> peerValidators =
createPeerValidators(protocolSchedule, peerTaskExecutor);

final EthProtocolManager ethProtocolManager =
createEthProtocolManager(
Expand Down Expand Up @@ -947,6 +949,7 @@ private PivotBlockSelector createPivotSelector(
ethContext,
metricsSystem,
genesisConfigOptions,
syncConfig,
unverifiedForkchoiceSupplier,
unsubscribeForkchoiceListener);
} else {
Expand Down Expand Up @@ -1179,29 +1182,42 @@ private ChainDataPruner createChainPruner(final BlockchainStorage blockchainStor
* Create peer validators list.
*
* @param protocolSchedule the protocol schedule
* @param peerTaskExecutor the peer task executor
* @return the list
*/
protected List<PeerValidator> createPeerValidators(final ProtocolSchedule protocolSchedule) {
protected List<PeerValidator> createPeerValidators(
final ProtocolSchedule protocolSchedule, final PeerTaskExecutor peerTaskExecutor) {
final List<PeerValidator> validators = new ArrayList<>();

final OptionalLong daoBlock = genesisConfigOptions.getDaoForkBlock();
if (daoBlock.isPresent()) {
// Setup dao validator
validators.add(
new DaoForkPeerValidator(protocolSchedule, metricsSystem, daoBlock.getAsLong()));
new DaoForkPeerValidator(
protocolSchedule, peerTaskExecutor, syncConfig, metricsSystem, daoBlock.getAsLong()));
}

final OptionalLong classicBlock = genesisConfigOptions.getClassicForkBlock();
// setup classic validator
if (classicBlock.isPresent()) {
validators.add(
new ClassicForkPeerValidator(protocolSchedule, metricsSystem, classicBlock.getAsLong()));
new ClassicForkPeerValidator(
protocolSchedule,
peerTaskExecutor,
syncConfig,
metricsSystem,
classicBlock.getAsLong()));
}

for (final Map.Entry<Long, Hash> requiredBlock : requiredBlocks.entrySet()) {
validators.add(
new RequiredBlocksPeerValidator(
protocolSchedule, metricsSystem, requiredBlock.getKey(), requiredBlock.getValue()));
protocolSchedule,
peerTaskExecutor,
syncConfig,
metricsSystem,
requiredBlock.getKey(),
requiredBlock.getValue()));
}

final CheckpointConfigOptions checkpointConfigOptions =
Expand All @@ -1210,6 +1226,8 @@ protected List<PeerValidator> createPeerValidators(final ProtocolSchedule protoc
validators.add(
new CheckpointBlocksPeerValidator(
protocolSchedule,
peerTaskExecutor,
syncConfig,
metricsSystem,
checkpointConfigOptions.getNumber().orElseThrow(),
checkpointConfigOptions.getHash().map(Hash::fromHexString).orElseThrow()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.peervalidation.RequiredBlocksPeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
Expand Down Expand Up @@ -79,6 +80,7 @@ protected MiningCoordinator createMiningCoordinator(
new BackwardSyncContext(
protocolContext,
protocolSchedule,
syncConfig,
metricsSystem,
ethProtocolManager.ethContext(),
syncState,
Expand Down Expand Up @@ -235,14 +237,17 @@ protected PluginServiceFactory createAdditionalPluginServices(
}

@Override
protected List<PeerValidator> createPeerValidators(final ProtocolSchedule protocolSchedule) {
List<PeerValidator> retval = super.createPeerValidators(protocolSchedule);
protected List<PeerValidator> createPeerValidators(
final ProtocolSchedule protocolSchedule, final PeerTaskExecutor peerTaskExecutor) {
List<PeerValidator> retval = super.createPeerValidators(protocolSchedule, peerTaskExecutor);
final OptionalLong powTerminalBlockNumber = genesisConfigOptions.getTerminalBlockNumber();
final Optional<Hash> powTerminalBlockHash = genesisConfigOptions.getTerminalBlockHash();
if (powTerminalBlockHash.isPresent() && powTerminalBlockNumber.isPresent()) {
retval.add(
new RequiredBlocksPeerValidator(
protocolSchedule,
peerTaskExecutor,
syncConfig,
metricsSystem,
powTerminalBlockNumber.getAsLong(),
powTerminalBlockHash.get(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ protected MiningCoordinator createMiningCoordinator(
new TransitionBackwardSyncContext(
protocolContext,
transitionProtocolSchedule,
syncConfig,
metricsSystem,
ethProtocolManager.ethContext(),
syncState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider;
import org.hyperledger.besu.ethereum.core.MiningConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.mainnet.BlockHeaderValidator;
Expand Down Expand Up @@ -73,6 +74,7 @@ public class TransitionControllerBuilderTest {
@Mock ProtocolContext protocolContext;
@Mock MutableBlockchain mockBlockchain;
@Mock TransactionPool transactionPool;
@Mock PeerTaskExecutor peerTaskExecutor;
@Mock SyncState syncState;

@Mock(answer = Answers.RETURNS_DEEP_STUBS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardChain;
import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardSyncContext;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
Expand All @@ -43,13 +44,15 @@ public class TransitionBackwardSyncContext extends BackwardSyncContext {
public TransitionBackwardSyncContext(
final ProtocolContext protocolContext,
final TransitionProtocolSchedule transitionProtocolSchedule,
final SynchronizerConfiguration synchronizerConfiguration,
final MetricsSystem metricsSystem,
final EthContext ethContext,
final SyncState syncState,
final StorageProvider storageProvider) {
super(
protocolContext,
transitionProtocolSchedule,
synchronizerConfiguration,
metricsSystem,
ethContext,
syncState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.hyperledger.besu.ethereum.rlp.RLPException;
import org.hyperledger.besu.ethereum.rlp.RLPInput;

import java.util.Arrays;
import java.util.concurrent.ExecutionException;

import com.fasterxml.jackson.annotation.JsonCreator;
Expand Down Expand Up @@ -291,4 +292,16 @@ public Hash addressHash() {
return Hash.hash(this);
}
}

@Override
public boolean equals(final Object obj) {
if (obj == this) {
return true;
}
if (!(obj instanceof Address)) {
return false;
}
Address other = (Address) obj;
return Arrays.equals(this.toArrayUnsafe(), other.toArrayUnsafe());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
*/
package org.hyperledger.besu.ethereum.eth.manager;

import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;

import java.util.Optional;

public class EthContext {
Expand All @@ -22,24 +24,31 @@ public class EthContext {
private final EthMessages ethMessages;
private final Optional<EthMessages> snapMessages;
private final EthScheduler scheduler;
private final PeerTaskExecutor peerTaskExecutor;

public EthContext(
final EthPeers ethPeers,
final EthMessages ethMessages,
final EthMessages snapMessages,
final EthScheduler scheduler) {
final EthScheduler scheduler,
final PeerTaskExecutor peerTaskExecutor) {
this.ethPeers = ethPeers;
this.ethMessages = ethMessages;
this.snapMessages = Optional.of(snapMessages);
this.scheduler = scheduler;
this.peerTaskExecutor = peerTaskExecutor;
}

public EthContext(
final EthPeers ethPeers, final EthMessages ethMessages, final EthScheduler scheduler) {
final EthPeers ethPeers,
final EthMessages ethMessages,
final EthScheduler scheduler,
final PeerTaskExecutor peerTaskExecutor) {
this.ethPeers = ethPeers;
this.ethMessages = ethMessages;
this.snapMessages = Optional.empty();
this.scheduler = scheduler;
this.peerTaskExecutor = peerTaskExecutor;
}

public EthPeers getEthPeers() {
Expand All @@ -57,4 +66,8 @@ public Optional<EthMessages> getSnapMessages() {
public EthScheduler getScheduler() {
return scheduler;
}

public PeerTaskExecutor getPeerTaskExecutor() {
return peerTaskExecutor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ public InvalidPeerTaskResponseException() {
super();
}

public InvalidPeerTaskResponseException(final String message) {
super(message);
}

public InvalidPeerTaskResponseException(final Throwable cause) {
super(cause);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,13 @@ default int getRetriesWithSamePeer() {
Predicate<EthPeer> getPeerRequirementFilter();

/**
* Checks if the supplied result is considered a success
* Performs a high level check of the results, returning a PeerTaskValidationResponse to describe
* the result of the check
*
* @return true if the supplied result is considered a success
* @param result The results of the PeerTask, as returned by processResponse
* @return a PeerTaskValidationResponse to describe the result of the check
*/
boolean isSuccess(T result);
PeerTaskValidationResponse validateResult(T result);

default void postProcessResult(final PeerTaskExecutorResult<T> result) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,15 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Manages the execution of PeerTasks, respecting their PeerTaskRetryBehavior */
public class PeerTaskExecutor {
private static final Logger LOG = LoggerFactory.getLogger(PeerTaskExecutor.class);

private final PeerSelector peerSelector;
private final PeerTaskRequestSender requestSender;
Expand Down Expand Up @@ -98,8 +101,8 @@ public <T> PeerTaskExecutorResult<T> execute(final PeerTask<T> peerTask) {
if (peer.isEmpty()) {
executorResult =
new PeerTaskExecutorResult<>(
Optional.empty(), PeerTaskExecutorResponseCode.NO_PEER_AVAILABLE);
continue;
Optional.empty(), PeerTaskExecutorResponseCode.NO_PEER_AVAILABLE, Optional.empty());
break;
}
usedEthPeers.add(peer.get());
executorResult = executeAgainstPeer(peerTask, peer.get());
Expand Down Expand Up @@ -138,43 +141,59 @@ public <T> PeerTaskExecutorResult<T> executeAgainstPeer(
inflightRequestCountForThisTaskClass.decrementAndGet();
}

if (peerTask.isSuccess(result)) {
PeerTaskValidationResponse validationResponse = peerTask.validateResult(result);
if (validationResponse == PeerTaskValidationResponse.RESULTS_VALID_AND_GOOD) {
peer.recordUsefulResponse();
executorResult =
new PeerTaskExecutorResult<>(
Optional.ofNullable(result), PeerTaskExecutorResponseCode.SUCCESS);
Optional.ofNullable(result),
PeerTaskExecutorResponseCode.SUCCESS,
Optional.of(peer));
peerTask.postProcessResult(executorResult);
} else {
// At this point, the result is most likely empty. Technically, this is a valid result, so
// we don't penalise the peer, but it's also a useless result, so we return
// INVALID_RESPONSE code
LOG.debug(
"Invalid response found for {} from peer {}", taskClassName, peer.getLoggableId());
validationResponse
.getDisconnectReason()
.ifPresent((disconnectReason) -> peer.disconnect(disconnectReason));
executorResult =
new PeerTaskExecutorResult<>(
Optional.ofNullable(result), PeerTaskExecutorResponseCode.INVALID_RESPONSE);
Optional.ofNullable(result),
PeerTaskExecutorResponseCode.INVALID_RESPONSE,
Optional.of(peer));
}

} catch (PeerNotConnected e) {
executorResult =
new PeerTaskExecutorResult<>(
Optional.empty(), PeerTaskExecutorResponseCode.PEER_DISCONNECTED);
Optional.empty(),
PeerTaskExecutorResponseCode.PEER_DISCONNECTED,
Optional.of(peer));

} catch (InterruptedException | TimeoutException e) {
peer.recordRequestTimeout(requestMessageData.getCode());
timeoutCounter.labels(taskClassName).inc();
executorResult =
new PeerTaskExecutorResult<>(Optional.empty(), PeerTaskExecutorResponseCode.TIMEOUT);
new PeerTaskExecutorResult<>(
Optional.empty(), PeerTaskExecutorResponseCode.TIMEOUT, Optional.of(peer));

} catch (InvalidPeerTaskResponseException e) {
peer.recordUselessResponse(e.getMessage());
invalidResponseCounter.labels(taskClassName).inc();
LOG.debug(
"Invalid response found for {} from peer {}", taskClassName, peer.getLoggableId(), e);
executorResult =
new PeerTaskExecutorResult<>(
Optional.empty(), PeerTaskExecutorResponseCode.INVALID_RESPONSE);
Optional.empty(), PeerTaskExecutorResponseCode.INVALID_RESPONSE, Optional.of(peer));

} catch (ExecutionException e) {
} catch (Exception e) {
internalExceptionCounter.labels(taskClassName).inc();
LOG.error("Server error found for {} from peer {}", taskClassName, peer.getLoggableId(), e);
executorResult =
new PeerTaskExecutorResult<>(
Optional.empty(), PeerTaskExecutorResponseCode.INTERNAL_SERVER_ERROR);
Optional.empty(),
PeerTaskExecutorResponseCode.INTERNAL_SERVER_ERROR,
Optional.of(peer));
}
} while (retriesRemaining-- > 0
&& executorResult.responseCode() != PeerTaskExecutorResponseCode.SUCCESS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;

import org.hyperledger.besu.ethereum.eth.manager.EthPeer;

import java.util.Optional;

public record PeerTaskExecutorResult<T>(
Optional<T> result, PeerTaskExecutorResponseCode responseCode) {}
Optional<T> result, PeerTaskExecutorResponseCode responseCode, Optional<EthPeer> ethPeer) {}
Loading

0 comments on commit 1996166

Please sign in to comment.