Skip to content

Commit

Permalink
7311: add GetReceiptsFromPeerTask (hyperledger#7638)
Browse files Browse the repository at this point in the history
Signed-off-by: Matilda Clerke <[email protected]>
  • Loading branch information
Matilda-Clerke authored Oct 30, 2024
1 parent f9f721c commit db29df7
Show file tree
Hide file tree
Showing 28 changed files with 864 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter;
import org.hyperledger.besu.ethereum.eth.manager.MonitoredExecutors;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskRequestSender;
import org.hyperledger.besu.ethereum.eth.manager.snap.SnapProtocolManager;
import org.hyperledger.besu.ethereum.eth.peervalidation.CheckpointBlocksPeerValidator;
import org.hyperledger.besu.ethereum.eth.peervalidation.ClassicForkPeerValidator;
Expand Down Expand Up @@ -653,6 +655,8 @@ public BesuController build() {
}

final EthContext ethContext = new EthContext(ethPeers, ethMessages, snapMessages, scheduler);
final PeerTaskExecutor peerTaskExecutor =
new PeerTaskExecutor(ethPeers, new PeerTaskRequestSender(), metricsSystem);
final boolean fullSyncDisabled = !SyncMode.isFullSync(syncConfig.getSyncMode());
final SyncState syncState = new SyncState(blockchain, ethPeers, fullSyncDisabled, checkpoint);

Expand Down Expand Up @@ -704,6 +708,7 @@ public BesuController build() {
worldStateStorageCoordinator,
protocolContext,
ethContext,
peerTaskExecutor,
syncState,
ethProtocolManager,
pivotBlockSelector);
Expand Down Expand Up @@ -830,6 +835,7 @@ private TrieLogPruner createTrieLogPruner(
* @param worldStateStorageCoordinator the world state storage
* @param protocolContext the protocol context
* @param ethContext the eth context
* @param peerTaskExecutor the PeerTaskExecutor
* @param syncState the sync state
* @param ethProtocolManager the eth protocol manager
* @param pivotBlockSelector the pivot block selector
Expand All @@ -840,6 +846,7 @@ protected DefaultSynchronizer createSynchronizer(
final WorldStateStorageCoordinator worldStateStorageCoordinator,
final ProtocolContext protocolContext,
final EthContext ethContext,
final PeerTaskExecutor peerTaskExecutor,
final SyncState syncState,
final EthProtocolManager ethProtocolManager,
final PivotBlockSelector pivotBlockSelector) {
Expand All @@ -851,6 +858,7 @@ protected DefaultSynchronizer createSynchronizer(
worldStateStorageCoordinator,
ethProtocolManager.getBlockBroadcaster(),
ethContext,
peerTaskExecutor,
syncState,
dataDirectory,
storageProvider,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.DefaultSynchronizer;
import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector;
Expand Down Expand Up @@ -225,6 +226,7 @@ protected DefaultSynchronizer createSynchronizer(
final WorldStateStorageCoordinator worldStateStorageCoordinator,
final ProtocolContext protocolContext,
final EthContext ethContext,
final PeerTaskExecutor peerTaskExecutor,
final SyncState syncState,
final EthProtocolManager ethProtocolManager,
final PivotBlockSelector pivotBlockSelector) {
Expand All @@ -235,6 +237,7 @@ protected DefaultSynchronizer createSynchronizer(
worldStateStorageCoordinator,
protocolContext,
ethContext,
peerTaskExecutor,
syncState,
ethProtocolManager,
pivotBlockSelector);
Expand Down
30 changes: 26 additions & 4 deletions besu/src/test/java/org/hyperledger/besu/RunnerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -153,26 +153,48 @@ public void fullSyncFromGenesis() throws Exception {
// set merge flag to false, otherwise this test can fail if a merge test runs first
MergeConfiguration.setMergeEnabled(false);

syncFromGenesis(SyncMode.FULL, getFastSyncGenesis());
syncFromGenesis(SyncMode.FULL, getFastSyncGenesis(), false);
}

@Test
public void fullSyncFromGenesisUsingPeerTaskSystem() throws Exception {
// set merge flag to false, otherwise this test can fail if a merge test runs first
MergeConfiguration.setMergeEnabled(false);

syncFromGenesis(SyncMode.FULL, getFastSyncGenesis(), true);
}

@Test
public void fastSyncFromGenesis() throws Exception {
// set merge flag to false, otherwise this test can fail if a merge test runs first
MergeConfiguration.setMergeEnabled(false);

syncFromGenesis(SyncMode.FAST, getFastSyncGenesis());
syncFromGenesis(SyncMode.FAST, getFastSyncGenesis(), false);
}

@Test
public void fastSyncFromGenesisUsingPeerTaskSystem() throws Exception {
// set merge flag to false, otherwise this test can fail if a merge test runs first
MergeConfiguration.setMergeEnabled(false);

syncFromGenesis(SyncMode.FAST, getFastSyncGenesis(), true);
}

private void syncFromGenesis(final SyncMode mode, final GenesisConfigFile genesisConfig)
private void syncFromGenesis(
final SyncMode mode,
final GenesisConfigFile genesisConfig,
final boolean isPeerTaskSystemEnabled)
throws Exception {
final Path dataDirAhead = Files.createTempDirectory(temp, "db-ahead");
final Path dbAhead = dataDirAhead.resolve("database");
final int blockCount = 500;
final NodeKey aheadDbNodeKey = NodeKeyUtils.createFrom(KeyPairUtil.loadKeyPair(dataDirAhead));
final NodeKey behindDbNodeKey = NodeKeyUtils.generate();
final SynchronizerConfiguration syncConfigAhead =
SynchronizerConfiguration.builder().syncMode(SyncMode.FULL).build();
SynchronizerConfiguration.builder()
.syncMode(SyncMode.FULL)
.isPeerTaskSystemEnabled(isPeerTaskSystemEnabled)
.build();
final ObservableMetricsSystem noOpMetricsSystem = new NoOpMetricsSystem();
final var miningParameters = MiningParameters.newDefault();
final var dataStorageConfiguration = DataStorageConfiguration.DEFAULT_FOREST_CONFIG;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,9 @@ public JsonRpcResponse execute(
private Optional<RpcErrorType> validateMethodAvailability(final JsonRpcRequest request) {
final String name = request.getMethod();

if (LOG.isDebugEnabled()) {
if (LOG.isTraceEnabled()) {
final JsonArray params = JsonObject.mapFrom(request).getJsonArray("params");
LOG.debug("JSON-RPC request -> {} {}", name, params);
LOG.trace("JSON-RPC request -> {} {}", name, params);
}

final JsonRpcMethod method = rpcMethods.get(name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,6 @@ public void processMessage(final Capability cap, final Message message) {
public void handleNewConnection(final PeerConnection connection) {
ethPeers.registerNewConnection(connection, peerValidators);
final EthPeer peer = ethPeers.peer(connection);

final Capability cap = connection.capability(getSupportedProtocol());
final ForkId latestForkId =
cap.getVersion() >= 64 ? forkIdManager.getForkIdForChainHead() : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public void executeServiceTask(final Runnable command) {
servicesExecutor.execute(command);
}

public <T> CompletableFuture<Void> scheduleServiceTask(final Runnable task) {
public CompletableFuture<Void> scheduleServiceTask(final Runnable task) {
return CompletableFuture.runAsync(task, servicesExecutor);
}

Expand All @@ -156,6 +156,19 @@ public <T> CompletableFuture<T> scheduleServiceTask(final EthTask<T> task) {
return serviceFuture;
}

public <T> CompletableFuture<T> scheduleServiceTask(final Supplier<CompletableFuture<T>> future) {
final CompletableFuture<T> promise = new CompletableFuture<>();
final Future<?> workerFuture = servicesExecutor.submit(() -> propagateResult(future, promise));
// If returned promise is cancelled, cancel the worker future
promise.whenComplete(
(r, t) -> {
if (t instanceof CancellationException) {
workerFuture.cancel(false);
}
});
return promise;
}

public CompletableFuture<Void> startPipeline(final Pipeline<?> pipeline) {
final CompletableFuture<Void> pipelineFuture = pipeline.start(servicesExecutor);
pendingFutures.add(pipelineFuture);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ public interface PeerTask<T> {
MessageData getRequestMessage();

/**
* Parses the MessageData response from the EthPeer
* Parses and processes the MessageData response from the EthPeer
*
* @param messageData the response MessageData to be parsed
* @return a T built from the response MessageData
* @throws InvalidPeerTaskResponseException if the response messageData is invalid
*/
T parseResponse(MessageData messageData) throws InvalidPeerTaskResponseException;
T processResponse(MessageData messageData) throws InvalidPeerTaskResponseException;

/**
* Gets the number of times this task may be attempted against other peers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public <T> PeerTaskExecutorResult<T> executeAgainstPeer(
MessageData responseMessageData =
requestSender.sendRequest(peerTask.getSubProtocol(), requestMessageData, peer);

result = peerTask.parseResponse(responseMessageData);
result = peerTask.processResponse(responseMessageData);
} finally {
inflightRequestCountForThisTaskClass.decrementAndGet();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask.task;

import static java.util.Collections.emptyList;

import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.peertask.InvalidPeerTaskResponseException;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTask;
import org.hyperledger.besu.ethereum.eth.messages.GetReceiptsMessage;
import org.hyperledger.besu.ethereum.eth.messages.ReceiptsMessage;
import org.hyperledger.besu.ethereum.mainnet.BodyValidation;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class GetReceiptsFromPeerTask
implements PeerTask<Map<BlockHeader, List<TransactionReceipt>>> {

private final Collection<BlockHeader> blockHeaders;
private final ProtocolSchedule protocolSchedule;
private final Map<BlockHeader, List<TransactionReceipt>> receiptsByBlockHeader = new HashMap<>();
private final Map<Hash, List<BlockHeader>> headersByReceiptsRoot = new HashMap<>();
private final long requiredBlockchainHeight;

public GetReceiptsFromPeerTask(
final Collection<BlockHeader> blockHeaders, final ProtocolSchedule protocolSchedule) {
this.blockHeaders = new ArrayList<>(blockHeaders);
this.protocolSchedule = protocolSchedule;

// pre-fill any headers with an empty receipts root into the result map
this.blockHeaders.stream()
.filter(header -> header.getReceiptsRoot().equals(Hash.EMPTY_TRIE_HASH))
.forEach(header -> receiptsByBlockHeader.put(header, emptyList()));
this.blockHeaders.removeAll(receiptsByBlockHeader.keySet());

// group headers by their receipts root hash to reduce total number of receipts hashes requested
// for
this.blockHeaders.forEach(
header ->
headersByReceiptsRoot
.computeIfAbsent(header.getReceiptsRoot(), key -> new ArrayList<>())
.add(header));

// calculate the minimum required blockchain height a peer will need to be able to fulfil this
// request
requiredBlockchainHeight =
this.blockHeaders.stream()
.mapToLong(BlockHeader::getNumber)
.max()
.orElse(BlockHeader.GENESIS_BLOCK_NUMBER);
}

@Override
public SubProtocol getSubProtocol() {
return EthProtocol.get();
}

@Override
public MessageData getRequestMessage() {
// Since we have to match up the data by receipt root, we only need to request receipts
// for one of the headers with each unique receipt root.
final List<Hash> blockHashes =
headersByReceiptsRoot.values().stream()
.map(headers -> headers.getFirst().getHash())
.toList();
return GetReceiptsMessage.create(blockHashes);
}

@Override
public Map<BlockHeader, List<TransactionReceipt>> processResponse(final MessageData messageData)
throws InvalidPeerTaskResponseException {
if (messageData == null) {
throw new InvalidPeerTaskResponseException();
}
final ReceiptsMessage receiptsMessage = ReceiptsMessage.readFrom(messageData);
final List<List<TransactionReceipt>> receiptsByBlock = receiptsMessage.receipts();
// take a copy of the pre-filled receiptsByBlockHeader, to ensure idempotency of subsequent
// calls to processResponse
final Map<BlockHeader, List<TransactionReceipt>> receiptsByHeader =
new HashMap<>(receiptsByBlockHeader);
if (!blockHeaders.isEmpty()) {
if (receiptsByBlock.isEmpty() || receiptsByBlock.size() > blockHeaders.size()) {
throw new InvalidPeerTaskResponseException();
}

for (final List<TransactionReceipt> receiptsInBlock : receiptsByBlock) {
final List<BlockHeader> blockHeaders =
headersByReceiptsRoot.get(BodyValidation.receiptsRoot(receiptsInBlock));
if (blockHeaders == null) {
// Contains receipts that we didn't request, so mustn't be the response we're looking for.
throw new InvalidPeerTaskResponseException();
}
blockHeaders.forEach(header -> receiptsByHeader.put(header, receiptsInBlock));
}
}
return receiptsByHeader;
}

@Override
public Predicate<EthPeer> getPeerRequirementFilter() {
return (ethPeer) ->
ethPeer.getProtocolName().equals(getSubProtocol().getName())
&& (protocolSchedule.anyMatch((ps) -> ps.spec().isPoS())
|| ethPeer.chainState().getEstimatedHeight() >= requiredBlockchainHeight);
}

@Override
public boolean isSuccess(final Map<BlockHeader, List<TransactionReceipt>> result) {
return !result.isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.core.Synchronizer;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.sync.checkpointsync.CheckpointDownloaderFactory;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncDownloader;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState;
Expand Down Expand Up @@ -82,6 +83,7 @@ public DefaultSynchronizer(
final WorldStateStorageCoordinator worldStateStorageCoordinator,
final BlockBroadcaster blockBroadcaster,
final EthContext ethContext,
final PeerTaskExecutor peerTaskExecutor,
final SyncState syncState,
final Path dataDirectory,
final StorageProvider storageProvider,
Expand Down Expand Up @@ -147,6 +149,7 @@ public DefaultSynchronizer(
protocolContext,
metricsSystem,
ethContext,
peerTaskExecutor,
worldStateStorageCoordinator,
syncState,
clock,
Expand All @@ -163,6 +166,7 @@ public DefaultSynchronizer(
protocolContext,
metricsSystem,
ethContext,
peerTaskExecutor,
worldStateStorageCoordinator,
syncState,
clock,
Expand All @@ -179,6 +183,7 @@ public DefaultSynchronizer(
protocolContext,
metricsSystem,
ethContext,
peerTaskExecutor,
worldStateStorageCoordinator,
syncState,
clock,
Expand Down
Loading

0 comments on commit db29df7

Please sign in to comment.