Skip to content

Commit

Permalink
7311 add get headers from peer task (#7781)
Browse files Browse the repository at this point in the history
* 7311: spotless

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix broken BesuCommandTest

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: add class

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Move PeerTaskFeatureToggle to more appropriate location

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: add X prefix to peertask-system-enabled

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Move --Xpeertask-system-enabled out of BesuCommand and make hidden

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: spotless

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Add GetReceiptsFromPeerTask

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Move isPeerTaskSystemEnabled to SynchronizerOptions

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix javadoc issue

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix javadoc issue

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Move PeerTaskFeatureToggleTestHelper to TestUtil and fix RunnerTest

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: spotless

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Remove PeerTaskFeatureToggle in favor of including isPeerTaskSystemEnabled in SynchronizerConfiguration

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Adjust to the removal of PeerTaskFeatureToggle and use SynchronizerConfiguration to get the toggle instead

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Reduce timeout in PeerTaskRequestSender to 5s

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Refactor PeerManager to be an interface

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix up compile errors after merge

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix MetricsAcceptanceTest

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix MetricsAcceptanceTest

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix DownloadReceiptsStep when using peer task system

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Rename PeerManager to PeerSelector

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Reword PeerSelector javadoc to avoid implementation details

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Use ConcurrentHashMap in DefaultPeerSelector

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Reword trace log in DefaultPeerSelector

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Remove unused imports

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Use a 1 second delay between retries in PeerTaskExecutor to match old implementation

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Add testGetPeerButNoPeerMatchesFilter to DefaultPeerSelectorTest

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Add testGetPeerButNoPeerMatchesFilter to DefaultPeerSelectorTest

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: spotless

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix MetricsAcceptanceTest

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix MetricsAcceptanceTest

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Modify PeerTaskExecutor metric to include response time from peer

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Use SubProtocol instead of subprotocol name string in PeerTask

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: rename timing context to ignored to prevent intellij warnings

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Use constants for number of retries

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Convert PeerTaskExecutorResult to a record

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Rename PeerTaskBehavior to PeerTaskRetryBehavior

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Move peer selection logic to PeerSelector

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: spotless

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix up everything broken after merge

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Attempt to improve performance of peer task system in pipeline

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: fix compile check

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix broken workflow

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Reduce logging in JsonRpcExecutor to trace level

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: More changes in DownloadReceiptsStep

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Rework DownloadReceiptsStep

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Make changes as discussed in walkthrough meeting

Remove DefaultPeerSelector, make EthPeers implement PeerSelector interface, and add PeerTask.getPeerRequirementFilter

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Update after merge and make discussed changes from walkthrough discussion

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Change to regular HashMap

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Remove runtime exception again

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Rename getPeerTaskBehavior to getPeerTaskRetryBehavior

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Rename getPeerTaskBehavior to getPeerTaskRetryBehavior

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Rework PeerTaskExecutor retry system to be 0-based

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix up compile errors after merge

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix broken DownloadReceiptsStepTest test

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Move GetReceipts to services worker for parallelism

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Refactor peer task system usage in DownloadReceiptsStep to better match old system

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Remove unused async methods in PeerTaskExecutor

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Return Optional<EthPeer> in PeerSelector.getPeer and utilise existing peer selection behavior in EthPeers

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Update after merge

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Redo getPeer again to include hasAvailableRequestCapacity check

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Add protocol spec supplier to GetReceiptsFromPeerTask

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Rework getPeer again to use LEAST_TO_MOST_BUSY comparator

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Import PeerNotConnected class instead of using fully qualified class name

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Change to specifying retry counts in PeerTask instead of behavior enums

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: clean up after merge

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: clean up after merge

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix up javadoc

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Add additional metrics to PeerTaskExecutor

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Add Predicate to PeerTask to check for partial success

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix incorrect name on isPartialSuccessTest

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Implement isPartialSuccess and add unit tests

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Add partialSuccessCounter and inflightRequestGauge in PeerTaskExecutor

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Also filter by whether a peer is fully validated

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Remove unneeded throws in RunnerTest

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix up inflight requests gauge in PeerTaskExecutor

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Update plugin api hash

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Update plugin api hash

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Add javadoc to LabelledGauge.isLabelsObserved

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Update plugin-api hash

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Update changelog

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Implement GetHeadersFromPeerTask and use in DetermineCommonAncestorTask

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Handle headers with no receipts as a special case in DownloadReceiptsStep

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Complete merge

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Get DetermineCommonAncestorTask working with peer task system

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Use taskName instead of className for labelNames

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Use snake_case for metric names

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Use _total metric name suffix

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: rework partial success handling

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Update GetReceiptsFromPeerTask with partialSuccess changes

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Update GetHeadersFromPeerTask with partialSuccess changes

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Add default implementation to LabelledGauge.isLabelsObserved

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Use Peer task systems GetHeadersFromPeerTask in GetBlockFromPeerTask

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix broken unit test

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Remove unused constructor from AbstractPeerBlockValidator

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Use GetHeadersFromPeerTask in AbstractPeerBlockValidator

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Use peer task executor in SyncTargetManager

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix javadoc on BesuControllerBuilder

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Remove logs used to confirm operation

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Implement GetHeadersFromPeerTask in FastSyncActions and PivotBlockConfirmer

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Rename parseResponse to processResponse

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Wrap peer task system usage in ethScheduler call to match other usages

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: apply spotless

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Move check for empty trie hash into GetReceiptsFromPeerTask and update unit test to test for this functionality

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: spotless

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix compile issue after merge

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix compile issue after merge

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Remove BodyValidator and update code and test to match

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Implement GetHeadersForPeerTask usage in DownloadHeadersStep

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: spotless

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: remove unneeded logs

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: spotless

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix up pre-fill and add test to test failure scenario

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Use ProtocolSchedule.anyMatch to find if any ProtocolSpecs are PoS, remove new usages of currentProtocolSpecSupplier

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Only attempt to remove headers on successful requests

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: clean up after merge

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: clean up after merge

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Use peer task system in RangeHeadersFetcher

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Use peer task system in DownloadHeaderSequenceTask

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix GetHeadersFromPeerTask mocking in CheckPointSyncChainDownloaderTest

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Extract peer task executor answer for getHeaders to separate class for reuse in tests

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: spotless

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Implement peer task system usage in BackwardSyncStep

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Implement peer task system usage in ChainHeadTracker

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Implement peer task system usage in PivotSelectorFromSafeBlock and improve logging

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Implement unit test for GetHeadersFromPeerTask

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix up merge compile error

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Ensure FastSyncActions and PivotSelectorFromSafeBlock retry getting headers for all peers, matching RetryingGetHeaderFromPeerByHashTask

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Change PeerTaskExecutorResult.ethPeer to an Optional

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Use CancellationException instead of InterruptedException in PivotBlockConfirmer

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Use PivotBlockRetriever.MAX_QUERY_RETRIES_PER_PEER to set retries for GetHeadersFromPeerTask

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: spotless

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Add PeerTask.shouldDisconnectPeer and ensure functionality matches old code

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Remove old info logs

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix broken test by correctly including peer in PeerTaskExecutorResults in test classes

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix incorrect equality tests

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix broken test

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: spotless

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Move PeerTaskExecutor into EthContext to reduce plumbing changes

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: spotless

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Remove protocol check from GetHeadersFromPeerTask.getPeerRequirementFilter

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix broken test

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix broken integration test

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Refactor peer task validation

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Refactor peer task validation

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Use peer count for retry count when getting headers in BackwardSyncStep, FastSyncActions, and PivotSelectorFromSafeBlock

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: spotless

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Move chainstate update into GetHeadersFromPeerTask.postProcessResult

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix compile errors

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Update after merge

Signed-off-by: Matilda Clerke <[email protected]>

---------

Signed-off-by: Matilda Clerke <[email protected]>
Signed-off-by: Matilda-Clerke <[email protected]>
Co-authored-by: Sally MacFarlane <[email protected]>
  • Loading branch information
Matilda-Clerke and macfarla authored Dec 11, 2024
1 parent 4435f75 commit 657efff
Show file tree
Hide file tree
Showing 75 changed files with 2,812 additions and 545 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -690,9 +690,10 @@ public BesuController build() {
.build());
}

