Skip to content

Commit

Permalink
Layered txpool: do not send notifications when moving tx between laye…
Browse files Browse the repository at this point in the history
…rs (hyperledger#7539)


Signed-off-by: Fabio Di Fabio <[email protected]>
  • Loading branch information
fab-10 authored Sep 5, 2024
1 parent b763d96 commit 7f0982d
Show file tree
Hide file tree
Showing 12 changed files with 165 additions and 47 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
### Additions and Improvements

### Bug fixes
- Layered txpool: do not send notifications when moving tx between layers [#7539](https://github.com/hyperledger/besu/pull/7539)

## 24.9.0

Expand All @@ -34,7 +35,7 @@
- Correctly drops messages that exceeds local message size limit [#5455](https://github.com/hyperledger/besu/pull/7507)
- **DebugMetrics**: Fixed a `ClassCastException` occurring in `DebugMetrics` when handling nested metric structures. Previously, `Double` values within these structures were incorrectly cast to `Map` objects, leading to errors. This update allows for proper handling of both direct values and nested structures at the same level. Issue# [#7383](https://github.com/hyperledger/besu/pull/7383)
- `evmtool` was not respecting the `--genesis` setting, resulting in unexpected trace results. [#7433](https://github.com/hyperledger/besu/pull/7433)
- The genesis config override `contractSizeLimit`q was not wired into code size limits [#7557](https://github.com/hyperledger/besu/pull/7557)
- The genesis config override `contractSizeLimit` was not wired into code size limits [#7557](https://github.com/hyperledger/besu/pull/7557)
- Fix incorrect key filtering in LayeredKeyValueStorage stream [#7535](https://github.com/hyperledger/besu/pull/7557)

## 24.8.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package org.hyperledger.besu.ethereum.eth.transactions;

import org.hyperledger.besu.datatypes.TransactionType;
import org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.AddReason;
import org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.metrics.ReplaceableDoubleSupplier;
Expand Down Expand Up @@ -68,6 +69,7 @@ public TransactionPoolMetrics(final MetricsSystem metricsSystem) {
"Count of transactions added to the transaction pool",
"source",
"priority",
"reason",
"layer");

removedCounter =
Expand Down Expand Up @@ -215,11 +217,13 @@ public void initExpiredMessagesCounter(final String message) {
SKIPPED_MESSAGES_LOGGING_THRESHOLD));
}

public void incrementAdded(final PendingTransaction pendingTransaction, final String layer) {
public void incrementAdded(
final PendingTransaction pendingTransaction, final AddReason addReason, final String layer) {
addedCounter
.labels(
location(pendingTransaction.isReceivedFromLocalSource()),
priority(pendingTransaction.hasPriority()),
addReason.label(),
layer)
.inc();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
package org.hyperledger.besu.ethereum.eth.transactions.layered;

import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.AddReason.MOVE;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.EVICTED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.FOLLOW_INVALIDATED;

Expand Down Expand Up @@ -77,7 +78,7 @@ private void pushDown(
senderTxs.remove(txToRemove.getNonce());
processRemove(senderTxs, txToRemove.getTransaction(), FOLLOW_INVALIDATED);
})
.forEach(followingTx -> nextLayer.add(followingTx, gap));
.forEach(followingTx -> nextLayer.add(followingTx, gap, MOVE));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.ALREADY_KNOWN;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.REJECTED_UNDERPRICED_REPLACEMENT;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.TRY_NEXT_LAYER;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.AddReason.MOVE;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.CONFIRMED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.CROSS_LAYER_REPLACED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.EVICTED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.PROMOTED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.REPLACED;

import org.hyperledger.besu.datatypes.Address;
Expand Down Expand Up @@ -169,7 +169,8 @@ protected abstract TransactionAddedResult canAdd(
final PendingTransaction pendingTransaction, final int gap);

@Override
public TransactionAddedResult add(final PendingTransaction pendingTransaction, final int gap) {
public TransactionAddedResult add(
final PendingTransaction pendingTransaction, final int gap, final AddReason addReason) {

// is replacing an existing one?
TransactionAddedResult addStatus = maybeReplaceTransaction(pendingTransaction);
Expand All @@ -178,21 +179,26 @@ public TransactionAddedResult add(final PendingTransaction pendingTransaction, f
}

if (addStatus.equals(TRY_NEXT_LAYER)) {
return addToNextLayer(pendingTransaction, gap);
return addToNextLayer(pendingTransaction, gap, addReason);
}

if (addStatus.isSuccess()) {
processAdded(pendingTransaction.detachedCopy());
final var addedPendingTransaction =
addReason.makeCopy() ? pendingTransaction.detachedCopy() : pendingTransaction;
processAdded(addedPendingTransaction, addReason);
addStatus.maybeReplacedTransaction().ifPresent(this::replaced);

nextLayer.notifyAdded(pendingTransaction);
nextLayer.notifyAdded(addedPendingTransaction);

if (!maybeFull()) {
// if there is space try to see if the added tx filled some gaps
tryFillGap(addStatus, pendingTransaction, getRemainingPromotionsPerType());
tryFillGap(addStatus, addedPendingTransaction, getRemainingPromotionsPerType());
}

if (addReason.sendNotification()) {
ethScheduler.scheduleTxWorkerTask(() -> notifyTransactionAdded(addedPendingTransaction));
}

ethScheduler.scheduleTxWorkerTask(() -> notifyTransactionAdded(pendingTransaction));
} else {
final var rejectReason = addStatus.maybeInvalidReason().orElseThrow();
metrics.incrementRejected(pendingTransaction, rejectReason, name());
Expand Down Expand Up @@ -238,7 +244,7 @@ private void tryFillGap(
pendingTransaction.getNonce(),
remainingPromotionsPerType);
if (promotedTx != null) {
processAdded(promotedTx);
processAdded(promotedTx, AddReason.PROMOTED);
if (!maybeFull()) {
tryFillGap(ADDED, promotedTx, remainingPromotionsPerType);
}
Expand Down Expand Up @@ -286,7 +292,7 @@ public PendingTransaction promoteFor(

if (remainingPromotionsPerType[txType.ordinal()] > 0) {
senderTxs.pollFirstEntry();
processRemove(senderTxs, candidateTx.getTransaction(), PROMOTED);
processRemove(senderTxs, candidateTx.getTransaction(), RemovalReason.PROMOTED);
metrics.incrementRemoved(candidateTx, "promoted", name());

if (senderTxs.isEmpty()) {
Expand All @@ -302,32 +308,34 @@ public PendingTransaction promoteFor(
}

private TransactionAddedResult addToNextLayer(
final PendingTransaction pendingTransaction, final int distance) {
final PendingTransaction pendingTransaction, final int distance, final AddReason addReason) {
return addToNextLayer(
txsBySender.getOrDefault(pendingTransaction.getSender(), EMPTY_SENDER_TXS),
pendingTransaction,
distance);
distance,
addReason);
}

protected TransactionAddedResult addToNextLayer(
final NavigableMap<Long, PendingTransaction> senderTxs,
final PendingTransaction pendingTransaction,
final int distance) {
final int distance,
final AddReason addReason) {
final int nextLayerDistance;
if (senderTxs.isEmpty()) {
nextLayerDistance = distance;
} else {
nextLayerDistance = (int) (pendingTransaction.getNonce() - (senderTxs.lastKey() + 1));
}
return nextLayer.add(pendingTransaction, nextLayerDistance);
return nextLayer.add(pendingTransaction, nextLayerDistance, addReason);
}

private void processAdded(final PendingTransaction addedTx) {
private void processAdded(final PendingTransaction addedTx, final AddReason addReason) {
pendingTransactions.put(addedTx.getHash(), addedTx);
final var senderTxs = txsBySender.computeIfAbsent(addedTx.getSender(), s -> new TreeMap<>());
senderTxs.put(addedTx.getNonce(), addedTx);
increaseCounters(addedTx);
metrics.incrementAdded(addedTx, name());
metrics.incrementAdded(addedTx, addReason, name());
internalAdd(senderTxs, addedTx);
}

Expand All @@ -353,7 +361,7 @@ private void evict(final long spaceToFree, final int txsToEvict) {
++evictedCount;
evictedSize += lastTx.memorySize();
// evicted can always be added to the next layer
addToNextLayer(lessReadySenderTxs, lastTx, 0);
addToNextLayer(lessReadySenderTxs, lastTx, 0, MOVE);
}

if (lessReadySenderTxs.isEmpty()) {
Expand Down Expand Up @@ -459,7 +467,7 @@ final void promoteTransactions() {
nextLayer
.promote(
this::promotionFilter, cacheFreeSpace(), freeSlots, getRemainingPromotionsPerType())
.forEach(this::processAdded);
.forEach(addedTx -> processAdded(addedTx, AddReason.PROMOTED));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
package org.hyperledger.besu.ethereum.eth.transactions.layered;

import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.AddReason.MOVE;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.BELOW_BASE_FEE;

import org.hyperledger.besu.datatypes.Wei;
Expand Down Expand Up @@ -133,7 +134,7 @@ protected void internalBlockAdded(final BlockHeader blockHeader, final FeeMarket
.addArgument(newNextBlockBaseFee::toHumanReadableString)
.log();
processEvict(senderTxs, demoteTx, BELOW_BASE_FEE);
addToNextLayer(senderTxs, demoteTx, 0);
addToNextLayer(senderTxs, demoteTx, 0, MOVE);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ public List<PendingTransaction> getAll() {
}

@Override
public TransactionAddedResult add(final PendingTransaction pendingTransaction, final int gap) {
public TransactionAddedResult add(
final PendingTransaction pendingTransaction, final int gap, final AddReason reason) {
notifyTransactionDropped(pendingTransaction);
metrics.incrementRemoved(pendingTransaction, DROPPED.label(), name());
++droppedCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.ALREADY_KNOWN;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.INTERNAL_ERROR;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.NONCE_TOO_FAR_IN_FUTURE_FOR_SENDER;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.AddReason.NEW;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.INVALIDATED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.RECONCILED;

Expand Down Expand Up @@ -100,7 +101,7 @@ public synchronized TransactionAddedResult addTransaction(
}

try {
return prioritizedTransactions.add(pendingTransaction, (int) nonceDistance);
return prioritizedTransactions.add(pendingTransaction, (int) nonceDistance, NEW);
} catch (final Throwable throwable) {
return reconcileAndRetryAdd(
pendingTransaction, stateSenderNonce, (int) nonceDistance, throwable);
Expand All @@ -123,7 +124,7 @@ private TransactionAddedResult reconcileAndRetryAdd(
.log();
reconcileSender(pendingTransaction.getSender(), stateSenderNonce);
try {
return prioritizedTransactions.add(pendingTransaction, nonceDistance);
return prioritizedTransactions.add(pendingTransaction, nonceDistance, NEW);
} catch (final Throwable throwable2) {
// the error should have been solved by the reconcile, logging at higher level now
LOG.atWarn()
Expand Down Expand Up @@ -210,7 +211,7 @@ private void reconcileSender(final Address sender, final long stateSenderNonce)
final long lowestNonce = reAddTxs.getFirst().getNonce();
final int newNonceDistance = (int) Math.max(0, lowestNonce - stateSenderNonce);

reAddTxs.forEach(ptx -> prioritizedTransactions.add(ptx, newNonceDistance));
reAddTxs.forEach(ptx -> prioritizedTransactions.add(ptx, newNonceDistance, NEW));
}

LOG.atDebug()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,19 @@ public interface TransactionsLayer {

boolean contains(Transaction transaction);

TransactionAddedResult add(PendingTransaction pendingTransaction, int gap);
/**
* Try to add a pending transaction to this layer. The {@code addReason} is used to discriminate
* between a new tx that is added to the pool, or a tx that is already in the pool, but is moving
* internally between layers, for example, due to a promotion or demotion. The distinction is
* needed since we only need to send a notification for a new tx, and not when it is only an
* internal move.
*
* @param pendingTransaction the tx to try to add to this layer
* @param gap the nonce gap between the current sender nonce and the tx
* @param addReason define if it is a new tx or an internal move
* @return the result of the add operation
*/
TransactionAddedResult add(PendingTransaction pendingTransaction, int gap, AddReason addReason);

void remove(PendingTransaction pendingTransaction, RemovalReason reason);

Expand Down Expand Up @@ -108,6 +120,49 @@ List<PendingTransaction> promote(

String logSender(Address sender);

/** Describe why we are trying to add a tx to a layer. */
enum AddReason {
/** When adding a tx, that is not present in the pool. */
NEW(true, true),
/** When adding a tx as result of an internal move between layers. */
MOVE(false, false),
/** When adding a tx as result of a promotion from a lower layer. */
PROMOTED(false, false);

private final boolean sendNotification;
private final boolean makeCopy;
private final String label;

AddReason(final boolean sendNotification, final boolean makeCopy) {
this.sendNotification = sendNotification;
this.makeCopy = makeCopy;
this.label = name().toLowerCase(Locale.ROOT);
}

/**
* Should we send add notification for this reason?
*
* @return true if notification should be sent
*/
public boolean sendNotification() {
return sendNotification;
}

/**
* Should the layer make a copy of the pending tx before adding it, to avoid keeping reference
* to potentially large underlying byte buffers?
*
* @return true is a copy is necessary
*/
public boolean makeCopy() {
return makeCopy;
}

public String label() {
return label;
}
}

enum RemovalReason {
CONFIRMED,
CROSS_LAYER_REPLACED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.ADDED;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.DROPPED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.AddReason.NEW;

import org.hyperledger.besu.datatypes.TransactionType;
import org.hyperledger.besu.datatypes.Wei;
Expand Down Expand Up @@ -169,7 +170,7 @@ protected void shouldPrioritizeValueThenTimeAddedToPool(
.mapToObj(
i -> {
final var lowPriceTx = lowValueTxSupplier.next();
final var prioritizeResult = transactions.add(lowPriceTx, 0);
final var prioritizeResult = transactions.add(lowPriceTx, 0, NEW);

assertThat(prioritizeResult).isEqualTo(ADDED);
assertThat(evictCollector.getEvictedTransactions()).isEmpty();
Expand All @@ -180,7 +181,7 @@ protected void shouldPrioritizeValueThenTimeAddedToPool(
assertThat(transactions.count()).isEqualTo(MAX_TRANSACTIONS);

// This should kick the oldest tx with the low gas price out, namely the first one we added
final var highValuePrioRes = transactions.add(highValueTx, 0);
final var highValuePrioRes = transactions.add(highValueTx, 0, NEW);
assertThat(highValuePrioRes).isEqualTo(ADDED);
assertEvicted(expectedDroppedTx);

Expand All @@ -195,7 +196,7 @@ protected TransactionAddedResult prioritizeTransaction(final Transaction tx) {
}

protected TransactionAddedResult prioritizeTransaction(final PendingTransaction tx) {
return transactions.add(tx, 0);
return transactions.add(tx, 0, NEW);
}

protected void assertTransactionPrioritized(final PendingTransaction tx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactions;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics;
import org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.AddReason;
import org.hyperledger.besu.evm.account.Account;
import org.hyperledger.besu.metrics.StubMetricsSystem;
import org.hyperledger.besu.testutil.DeterministicEthScheduler;
Expand Down Expand Up @@ -258,9 +259,10 @@ protected void addLocalTransactions(
}
}

protected long getAddedCount(final String source, final String priority, final String layer) {
protected long getAddedCount(
final String source, final String priority, final AddReason addReason, final String layer) {
return metricsSystem.getCounterValue(
TransactionPoolMetrics.ADDED_COUNTER_NAME, source, priority, layer);
TransactionPoolMetrics.ADDED_COUNTER_NAME, source, priority, addReason.label(), layer);
}

protected long getRemovedCount(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ public String name() {
}

@Override
public TransactionAddedResult add(final PendingTransaction pendingTransaction, final int gap) {
final var res = super.add(pendingTransaction, gap);
public TransactionAddedResult add(
final PendingTransaction pendingTransaction, final int gap, final AddReason addReason) {
final var res = super.add(pendingTransaction, gap, addReason);
evictedTxs.add(pendingTransaction);
return res;
}
Expand Down
Loading

0 comments on commit 7f0982d

Please sign in to comment.