Skip to content

Commit

Permalink
Add GetBodiesFromPeerTask (#8040)
Browse files Browse the repository at this point in the history
* 7311: Add PeerTask system for use in future PRs

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Clean up some warnings

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Add feature toggle for enabling use of the peertask system where available

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Remove log used for testing, apply spotless

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Add private constructor to PeerTaskFeatureToggle to prevent instantiation

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Switch to logging a warning instead of throwing an exception when initializing PeerTaskFeatureToggle multiple times

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Update javadoc to match previous commit

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: spotless

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix broken BesuCommandTest

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: add class

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Move PeerTaskFeatureToggle to more appropriate location

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: add X prefix to peertask-system-enabled

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Move --Xpeertask-system-enabled out of BesuCommand and make hidden

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: spotless

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Add GetReceiptsFromPeerTask

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Move isPeerTaskSystemEnabled to SynchronizerOptions

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix javadoc issue

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix javadoc issue

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Move PeerTaskFeatureToggleTestHelper to TestUtil and fix RunnerTest

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: spotless

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Remove PeerTaskFeatureToggle in favor of including isPeerTaskSystemEnabled in SynchronizerConfiguration

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Adjust to the removal of PeerTaskFeatureToggle and use SynchronizerConfiguration to get the toggle instead

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Reduce timeout in PeerTaskRequestSender to 5s

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Refactor PeerManager to be an interface

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix up compile errors after merge

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix MetricsAcceptanceTest

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix MetricsAcceptanceTest

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix DownloadReceiptsStep when using peer task system

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Rename PeerManager to PeerSelector

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Reword PeerSelector javadoc to avoid implementation details

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Use ConcurrentHashMap in DefaultPeerSelector

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Reword trace log in DefaultPeerSelector

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Remove unused imports

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Use a 1 second delay between retries in PeerTaskExecutor to match old implementation

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Add testGetPeerButNoPeerMatchesFilter to DefaultPeerSelectorTest

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Add testGetPeerButNoPeerMatchesFilter to DefaultPeerSelectorTest

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: spotless

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix MetricsAcceptanceTest

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix MetricsAcceptanceTest

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Modify PeerTaskExecutor metric to include response time from peer

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Use SubProtocol instead of subprotocol name string in PeerTask

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: rename timing context to ignored to prevent intellij warnings

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Use constants for number of retries

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Convert PeerTaskExecutorResult to a record

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Rename PeerTaskBehavior to PeerTaskRetryBehavior

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Move peer selection logic to PeerSelector

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: spotless

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix up everything broken after merge

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Attempt to improve performance of peer task system in pipeline

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: fix compile check

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix broken workflow

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Reduce logging in JsonRpcExecutor to trace level

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: More changes in DownloadReceiptsStep

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Rework DownloadReceiptsStep

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Make changes as discussed in walkthrough meeting

Remove DefaultPeerSelector, make EthPeers implement PeerSelector interface, and add PeerTask.getPeerRequirementFilter

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Update after merge and make discussed changes from walkthrough discussion

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Change to regular HashMap

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Remove runtime exception again

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Rename getPeerTaskBehavior to getPeerTaskRetryBehavior

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Rename getPeerTaskBehavior to getPeerTaskRetryBehavior

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Rework PeerTaskExecutor retry system to be 0-based

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix up compile errors after merge

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix broken DownloadReceiptsStepTest test

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Move GetReceipts to services worker for parallelism

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Refactor peer task system usage in DownloadReceiptsStep to better match old system

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Remove unused async methods in PeerTaskExecutor

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Return Optional<EthPeer> in PeerSelector.getPeer and utilise existing peer selection behavior in EthPeers

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Update after merge

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Redo getPeer again to include hasAvailableRequestCapacity check

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Add protocol spec supplier to GetReceiptsFromPeerTask

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Rework getPeer again to use LEAST_TO_MOST_BUSY comparator

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Import PeerNotConnected class instead of using fully qualified class name

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Change to specifying retry counts in PeerTask instead of behavior enums

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: clean up after merge

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: clean up after merge

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix up javadoc

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Add additional metrics to PeerTaskExecutor

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Add Predicate to PeerTask to check for partial success

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix incorrect name on isPartialSuccessTest

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Implement isPartialSuccess and add unit tests

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Add partialSuccessCounter and inflightRequestGauge in PeerTaskExecutor

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Also filter by whether a peer is fully validated

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Remove unneeded throws in RunnerTest

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix up inflight requests gauge in PeerTaskExecutor

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Update plugin api hash

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Update plugin api hash

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Add javadoc to LabelledGauge.isLabelsObserved

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Update plugin-api hash

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Update changelog

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Handle headers with no receipts as a special case in DownloadReceiptsStep

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Complete merge

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Use taskName instead of className for labelNames

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Use snake_case for metric names

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Use _total metric name suffix

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: rework partial success handling

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Update GetReceiptsFromPeerTask with partialSuccess changes

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Add default implementation to LabelledGauge.isLabelsObserved

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix broken unit test

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Rename parseResponse to processResponse

Signed-off-by: Matilda Clerke <[email protected]>

* add possibility to use the new peer task system when downloading the bodies

Signed-off-by: [email protected] <[email protected]>

* fix loop

Signed-off-by: [email protected] <[email protected]>

* 7311: Wrap peer task system usage in ethScheduler call to match other usages

Signed-off-by: Matilda Clerke <[email protected]>

* small fixes

Signed-off-by: [email protected] <[email protected]>

* update API change

Signed-off-by: [email protected] <[email protected]>

* spotless

Signed-off-by: [email protected] <[email protected]>

* 7311: apply spotless

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Move check for empty trie hash into GetReceiptsFromPeerTask and update unit test to test for this functionality

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: spotless

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix compile issue after merge

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Remove BodyValidator and update code and test to match

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: spotless

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix up pre-fill and add test to test failure scenario

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Use ProtocolSchedule.anyMatch to find if any ProtocolSpecs are PoS, remove new usages of currentProtocolSpecSupplier

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Only attempt to remove headers on successful requests

Signed-off-by: Matilda Clerke <[email protected]>

* 7311: Fix broken stuff after merge

Signed-off-by: Matilda Clerke <[email protected]>

* spotless

Signed-off-by: [email protected] <[email protected]>

* Fix up compile errors after merge

Signed-off-by: Matilda Clerke <[email protected]>

* Add PeerTaskExecutor usage for GetBodies in DownloadHeaderSequenceTask

Signed-off-by: Matilda Clerke <[email protected]>

* Add PeerTaskExecutor usage for GetBodies in ForwardSyncStep and apply spotless

Signed-off-by: Matilda Clerke <[email protected]>

* Allow custom retries against other peers in GetBodiesFromPeerTask

Signed-off-by: Matilda Clerke <[email protected]>

* Fix infinite loop in CheckPointSyncChainDownloaderTest

Signed-off-by: Matilda Clerke <[email protected]>

* spotless

Signed-off-by: Matilda Clerke <[email protected]>

* Update CompleteBlocksWithPeerTask.getBlocks to retrieveBlocksFromPeers and add javadoc

Signed-off-by: Matilda Clerke <[email protected]>

* Add javadoc to GetBodiesFromPeerTask

Signed-off-by: Matilda Clerke <[email protected]>

* 7582: Simplify withdrawals validation

Signed-off-by: Matilda Clerke <[email protected]>

---------

Signed-off-by: Matilda Clerke <[email protected]>
Signed-off-by: [email protected] <[email protected]>
Co-authored-by: Sally MacFarlane <[email protected]>
Co-authored-by: [email protected] <[email protected]>
  • Loading branch information
3 people authored Jan 9, 2025
1 parent fa9ca9c commit 4ae3be5
Show file tree
Hide file tree
Showing 19 changed files with 970 additions and 51 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask.task;

import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockBody;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.peertask.InvalidPeerTaskResponseException;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTask;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskValidationResponse;
import org.hyperledger.besu.ethereum.eth.messages.BlockBodiesMessage;
import org.hyperledger.besu.ethereum.eth.messages.GetBlockBodiesMessage;
import org.hyperledger.besu.ethereum.mainnet.BodyValidation;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Implements PeerTask for getting block bodies from peers, and matches headers to bodies to supply
* full blocks
*/
public class GetBodiesFromPeerTask implements PeerTask<List<Block>> {

private static final Logger LOG = LoggerFactory.getLogger(GetBodiesFromPeerTask.class);

private static final int DEFAULT_RETRIES_AGAINST_OTHER_PEERS = 5;

private final List<BlockHeader> blockHeaders;
private final ProtocolSchedule protocolSchedule;
private final int allowedRetriesAgainstOtherPeers;

private final long requiredBlockchainHeight;
private final List<Block> blocks = new ArrayList<>();
private final boolean isPoS;

public GetBodiesFromPeerTask(
final List<BlockHeader> blockHeaders, final ProtocolSchedule protocolSchedule) {
this(blockHeaders, protocolSchedule, DEFAULT_RETRIES_AGAINST_OTHER_PEERS);
}

public GetBodiesFromPeerTask(
final List<BlockHeader> blockHeaders,
final ProtocolSchedule protocolSchedule,
final int allowedRetriesAgainstOtherPeers) {
if (blockHeaders == null || blockHeaders.isEmpty()) {
throw new IllegalArgumentException("Block headers must not be empty");
}

this.blockHeaders = blockHeaders;
this.protocolSchedule = protocolSchedule;
this.allowedRetriesAgainstOtherPeers = allowedRetriesAgainstOtherPeers;

this.requiredBlockchainHeight =
blockHeaders.stream()
.mapToLong(BlockHeader::getNumber)
.max()
.orElse(BlockHeader.GENESIS_BLOCK_NUMBER);
this.isPoS = protocolSchedule.getByBlockHeader(blockHeaders.getLast()).isPoS();
}

@Override
public SubProtocol getSubProtocol() {
return EthProtocol.get();
}

@Override
public MessageData getRequestMessage() {
return GetBlockBodiesMessage.create(
blockHeaders.stream().map(BlockHeader::getBlockHash).toList());
}

@Override
public List<Block> processResponse(final MessageData messageData)
throws InvalidPeerTaskResponseException {
// Blocks returned by this method are in the same order as the headers, but might not be
// complete
if (messageData == null) {
throw new InvalidPeerTaskResponseException();
}
final BlockBodiesMessage blocksMessage = BlockBodiesMessage.readFrom(messageData);
final List<BlockBody> blockBodies = blocksMessage.bodies(protocolSchedule);
if (blockBodies.isEmpty() || blockBodies.size() > blockHeaders.size()) {
throw new InvalidPeerTaskResponseException();
}

for (int i = 0; i < blockBodies.size(); i++) {
final BlockBody blockBody = blockBodies.get(i);
final BlockHeader blockHeader = blockHeaders.get(i);
if (!blockBodyMatchesBlockHeader(blockBody, blockHeader)) {
LOG.atDebug().setMessage("Received block body does not match block header").log();
throw new InvalidPeerTaskResponseException();
}

blocks.add(new Block(blockHeader, blockBody));
}
return blocks;
}

@Override
public int getRetriesWithOtherPeer() {
return allowedRetriesAgainstOtherPeers;
}

private boolean blockBodyMatchesBlockHeader(
final BlockBody blockBody, final BlockHeader blockHeader) {
// this method validates that the block body matches the block header by calculating the roots
// of the block body and comparing them to the roots in the block header
if (!BodyValidation.transactionsRoot(blockBody.getTransactions())
.equals(blockHeader.getTransactionsRoot())) {
return false;
}
if (!BodyValidation.ommersHash(blockBody.getOmmers()).equals(blockHeader.getOmmersHash())) {
return false;
}
if (!blockBody
.getWithdrawals()
.map(BodyValidation::withdrawalsRoot)
.equals(blockHeader.getWithdrawalsRoot())) {
return false;
}

return true;
}

@Override
public Predicate<EthPeer> getPeerRequirementFilter() {
return (ethPeer) ->
isPoS || ethPeer.chainState().getEstimatedHeight() >= requiredBlockchainHeight;
}

@Override
public PeerTaskValidationResponse validateResult(final List<Block> result) {
if (result.isEmpty()) {
return PeerTaskValidationResponse.NO_RESULTS_RETURNED;
}
return PeerTaskValidationResponse.RESULTS_VALID_AND_GOOD;
}

public List<BlockHeader> getBlockHeaders() {
return blockHeaders;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ public DefaultSynchronizer(
syncState,
metricsSystem,
terminationCondition,
peerTaskExecutor,
syncDurationMetrics));

if (SyncMode.FAST.equals(syncConfig.getSyncMode())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.sync.tasks.CompleteBlocksTask;
import org.hyperledger.besu.ethereum.eth.sync.tasks.CompleteBlocksWithPeerTask;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.plugin.services.MetricsSystem;

Expand All @@ -31,19 +32,38 @@ public class DownloadBodiesStep
private final ProtocolSchedule protocolSchedule;
private final EthContext ethContext;
private final MetricsSystem metricsSystem;
private final SynchronizerConfiguration synchronizerConfiguration;

public DownloadBodiesStep(
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
final SynchronizerConfiguration synchronizerConfiguration,
final MetricsSystem metricsSystem) {
this.protocolSchedule = protocolSchedule;
this.ethContext = ethContext;
this.synchronizerConfiguration = synchronizerConfiguration;
this.metricsSystem = metricsSystem;
}

@Override
public CompletableFuture<List<Block>> apply(final List<BlockHeader> blockHeaders) {
return CompleteBlocksTask.forHeaders(protocolSchedule, ethContext, blockHeaders, metricsSystem)
.run();
if (synchronizerConfiguration.isPeerTaskSystemEnabled()) {
return ethContext
.getScheduler()
.scheduleServiceTask(() -> getBodiesWithPeerTaskSystem(blockHeaders));
} else {
return CompleteBlocksTask.forHeaders(
protocolSchedule, ethContext, blockHeaders, metricsSystem)
.run();
}
}

private CompletableFuture<List<Block>> getBodiesWithPeerTaskSystem(
final List<BlockHeader> headers) {

final CompleteBlocksWithPeerTask completeBlocksWithPeerTask =
new CompleteBlocksWithPeerTask(protocolSchedule, headers, ethContext.getPeerTaskExecutor());
final List<Block> blocks = completeBlocksWithPeerTask.retrieveBlocksFromPeers();
return CompletableFuture.completedFuture(blocks);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResponseCode;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult;
import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetBodiesFromPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.RetryingGetBlocksFromPeersTask;

Expand Down Expand Up @@ -53,9 +56,9 @@ public CompletableFuture<Void> possibleRequestBodies(final List<BlockHeader> blo
LOG.atDebug()
.setMessage("Requesting {} blocks {}->{} ({})")
.addArgument(blockHeaders::size)
.addArgument(() -> blockHeaders.get(0).getNumber())
.addArgument(() -> blockHeaders.get(blockHeaders.size() - 1).getNumber())
.addArgument(() -> blockHeaders.get(0).getHash().toHexString())
.addArgument(() -> blockHeaders.getFirst().getNumber())
.addArgument(() -> blockHeaders.getLast().getNumber())
.addArgument(() -> blockHeaders.getFirst().getHash().toHexString())
.log();
return requestBodies(blockHeaders)
.thenApply(this::saveBlocks)
Expand All @@ -76,23 +79,47 @@ public CompletableFuture<Void> possibleRequestBodies(final List<BlockHeader> blo

@VisibleForTesting
protected CompletableFuture<List<Block>> requestBodies(final List<BlockHeader> blockHeaders) {
final RetryingGetBlocksFromPeersTask getBodiesFromPeerTask =
RetryingGetBlocksFromPeersTask.forHeaders(
context.getProtocolSchedule(),
context.getEthContext(),
context.getMetricsSystem(),
context.getEthContext().getEthPeers().peerCount(),
blockHeaders);
CompletableFuture<List<Block>> blocksFuture;
if (context.getSynchronizerConfiguration().isPeerTaskSystemEnabled()) {
blocksFuture =
context
.getEthContext()
.getScheduler()
.scheduleServiceTask(
() -> {
GetBodiesFromPeerTask task =
new GetBodiesFromPeerTask(
blockHeaders,
context.getProtocolSchedule(),
context.getEthContext().getEthPeers().peerCount());
PeerTaskExecutorResult<List<Block>> taskResult =
context.getEthContext().getPeerTaskExecutor().execute(task);
if (taskResult.responseCode() == PeerTaskExecutorResponseCode.SUCCESS
&& taskResult.result().isPresent()) {
return CompletableFuture.completedFuture(taskResult.result().get());
} else {
return CompletableFuture.failedFuture(
new RuntimeException(taskResult.responseCode().toString()));
}
});
} else {
final RetryingGetBlocksFromPeersTask getBodiesFromPeerTask =
RetryingGetBlocksFromPeersTask.forHeaders(
context.getProtocolSchedule(),
context.getEthContext(),
context.getMetricsSystem(),
context.getEthContext().getEthPeers().peerCount(),
blockHeaders);

final CompletableFuture<AbstractPeerTask.PeerTaskResult<List<Block>>> run =
getBodiesFromPeerTask.run();
return run.thenApply(AbstractPeerTask.PeerTaskResult::getResult)
.thenApply(
blocks -> {
LOG.debug("Got {} blocks from peers", blocks.size());
blocks.sort(Comparator.comparing(block -> block.getHeader().getNumber()));
return blocks;
});
blocksFuture =
getBodiesFromPeerTask.run().thenApply(AbstractPeerTask.PeerTaskResult::getResult);
}
return blocksFuture.thenApply(
blocks -> {
LOG.debug("Got {} blocks from peers", blocks.size());
blocks.sort(Comparator.comparing(block -> block.getHeader().getNumber()));
return blocks;
});
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public Pipeline<SyncTargetRange> createDownloadPipelineForSyncTarget(final SyncT
final RangeHeadersValidationStep validateHeadersJoinUpStep =
new RangeHeadersValidationStep(protocolSchedule, protocolContext, detachedValidationPolicy);
final DownloadBodiesStep downloadBodiesStep =
new DownloadBodiesStep(protocolSchedule, ethContext, metricsSystem);
new DownloadBodiesStep(protocolSchedule, ethContext, syncConfig, metricsSystem);
final DownloadReceiptsStep downloadReceiptsStep =
new DownloadReceiptsStep(protocolSchedule, ethContext, syncConfig, metricsSystem);
final ImportBlocksStep importBlockStep =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader;
import org.hyperledger.besu.ethereum.eth.sync.PipelineChainDownloader;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
Expand All @@ -35,7 +36,8 @@ public static ChainDownloader create(
final SyncState syncState,
final MetricsSystem metricsSystem,
final SyncTerminationCondition terminationCondition,
final SyncDurationMetrics syncDurationMetrics) {
final SyncDurationMetrics syncDurationMetrics,
final PeerTaskExecutor peerTaskExecutor) {

final FullSyncTargetManager syncTargetManager =
new FullSyncTargetManager(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public FullSyncDownloadPipelineFactory(
this.ethContext = ethContext;
this.metricsSystem = metricsSystem;
this.fullSyncTerminationCondition = syncTerminationCondition;
betterSyncTargetEvaluator = new BetterSyncTargetEvaluator(syncConfig, ethContext.getEthPeers());
this.betterSyncTargetEvaluator =
new BetterSyncTargetEvaluator(syncConfig, ethContext.getEthPeers());
}

@Override
Expand Down Expand Up @@ -105,7 +106,7 @@ public Pipeline<?> createDownloadPipelineForSyncTarget(final SyncTarget target)
final RangeHeadersValidationStep validateHeadersJoinUpStep =
new RangeHeadersValidationStep(protocolSchedule, protocolContext, detachedValidationPolicy);
final DownloadBodiesStep downloadBodiesStep =
new DownloadBodiesStep(protocolSchedule, ethContext, metricsSystem);
new DownloadBodiesStep(protocolSchedule, ethContext, syncConfig, metricsSystem);
final ExtractTxSignaturesStep extractTxSignaturesStep = new ExtractTxSignaturesStep();
final FullImportBlockStep importBlockStep =
new FullImportBlockStep(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.TrailingPeerRequirements;
Expand Down Expand Up @@ -45,6 +46,7 @@ public FullSyncDownloader(
final SyncState syncState,
final MetricsSystem metricsSystem,
final SyncTerminationCondition terminationCondition,
final PeerTaskExecutor peerTaskExecutor,
final SyncDurationMetrics syncDurationMetrics) {
this.syncConfig = syncConfig;
this.protocolContext = protocolContext;
Expand All @@ -59,7 +61,8 @@ public FullSyncDownloader(
syncState,
metricsSystem,
terminationCondition,
syncDurationMetrics);
syncDurationMetrics,
peerTaskExecutor);
}

public CompletableFuture<Void> start() {
Expand Down
Loading

0 comments on commit 4ae3be5

Please sign in to comment.