final EthContext ethContext = new EthContext(ethPeers, ethMessages, snapMessages, scheduler);
final PeerTaskExecutor peerTaskExecutor =
new PeerTaskExecutor(ethPeers, new PeerTaskRequestSender(), metricsSystem);
final EthContext ethContext =
new EthContext(ethPeers, ethMessages, snapMessages, scheduler, peerTaskExecutor);
final boolean fullSyncDisabled = !SyncMode.isFullSync(syncConfig.getSyncMode());
final SyncState syncState = new SyncState(blockchain, ethPeers, fullSyncDisabled, checkpoint);

Expand All @@ -718,7 +719,8 @@ public BesuController build() {
besuComponent.map(BesuComponent::getBlobCache).orElse(new BlobCache()),
miningConfiguration);

final List<PeerValidator> peerValidators = createPeerValidators(protocolSchedule);
final List<PeerValidator> peerValidators =
createPeerValidators(protocolSchedule, peerTaskExecutor);

final EthProtocolManager ethProtocolManager =
createEthProtocolManager(
Expand Down Expand Up @@ -947,6 +949,7 @@ private PivotBlockSelector createPivotSelector(
ethContext,
metricsSystem,
genesisConfigOptions,
syncConfig,
unverifiedForkchoiceSupplier,
unsubscribeForkchoiceListener);
} else {
Expand Down Expand Up @@ -1179,29 +1182,42 @@ private ChainDataPruner createChainPruner(final BlockchainStorage blockchainStor
* Create peer validators list.
*
* @param protocolSchedule the protocol schedule
* @param peerTaskExecutor the peer task executor
* @return the list
*/
protected List<PeerValidator> createPeerValidators(final ProtocolSchedule protocolSchedule) {
protected List<PeerValidator> createPeerValidators(
final ProtocolSchedule protocolSchedule, final PeerTaskExecutor peerTaskExecutor) {
final List<PeerValidator> validators = new ArrayList<>();

final OptionalLong daoBlock = genesisConfigOptions.getDaoForkBlock();
if (daoBlock.isPresent()) {
// Setup dao validator
validators.add(
new DaoForkPeerValidator(protocolSchedule, metricsSystem, daoBlock.getAsLong()));
new DaoForkPeerValidator(
protocolSchedule, peerTaskExecutor, syncConfig, metricsSystem, daoBlock.getAsLong()));
}

final OptionalLong classicBlock = genesisConfigOptions.getClassicForkBlock();
// setup classic validator
if (classicBlock.isPresent()) {
validators.add(
new ClassicForkPeerValidator(protocolSchedule, metricsSystem, classicBlock.getAsLong()));
new ClassicForkPeerValidator(
protocolSchedule,
peerTaskExecutor,
syncConfig,
metricsSystem,
classicBlock.getAsLong()));
}

for (final Map.Entry<Long, Hash> requiredBlock : requiredBlocks.entrySet()) {
validators.add(
new RequiredBlocksPeerValidator(
protocolSchedule, metricsSystem, requiredBlock.getKey(), requiredBlock.getValue()));
protocolSchedule,
peerTaskExecutor,
syncConfig,
metricsSystem,
requiredBlock.getKey(),
requiredBlock.getValue()));
}

