Skip to content

Commit

Permalink
Merge branch 'fix-guess-type' of https://github.com/Gabriel-Trintinal…
Browse files Browse the repository at this point in the history
…ia/besu into fix-guess-type
  • Loading branch information
Gabriel-Trintinalia committed Jan 9, 2025
2 parents 4e76c93 + db07453 commit 1bf2ad8
Show file tree
Hide file tree
Showing 19 changed files with 970 additions and 51 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask.task;

import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockBody;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.peertask.InvalidPeerTaskResponseException;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTask;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskValidationResponse;
import org.hyperledger.besu.ethereum.eth.messages.BlockBodiesMessage;
import org.hyperledger.besu.ethereum.eth.messages.GetBlockBodiesMessage;
import org.hyperledger.besu.ethereum.mainnet.BodyValidation;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Implements PeerTask for getting block bodies from peers, and matches headers to bodies to supply
* full blocks
*/
public class GetBodiesFromPeerTask implements PeerTask<List<Block>> {

private static final Logger LOG = LoggerFactory.getLogger(GetBodiesFromPeerTask.class);

private static final int DEFAULT_RETRIES_AGAINST_OTHER_PEERS = 5;

private final List<BlockHeader> blockHeaders;
private final ProtocolSchedule protocolSchedule;
private final int allowedRetriesAgainstOtherPeers;

private final long requiredBlockchainHeight;
private final List<Block> blocks = new ArrayList<>();
private final boolean isPoS;

public GetBodiesFromPeerTask(
final List<BlockHeader> blockHeaders, final ProtocolSchedule protocolSchedule) {
this(blockHeaders, protocolSchedule, DEFAULT_RETRIES_AGAINST_OTHER_PEERS);
}

public GetBodiesFromPeerTask(
final List<BlockHeader> blockHeaders,
final ProtocolSchedule protocolSchedule,
final int allowedRetriesAgainstOtherPeers) {
if (blockHeaders == null || blockHeaders.isEmpty()) {
throw new IllegalArgumentException("Block headers must not be empty");
}

this.blockHeaders = blockHeaders;
this.protocolSchedule = protocolSchedule;
this.allowedRetriesAgainstOtherPeers = allowedRetriesAgainstOtherPeers;

this.requiredBlockchainHeight =
blockHeaders.stream()
.mapToLong(BlockHeader::getNumber)
.max()
.orElse(BlockHeader.GENESIS_BLOCK_NUMBER);
this.isPoS = protocolSchedule.getByBlockHeader(blockHeaders.getLast()).isPoS();
}

@Override
public SubProtocol getSubProtocol() {
return EthProtocol.get();
}

@Override
public MessageData getRequestMessage() {
return GetBlockBodiesMessage.create(
blockHeaders.stream().map(BlockHeader::getBlockHash).toList());
}

@Override
public List<Block> processResponse(final MessageData messageData)
throws InvalidPeerTaskResponseException {
// Blocks returned by this method are in the same order as the headers, but might not be
// complete
if (messageData == null) {
throw new InvalidPeerTaskResponseException();
}
final BlockBodiesMessage blocksMessage = BlockBodiesMessage.readFrom(messageData);
final List<BlockBody> blockBodies = blocksMessage.bodies(protocolSchedule);
if (blockBodies.isEmpty() || blockBodies.size() > blockHeaders.size()) {
throw new InvalidPeerTaskResponseException();
}

for (int i = 0; i < blockBodies.size(); i++) {
final BlockBody blockBody = blockBodies.get(i);
final BlockHeader blockHeader = blockHeaders.get(i);
if (!blockBodyMatchesBlockHeader(blockBody, blockHeader)) {
LOG.atDebug().setMessage("Received block body does not match block header").log();
throw new InvalidPeerTaskResponseException();
}

blocks.add(new Block(blockHeader, blockBody));
}
return blocks;
}

@Override
public int getRetriesWithOtherPeer() {
return allowedRetriesAgainstOtherPeers;
}

private boolean blockBodyMatchesBlockHeader(
final BlockBody blockBody, final BlockHeader blockHeader) {
// this method validates that the block body matches the block header by calculating the roots
// of the block body and comparing them to the roots in the block header
if (!BodyValidation.transactionsRoot(blockBody.getTransactions())
.equals(blockHeader.getTransactionsRoot())) {
return false;
}
if (!BodyValidation.ommersHash(blockBody.getOmmers()).equals(blockHeader.getOmmersHash())) {
return false;
}
if (!blockBody
.getWithdrawals()
.map(BodyValidation::withdrawalsRoot)
.equals(blockHeader.getWithdrawalsRoot())) {
return false;
}

return true;
}

@Override
public Predicate<EthPeer> getPeerRequirementFilter() {
return (ethPeer) ->
isPoS || ethPeer.chainState().getEstimatedHeight() >= requiredBlockchainHeight;
}

@Override
public PeerTaskValidationResponse validateResult(final List<Block> result) {
if (result.isEmpty()) {
return PeerTaskValidationResponse.NO_RESULTS_RETURNED;
}
return PeerTaskValidationResponse.RESULTS_VALID_AND_GOOD;
}

public List<BlockHeader> getBlockHeaders() {
return blockHeaders;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ public DefaultSynchronizer(
syncState,
metricsSystem,
terminationCondition,
peerTaskExecutor,
syncDurationMetrics));

if (SyncMode.FAST.equals(syncConfig.getSyncMode())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.sync.tasks.CompleteBlocksTask;
import org.hyperledger.besu.ethereum.eth.sync.tasks.CompleteBlocksWithPeerTask;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.plugin.services.MetricsSystem;

Expand All @@ -31,19 +32,38 @@ public class DownloadBodiesStep
private final ProtocolSchedule protocolSchedule;
private final EthContext ethContext;
private final MetricsSystem metricsSystem;
private final SynchronizerConfiguration synchronizerConfiguration;

public DownloadBodiesStep(
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
final SynchronizerConfiguration synchronizerConfiguration,
final MetricsSystem metricsSystem) {
this.protocolSchedule = protocolSchedule;
this.ethContext = ethContext;
this.synchronizerConfiguration = synchronizerConfiguration;
this.metricsSystem = metricsSystem;
}

@Override
public CompletableFuture<List<Block>> apply(final List<BlockHeader> blockHeaders) {
return CompleteBlocksTask.forHeaders(protocolSchedule, ethContext, blockHeaders, metricsSystem)
.run();
if (synchronizerConfiguration.isPeerTaskSystemEnabled()) {
return ethContext
.getScheduler()
.scheduleServiceTask(() -> getBodiesWithPeerTaskSystem(blockHeaders));
} else {
return CompleteBlocksTask.forHeaders(
protocolSchedule, ethContext, blockHeaders, metricsSystem)
.run();
}
}

private CompletableFuture<List<Block>> getBodiesWithPeerTaskSystem(
final List<BlockHeader> headers) {

final CompleteBlocksWithPeerTask completeBlocksWithPeerTask =
new CompleteBlocksWithPeerTask(protocolSchedule, headers, ethContext.getPeerTaskExecutor());
final List<Block> blocks = completeBlocksWithPeerTask.retrieveBlocksFromPeers();
return CompletableFuture.completedFuture(blocks);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResponseCode;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult;
import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetBodiesFromPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.RetryingGetBlocksFromPeersTask;

Expand Down Expand Up @@ -53,9 +56,9 @@ public CompletableFuture<Void> possibleRequestBodies(final List<BlockHeader> blo
LOG.atDebug()
.setMessage("Requesting {} blocks {}->{} ({})")
.addArgument(blockHeaders::size)
.addArgument(() -> blockHeaders.get(0).getNumber())
.addArgument(() -> blockHeaders.get(blockHeaders.size() - 1).getNumber())
.addArgument(() -> blockHeaders.get(0).getHash().toHexString())
.addArgument(() -> blockHeaders.getFirst().getNumber())
.addArgument(() -> blockHeaders.getLast().getNumber())
.addArgument(() -> blockHeaders.getFirst().getHash().toHexString())
.log();
return requestBodies(blockHeaders)
.thenApply(this::saveBlocks)
Expand All @@ -76,23 +79,47 @@ public CompletableFuture<Void> possibleRequestBodies(final List<BlockHeader> blo

@VisibleForTesting
protected CompletableFuture<List<Block>> requestBodies(final List<BlockHeader> blockHeaders) {
final RetryingGetBlocksFromPeersTask getBodiesFromPeerTask =
RetryingGetBlocksFromPeersTask.forHeaders(
context.getProtocolSchedule(),
context.getEthContext(),
context.getMetricsSystem(),
context.getEthContext().getEthPeers().peerCount(),
blockHeaders);
CompletableFuture<List<Block>> blocksFuture;
if (context.getSynchronizerConfiguration().isPeerTaskSystemEnabled()) {
blocksFuture =
context
.getEthContext()
.getScheduler()
.scheduleServiceTask(
() -> {
GetBodiesFromPeerTask task =
new GetBodiesFromPeerTask(
blockHeaders,
context.getProtocolSchedule(),
context.getEthContext().getEthPeers().peerCount());
PeerTaskExecutorResult<List<Block>> taskResult =
context.getEthContext().getPeerTaskExecutor().execute(task);
if (taskResult.responseCode() == PeerTaskExecutorResponseCode.SUCCESS
&& taskResult.result().isPresent()) {
return CompletableFuture.completedFuture(taskResult.result().get());
} else {
return CompletableFuture.failedFuture(
new RuntimeException(taskResult.responseCode().toString()));
}
});
} else {
final RetryingGetBlocksFromPeersTask getBodiesFromPeerTask =
RetryingGetBlocksFromPeersTask.forHeaders(
context.getProtocolSchedule(),
context.getEthContext(),
context.getMetricsSystem(),
context.getEthContext().getEthPeers().peerCount(),
blockHeaders);

final CompletableFuture<AbstractPeerTask.PeerTaskResult<List<Block>>> run =
getBodiesFromPeerTask.run();
return run.thenApply(AbstractPeerTask.PeerTaskResult::getResult)
.thenApply(
blocks -> {
LOG.debug("Got {} blocks from peers", blocks.size());
blocks.sort(Comparator.comparing(block -> block.getHeader().getNumber()));
return blocks;
});
blocksFuture =
getBodiesFromPeerTask.run().thenApply(AbstractPeerTask.PeerTaskResult::getResult);
}
return blocksFuture.thenApply(
blocks -> {
LOG.debug("Got {} blocks from peers", blocks.size());
blocks.sort(Comparator.comparing(block -> block.getHeader().getNumber()));
return blocks;
});
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public Pipeline<SyncTargetRange> createDownloadPipelineForSyncTarget(final SyncT
final RangeHeadersValidationStep validateHeadersJoinUpStep =
new RangeHeadersValidationStep(protocolSchedule, protocolContext, detachedValidationPolicy);
final DownloadBodiesStep downloadBodiesStep =
new DownloadBodiesStep(protocolSchedule, ethContext, metricsSystem);
new DownloadBodiesStep(protocolSchedule, ethContext, syncConfig, metricsSystem);
final DownloadReceiptsStep downloadReceiptsStep =
new DownloadReceiptsStep(protocolSchedule, ethContext, syncConfig, metricsSystem);
final ImportBlocksStep importBlockStep =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader;
import org.hyperledger.besu.ethereum.eth.sync.PipelineChainDownloader;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
Expand All @@ -35,7 +36,8 @@ public static ChainDownloader create(
final SyncState syncState,
final MetricsSystem metricsSystem,
final SyncTerminationCondition terminationCondition,
final SyncDurationMetrics syncDurationMetrics) {
final SyncDurationMetrics syncDurationMetrics,
final PeerTaskExecutor peerTaskExecutor) {

final FullSyncTargetManager syncTargetManager =
new FullSyncTargetManager(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public FullSyncDownloadPipelineFactory(
this.ethContext = ethContext;
this.metricsSystem = metricsSystem;
this.fullSyncTerminationCondition = syncTerminationCondition;
betterSyncTargetEvaluator = new BetterSyncTargetEvaluator(syncConfig, ethContext.getEthPeers());
this.betterSyncTargetEvaluator =
new BetterSyncTargetEvaluator(syncConfig, ethContext.getEthPeers());
}

@Override
Expand Down Expand Up @@ -105,7 +106,7 @@ public Pipeline<?> createDownloadPipelineForSyncTarget(final SyncTarget target)
final RangeHeadersValidationStep validateHeadersJoinUpStep =
new RangeHeadersValidationStep(protocolSchedule, protocolContext, detachedValidationPolicy);
final DownloadBodiesStep downloadBodiesStep =
new DownloadBodiesStep(protocolSchedule, ethContext, metricsSystem);
new DownloadBodiesStep(protocolSchedule, ethContext, syncConfig, metricsSystem);
final ExtractTxSignaturesStep extractTxSignaturesStep = new ExtractTxSignaturesStep();
final FullImportBlockStep importBlockStep =
new FullImportBlockStep(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.TrailingPeerRequirements;
Expand Down Expand Up @@ -45,6 +46,7 @@ public FullSyncDownloader(
final SyncState syncState,
final MetricsSystem metricsSystem,
final SyncTerminationCondition terminationCondition,
final PeerTaskExecutor peerTaskExecutor,
final SyncDurationMetrics syncDurationMetrics) {
this.syncConfig = syncConfig;
this.protocolContext = protocolContext;
Expand All @@ -59,7 +61,8 @@ public FullSyncDownloader(
syncState,
metricsSystem,
terminationCondition,
syncDurationMetrics);
syncDurationMetrics,
peerTaskExecutor);
}

public CompletableFuture<Void> start() {
Expand Down
Loading

0 comments on commit 1bf2ad8

Please sign in to comment.