Skip to content

Commit

Permalink
8053: Add GetPooledTransactionsFromPeerTask and appropriate usage
Browse files Browse the repository at this point in the history
Signed-off-by: Matilda Clerke <[email protected]>
  • Loading branch information
Matilda-Clerke committed Dec 20, 2024
1 parent 320c476 commit 71a9abd
Show file tree
Hide file tree
Showing 14 changed files with 449 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,8 @@ public BesuController build() {
syncState,
transactionPoolConfiguration,
besuComponent.map(BesuComponent::getBlobCache).orElse(new BlobCache()),
miningConfiguration);
miningConfiguration,
syncConfig.isPeerTaskSystemEnabled());

final List<PeerValidator> peerValidators =
createPeerValidators(protocolSchedule, peerTaskExecutor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ public void setUp() {
syncState,
txPoolConfig,
new BlobCache(),
MiningConfiguration.newDefault());
MiningConfiguration.newDefault(),
false);

serviceImpl =
new BesuEventsImpl(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright contributors to Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask.task;

import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.peertask.InvalidPeerTaskResponseException;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTask;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskValidationResponse;
import org.hyperledger.besu.ethereum.eth.messages.GetPooledTransactionsMessage;
import org.hyperledger.besu.ethereum.eth.messages.PooledTransactionsMessage;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;

import java.util.List;
import java.util.function.Predicate;

public class GetPooledTransactionsFromPeerTask implements PeerTask<List<Transaction>> {

private final List<Hash> hashes;

public GetPooledTransactionsFromPeerTask(final List<Hash> hashes) {
this.hashes = hashes.stream().distinct().toList();
}

@Override
public SubProtocol getSubProtocol() {
return EthProtocol.get();
}

@Override
public MessageData getRequestMessage() {
return GetPooledTransactionsMessage.create(hashes);
}

@Override
public List<Transaction> processResponse(final MessageData messageData)
throws InvalidPeerTaskResponseException {
final PooledTransactionsMessage pooledTransactionsMessage =
PooledTransactionsMessage.readFrom(messageData);
final List<Transaction> responseTransactions = pooledTransactionsMessage.transactions();
if (responseTransactions.size() != hashes.size()) {
throw new InvalidPeerTaskResponseException(
"Response transaction count does not match request hash count");
}
return responseTransactions;
}

@Override
public Predicate<EthPeer> getPeerRequirementFilter() {
return (peer) -> true;
}

@Override
public PeerTaskValidationResponse validateResult(final List<Transaction> result) {
if (!result.stream().allMatch((t) -> hashes.contains(t.getHash()))) {
return PeerTaskValidationResponse.RESULTS_DO_NOT_MATCH_QUERY;
}
return PeerTaskValidationResponse.RESULTS_VALID_AND_GOOD;
}

public List<Hash> getHashes() {
return hashes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResponseCode;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult;
import org.hyperledger.besu.ethereum.eth.transactions.PeerTransactionTracker;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics;
Expand All @@ -28,6 +30,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;

import com.google.common.collect.EvictingQueue;
Expand All @@ -49,6 +52,7 @@ public class BufferedGetPooledTransactionsFromPeerFetcher {
private final ScheduledFuture<?> scheduledFuture;
private final EthPeer peer;
private final Queue<Hash> txAnnounces;
private final boolean isPeerTaskSystemEnabled;

public BufferedGetPooledTransactionsFromPeerFetcher(
final EthContext ethContext,
Expand All @@ -57,7 +61,8 @@ public BufferedGetPooledTransactionsFromPeerFetcher(
final TransactionPool transactionPool,
final PeerTransactionTracker transactionTracker,
final TransactionPoolMetrics metrics,
final String metricLabel) {
final String metricLabel,
final boolean isPeerTaskSystemEnabled) {
this.ethContext = ethContext;
this.scheduledFuture = scheduledFuture;
this.peer = peer;
Expand All @@ -67,6 +72,7 @@ public BufferedGetPooledTransactionsFromPeerFetcher(
this.metricLabel = metricLabel;
this.txAnnounces =
Queues.synchronizedQueue(EvictingQueue.create(DEFAULT_MAX_PENDING_TRANSACTIONS));
this.isPeerTaskSystemEnabled = isPeerTaskSystemEnabled;
}

public ScheduledFuture<?> getScheduledFuture() {
Expand All @@ -76,27 +82,53 @@ public ScheduledFuture<?> getScheduledFuture() {
public void requestTransactions() {
List<Hash> txHashesAnnounced;
while (!(txHashesAnnounced = getTxHashesAnnounced()).isEmpty()) {
final GetPooledTransactionsFromPeerTask task =
GetPooledTransactionsFromPeerTask.forHashes(
ethContext, txHashesAnnounced, metrics.getMetricsSystem());
task.assignPeer(peer);
ethContext
.getScheduler()
.scheduleSyncWorkerTask(task)
.thenAccept(
result -> {
List<Transaction> retrievedTransactions = result.getResult();
transactionTracker.markTransactionsAsSeen(peer, retrievedTransactions);

LOG.atTrace()
.setMessage("Got {} transactions of {} hashes requested from peer {}")
.addArgument(retrievedTransactions::size)
.addArgument(task.getTransactionHashes()::size)
.addArgument(peer::getLoggableId)
.log();

transactionPool.addRemoteTransactions(retrievedTransactions);
});
CompletableFuture<List<Transaction>> futureTransactions;
if (isPeerTaskSystemEnabled) {
final org.hyperledger.besu.ethereum.eth.manager.peertask.task
.GetPooledTransactionsFromPeerTask
task =
new org.hyperledger.besu.ethereum.eth.manager.peertask.task
.GetPooledTransactionsFromPeerTask(txHashesAnnounced);
futureTransactions =
ethContext
.getScheduler()
.scheduleSyncWorkerTask(
() -> {
PeerTaskExecutorResult<List<Transaction>> taskResult =
ethContext.getPeerTaskExecutor().executeAgainstPeer(task, peer);
if (taskResult.responseCode() != PeerTaskExecutorResponseCode.SUCCESS
|| taskResult.result().isEmpty()) {
return CompletableFuture.failedFuture(
new RuntimeException("Failed to retrieve transactions for hashes"));
}
return CompletableFuture.completedFuture(taskResult.result().get());
});
} else {
final GetPooledTransactionsFromPeerTask task =
GetPooledTransactionsFromPeerTask.forHashes(
ethContext, txHashesAnnounced, metrics.getMetricsSystem());
task.assignPeer(peer);
futureTransactions =
ethContext
.getScheduler()
.scheduleSyncWorkerTask(task)
.thenCompose(
(peerTaskResult) ->
CompletableFuture.completedFuture(peerTaskResult.getResult()));
}

futureTransactions.thenAccept(
retrievedTransactions -> {
transactionTracker.markTransactionsAsSeen(peer, retrievedTransactions);

LOG.atTrace()
.setMessage("Got {} transactions requested from peer {}")
.addArgument(retrievedTransactions::size)
.addArgument(peer::getLoggableId)
.log();

transactionPool.addRemoteTransactions(retrievedTransactions);
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,23 @@ public class NewPooledTransactionHashesMessageProcessor {
private final TransactionPoolConfiguration transactionPoolConfiguration;
private final EthContext ethContext;
private final TransactionPoolMetrics metrics;
private final boolean isPeerTaskSystemEnabled;

public NewPooledTransactionHashesMessageProcessor(
final PeerTransactionTracker transactionTracker,
final TransactionPool transactionPool,
final TransactionPoolConfiguration transactionPoolConfiguration,
final EthContext ethContext,
final TransactionPoolMetrics metrics) {
final TransactionPoolMetrics metrics,
final boolean isPeerTaskSystemEnabled) {
this.transactionTracker = transactionTracker;
this.transactionPool = transactionPool;
this.transactionPoolConfiguration = transactionPoolConfiguration;
this.ethContext = ethContext;
this.metrics = metrics;
metrics.initExpiredMessagesCounter(METRIC_LABEL);
this.scheduledTasks = new ConcurrentHashMap<>();
this.isPeerTaskSystemEnabled = isPeerTaskSystemEnabled;
}

void processNewPooledTransactionHashesMessage(
Expand Down Expand Up @@ -114,7 +117,8 @@ private void processNewPooledTransactionHashesMessage(
transactionPool,
transactionTracker,
metrics,
METRIC_LABEL);
METRIC_LABEL,
isPeerTaskSystemEnabled);
});

bufferedTask.addHashes(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ public static TransactionPool createTransactionPool(
final SyncState syncState,
final TransactionPoolConfiguration transactionPoolConfiguration,
final BlobCache blobCache,
final MiningConfiguration miningConfiguration) {
final MiningConfiguration miningConfiguration,
final boolean isPeerTaskSystemEnabled) {

final TransactionPoolMetrics metrics = new TransactionPoolMetrics(metricsSystem);

Expand All @@ -80,7 +81,8 @@ public static TransactionPool createTransactionPool(
transactionsMessageSender,
newPooledTransactionHashesMessageSender,
blobCache,
miningConfiguration);
miningConfiguration,
isPeerTaskSystemEnabled);
}

static TransactionPool createTransactionPool(
Expand All @@ -95,7 +97,8 @@ static TransactionPool createTransactionPool(
final TransactionsMessageSender transactionsMessageSender,
final NewPooledTransactionHashesMessageSender newPooledTransactionHashesMessageSender,
final BlobCache blobCache,
final MiningConfiguration miningConfiguration) {
final MiningConfiguration miningConfiguration,
final boolean isPeerTaskSystemEnabled) {

final TransactionPool transactionPool =
new TransactionPool(
Expand Down Expand Up @@ -135,7 +138,8 @@ static TransactionPool createTransactionPool(
transactionPool,
transactionPoolConfiguration,
ethContext,
metrics),
metrics,
isPeerTaskSystemEnabled),
transactionPoolConfiguration.getUnstable().getTxMessageKeepAliveSeconds());

subscribeTransactionHandlers(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1149,7 +1149,8 @@ public void transactionMessagesGoToTheCorrectExecutor() {
new SyncState(blockchain, ethManager.ethContext().getEthPeers()),
TransactionPoolConfiguration.DEFAULT,
new BlobCache(),
MiningConfiguration.newDefault())
MiningConfiguration.newDefault(),
false)
.setEnabled();

// Send just a transaction message.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ public void setupTest() {
syncState,
TransactionPoolConfiguration.DEFAULT,
new BlobCache(),
MiningConfiguration.newDefault());
MiningConfiguration.newDefault(),
false);
transactionPool.setEnabled();

ethProtocolManager =
Expand Down
Loading

0 comments on commit 71a9abd

Please sign in to comment.