final CheckpointConfigOptions checkpointConfigOptions =
Expand All @@ -1210,6 +1226,8 @@ protected List<PeerValidator> createPeerValidators(final ProtocolSchedule protoc
validators.add(
new CheckpointBlocksPeerValidator(
protocolSchedule,
peerTaskExecutor,
syncConfig,
metricsSystem,
checkpointConfigOptions.getNumber().orElseThrow(),
checkpointConfigOptions.getHash().map(Hash::fromHexString).orElseThrow()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.peervalidation.RequiredBlocksPeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
Expand Down Expand Up @@ -79,6 +80,7 @@ protected MiningCoordinator createMiningCoordinator(
new BackwardSyncContext(
protocolContext,
protocolSchedule,
syncConfig,
metricsSystem,
ethProtocolManager.ethContext(),
syncState,
Expand Down Expand Up @@ -235,14 +237,17 @@ protected PluginServiceFactory createAdditionalPluginServices(
}

@Override
protected List<PeerValidator> createPeerValidators(final ProtocolSchedule protocolSchedule) {
List<PeerValidator> retval = super.createPeerValidators(protocolSchedule);
protected List<PeerValidator> createPeerValidators(
final ProtocolSchedule protocolSchedule, final PeerTaskExecutor peerTaskExecutor) {
List<PeerValidator> retval = super.createPeerValidators(protocolSchedule, peerTaskExecutor);
final OptionalLong powTerminalBlockNumber = genesisConfigOptions.getTerminalBlockNumber();
final Optional<Hash> powTerminalBlockHash = genesisConfigOptions.getTerminalBlockHash();
if (powTerminalBlockHash.isPresent() && powTerminalBlockNumber.isPresent()) {
retval.add(
new RequiredBlocksPeerValidator(
protocolSchedule,
peerTaskExecutor,
syncConfig,
metricsSystem,
powTerminalBlockNumber.getAsLong(),
powTerminalBlockHash.get(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ protected MiningCoordinator createMiningCoordinator(
new TransitionBackwardSyncContext(
protocolContext,
transitionProtocolSchedule,
syncConfig,
metricsSystem,
ethProtocolManager.ethContext(),
syncState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider;
import org.hyperledger.besu.ethereum.core.MiningConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.mainnet.BlockHeaderValidator;
Expand Down Expand Up @@ -73,6 +74,7 @@ public class TransitionControllerBuilderTest {
@Mock ProtocolContext protocolContext;
@Mock MutableBlockchain mockBlockchain;
@Mock TransactionPool transactionPool;
@Mock PeerTaskExecutor peerTaskExecutor;
@Mock SyncState syncState;

@Mock(answer = Answers.RETURNS_DEEP_STUBS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardChain;
import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardSyncContext;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
Expand All @@ -43,13 +44,15 @@ public class TransitionBackwardSyncContext extends BackwardSyncContext {
public TransitionBackwardSyncContext(
final ProtocolContext protocolContext,
final TransitionProtocolSchedule transitionProtocolSchedule,
final SynchronizerConfiguration synchronizerConfiguration,
final MetricsSystem metricsSystem,
final EthContext ethContext,
final SyncState syncState,
final StorageProvider storageProvider) {
super(
protocolContext,
transitionProtocolSchedule,
synchronizerConfiguration,
metricsSystem,
ethContext,
syncState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
*/
package org.hyperledger.besu.ethereum.eth.manager;

import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;

import java.util.Optional;

public class EthContext {
Expand All @@ -22,24 +24,31 @@ public class EthContext {
private final EthMessages ethMessages;
private final Optional<EthMessages> snapMessages;
private final EthScheduler scheduler;
private final PeerTaskExecutor peerTaskExecutor;

public EthContext(
final EthPeers ethPeers,
final EthMessages ethMessages,
final EthMessages snapMessages,
final EthScheduler scheduler) {
final EthScheduler scheduler,
final PeerTaskExecutor peerTaskExecutor) {
this.ethPeers = ethPeers;
this.ethMessages = ethMessages;
this.snapMessages = Optional.of(snapMessages);
this.scheduler = scheduler;
this.peerTaskExecutor = peerTaskExecutor;
}

public EthContext(
final EthPeers ethPeers, final EthMessages ethMessages, final EthScheduler scheduler) {
final EthPeers ethPeers,
final EthMessages ethMessages,
final EthScheduler scheduler,
final PeerTaskExecutor peerTaskExecutor) {
this.ethPeers = ethPeers;
this.ethMessages = ethMessages;
this.snapMessages = Optional.empty();
this.scheduler = scheduler;
this.peerTaskExecutor = peerTaskExecutor;
}

public EthPeers getEthPeers() {
Expand All @@ -57,4 +66,8 @@ public Optional<EthMessages> getSnapMessages() {
public EthScheduler getScheduler() {
return scheduler;
}

public PeerTaskExecutor getPeerTaskExecutor() {
return peerTaskExecutor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ public InvalidPeerTaskResponseException() {
super();
}

public InvalidPeerTaskResponseException(final String message) {
super(message);
}

public InvalidPeerTaskResponseException(final Throwable cause) {
super(cause);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,13 @@ default int getRetriesWithSamePeer() {
Predicate<EthPeer> getPeerRequirementFilter();

/**
* Checks if the supplied result is considered a success
* Performs a high level check of the results, returning a PeerTaskValidationResponse to describe
* the result of the check
*
* @return true if the supplied result is considered a success
* @param result The results of the PeerTask, as returned by processResponse
* @return a PeerTaskValidationResponse to describe the result of the check
*/
boolean isSuccess(T result);
PeerTaskValidationResponse validateResult(T result);

default void postProcessResult(final PeerTaskExecutorResult<T> result) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,15 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Manages the execution of PeerTasks, respecting their PeerTaskRetryBehavior */
public class PeerTaskExecutor {
private static final Logger LOG = LoggerFactory.getLogger(PeerTaskExecutor.class);

private final PeerSelector peerSelector;
private final PeerTaskRequestSender requestSender;
Expand Down Expand Up @@ -98,8 +101,8 @@ public <T> PeerTaskExecutorResult<T> execute(final PeerTask<T> peerTask) {
if (peer.isEmpty()) {
executorResult =
new PeerTaskExecutorResult<>(
Optional.empty(), PeerTaskExecutorResponseCode.NO_PEER_AVAILABLE);
continue;
Optional.empty(), PeerTaskExecutorResponseCode.NO_PEER_AVAILABLE, Optional.empty());
break;
}
usedEthPeers.add(peer.get());
executorResult = executeAgainstPeer(peerTask, peer.get());
Expand Down Expand Up @@ -138,43 +141,59 @@ public <T> PeerTaskExecutorResult<T> executeAgainstPeer(
inflightRequestCountForThisTaskClass.decrementAndGet();
}

if (peerTask.isSuccess(result)) {
PeerTaskValidationResponse validationResponse = peerTask.validateResult(result);
if (validationResponse == PeerTaskValidationResponse.RESULTS_VALID_AND_GOOD) {
peer.recordUsefulResponse();
executorResult =
new PeerTaskExecutorResult<>(
Optional.ofNullable(result), PeerTaskExecutorResponseCode.SUCCESS);
Optional.ofNullable(result),
PeerTaskExecutorResponseCode.SUCCESS,
Optional.of(peer));
peerTask.postProcessResult(executorResult);
} else {
// At this point, the result is most likely empty. Technically, this is a valid result, so
// we don't penalise the peer, but it's also a useless result, so we return
// INVALID_RESPONSE code
LOG.debug(
"Invalid response found for {} from peer {}", taskClassName, peer.getLoggableId());
validationResponse
.getDisconnectReason()
.ifPresent((disconnectReason) -> peer.disconnect(disconnectReason));
executorResult =
new PeerTaskExecutorResult<>(
Optional.ofNullable(result), PeerTaskExecutorResponseCode.INVALID_RESPONSE);
Optional.ofNullable(result),
PeerTaskExecutorResponseCode.INVALID_RESPONSE,
Optional.of(peer));
}

} catch (PeerNotConnected e) {
executorResult =
new PeerTaskExecutorResult<>(
Optional.empty(), PeerTaskExecutorResponseCode.PEER_DISCONNECTED);
Optional.empty(),
PeerTaskExecutorResponseCode.PEER_DISCONNECTED,
Optional.of(peer));

} catch (InterruptedException | TimeoutException e) {
peer.recordRequestTimeout(requestMessageData.getCode());
timeoutCounter.labels(taskClassName).inc();
executorResult =
new PeerTaskExecutorResult<>(Optional.empty(), PeerTaskExecutorResponseCode.TIMEOUT);
new PeerTaskExecutorResult<>(
Optional.empty(), PeerTaskExecutorResponseCode.TIMEOUT, Optional.of(peer));

} catch (InvalidPeerTaskResponseException e) {
peer.recordUselessResponse(e.getMessage());
invalidResponseCounter.labels(taskClassName).inc();
LOG.debug(
"Invalid response found for {} from peer {}", taskClassName, peer.getLoggableId(), e);
executorResult =
new PeerTaskExecutorResult<>(
Optional.empty(), PeerTaskExecutorResponseCode.INVALID_RESPONSE);
Optional.empty(), PeerTaskExecutorResponseCode.INVALID_RESPONSE, Optional.of(peer));

} catch (ExecutionException e) {
} catch (Exception e) {
internalExceptionCounter.labels(taskClassName).inc();
LOG.error("Server error found for {} from peer {}", taskClassName, peer.getLoggableId(), e);
executorResult =
new PeerTaskExecutorResult<>(
Optional.empty(), PeerTaskExecutorResponseCode.INTERNAL_SERVER_ERROR);
Optional.empty(),
PeerTaskExecutorResponseCode.INTERNAL_SERVER_ERROR,
Optional.of(peer));
}
} while (retriesRemaining-- > 0
&& executorResult.responseCode() != PeerTaskExecutorResponseCode.SUCCESS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;

import org.hyperledger.besu.ethereum.eth.manager.EthPeer;

import java.util.Optional;

public record PeerTaskExecutorResult<T>(
Optional<T> result, PeerTaskExecutorResponseCode responseCode) {}
Optional<T> result, PeerTaskExecutorResponseCode responseCode, Optional<EthPeer> ethPeer) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright contributors to Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;

import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage;

import java.util.Optional;

public enum PeerTaskValidationResponse {
NO_RESULTS_RETURNED(null),
TOO_MANY_RESULTS_RETURNED(null),
RESULTS_DO_NOT_MATCH_QUERY(null),
NON_SEQUENTIAL_HEADERS_RETURNED(
DisconnectMessage.DisconnectReason.BREACH_OF_PROTOCOL_NON_SEQUENTIAL_HEADERS),
RESULTS_VALID_AND_GOOD(null);
private final Optional<DisconnectMessage.DisconnectReason> disconnectReason;

PeerTaskValidationResponse(final DisconnectMessage.DisconnectReason disconnectReason) {
this.disconnectReason = Optional.ofNullable(disconnectReason);
}

public Optional<DisconnectMessage.DisconnectReason> getDisconnectReason() {
return disconnectReason;
}
}
Loading

0 comments on commit 657efff

Please sign in to comment.