Skip to content

Commit

Permalink
Fine tune already seen txs tracker when a tx is removed from the pool (
Browse files Browse the repository at this point in the history
…hyperledger#7755)

Signed-off-by: Fabio Di Fabio <[email protected]>
  • Loading branch information
fab-10 authored Oct 17, 2024
1 parent 5469b52 commit dfbfb96
Show file tree
Hide file tree
Showing 25 changed files with 284 additions and 98 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
### Upcoming Breaking Changes

### Additions and Improvements
- Fine tune already seen txs tracker when a tx is removed from the pool [#7755](https://github.com/hyperledger/besu/pull/7755)

### Bug fixes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public void removeTransactionAddedListener(final long listenerIdentifier) {
public long addTransactionDroppedListener(
final TransactionDroppedListener transactionDroppedListener) {
return transactionPool.subscribeDroppedTransactions(
transactionDroppedListener::onTransactionDropped);
(transaction, reason) -> transactionDroppedListener.onTransactionDropped(transaction));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.request.SubscriptionType;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionDroppedListener;
import org.hyperledger.besu.ethereum.eth.transactions.RemovalReason;

import java.util.List;

Expand All @@ -34,7 +35,7 @@ public PendingTransactionDroppedSubscriptionService(
}

@Override
public void onTransactionDropped(final Transaction transaction) {
public void onTransactionDropped(final Transaction transaction, final RemovalReason reason) {
notifySubscribers(transaction.getHash());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.transactions.RemovalReason;

import java.util.Arrays;
import java.util.HashMap;
Expand All @@ -47,6 +48,18 @@ public class PendingTransactionDroppedSubscriptionServiceTest {

private static final Hash TX_ONE =
Hash.fromHexString("0x15876958423545c3c7b0fcf9be8ffb543305ee1b43db87ed380dcf0cd16589f7");
private static final RemovalReason DUMMY_REMOVAL_REASON =
new RemovalReason() {
@Override
public String label() {
return "";
}

@Override
public boolean stopTracking() {
return false;
}
};

@Mock private SubscriptionManager subscriptionManager;
@Mock private Blockchain blockchain;
Expand All @@ -65,7 +78,7 @@ public void onTransactionAddedMustSendMessage() {
setUpSubscriptions(subscriptionIds);
final Transaction pending = transaction(TX_ONE);

service.onTransactionDropped(pending);
service.onTransactionDropped(pending, DUMMY_REMOVAL_REASON);

verifyNoInteractions(block);
verifyNoInteractions(blockchain);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PeerTransactionTracker implements EthPeer.DisconnectCallback {
public class PeerTransactionTracker
implements EthPeer.DisconnectCallback, PendingTransactionDroppedListener {
private static final Logger LOG = LoggerFactory.getLogger(PeerTransactionTracker.class);

private static final int MAX_TRACKED_SEEN_TRANSACTIONS = 100_000;
Expand Down Expand Up @@ -122,13 +123,14 @@ boolean hasPeerSeenTransaction(final EthPeer peer, final Hash txHash) {
}

private <T> Set<T> createTransactionsSet() {
return Collections.newSetFromMap(
new LinkedHashMap<>(1 << 4, 0.75f, true) {
@Override
protected boolean removeEldestEntry(final Map.Entry<T, Boolean> eldest) {
return size() > MAX_TRACKED_SEEN_TRANSACTIONS;
}
});
return Collections.synchronizedSet(
Collections.newSetFromMap(
new LinkedHashMap<>(16, 0.75f, true) {
@Override
protected boolean removeEldestEntry(final Map.Entry<T, Boolean> eldest) {
return size() > MAX_TRACKED_SEEN_TRANSACTIONS;
}
}));
}

@Override
Expand Down Expand Up @@ -175,4 +177,11 @@ public void onDisconnect(final EthPeer peer) {
private String logPeerSet(final Set<EthPeer> peers) {
return peers.stream().map(EthPeer::getLoggableId).collect(Collectors.joining(","));
}

@Override
public void onTransactionDropped(final Transaction transaction, final RemovalReason reason) {
if (reason.stopTracking()) {
seenTransactions.values().stream().forEach(st -> st.remove(transaction.getHash()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@
@FunctionalInterface
public interface PendingTransactionDroppedListener {

void onTransactionDropped(Transaction transaction);
void onTransactionDropped(Transaction transaction, final RemovalReason reason);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.transactions;

/** The reason why a pending tx has been removed */
public interface RemovalReason {
/**
* Return a label that identify this reason to be used in the metric system.
*
* @return a label
*/
String label();

/**
* Return true if we should stop tracking the tx as already seen
*
* @return true if no more tracking is needed
*/
boolean stopTracking();
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ public TransactionPool(
initializeBlobMetrics();
initLogForReplay();
subscribePendingTransactions(this::mapBlobsOnTransactionAdded);
subscribeDroppedTransactions(this::unmapBlobsOnTransactionDropped);
subscribeDroppedTransactions(
(transaction, reason) -> unmapBlobsOnTransactionDropped(transaction));
}

private void initLogForReplay() {
Expand Down Expand Up @@ -720,16 +721,18 @@ class PendingTransactionsListenersProxy {

void subscribe() {
onAddedListenerId = pendingTransactions.subscribePendingTransactions(this::onAdded);
onDroppedListenerId = pendingTransactions.subscribeDroppedTransactions(this::onDropped);
onDroppedListenerId =
pendingTransactions.subscribeDroppedTransactions(
(transaction, reason) -> onDropped(transaction, reason));
}

void unsubscribe() {
pendingTransactions.unsubscribePendingTransactions(onAddedListenerId);
pendingTransactions.unsubscribeDroppedTransactions(onDroppedListenerId);
}

private void onDropped(final Transaction transaction) {
onDroppedListeners.forEach(listener -> listener.onTransactionDropped(transaction));
private void onDropped(final Transaction transaction, final RemovalReason reason) {
onDroppedListeners.forEach(listener -> listener.onTransactionDropped(transaction, reason));
}

private void onAdded(final Transaction transaction) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ protected PendingTransaction getEvictable() {
protected void internalRemove(
final NavigableMap<Long, PendingTransaction> senderTxs,
final PendingTransaction removedTx,
final RemovalReason removalReason) {
final LayeredRemovalReason removalReason) {
orderByFee.remove(removedTx);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@
package org.hyperledger.besu.ethereum.eth.transactions.layered;

import static org.hyperledger.besu.ethereum.eth.transactions.layered.AddReason.MOVE;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.LayerMoveReason.EVICTED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.LayerMoveReason.FOLLOW_INVALIDATED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.LayerMoveReason.EVICTED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.LayerMoveReason.FOLLOW_INVALIDATED;

import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.transactions.BlobCache;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics;
import org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason;
import org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.PoolRemovalReason;

import java.util.Map;
import java.util.NavigableMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.REJECTED_UNDERPRICED_REPLACEMENT;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.TRY_NEXT_LAYER;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.AddReason.MOVE;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.LayerMoveReason.EVICTED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.BELOW_MIN_SCORE;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.CONFIRMED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.CROSS_LAYER_REPLACED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.REPLACED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.RemovedFrom.POOL;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.LayerMoveReason.EVICTED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.PoolRemovalReason.BELOW_MIN_SCORE;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.PoolRemovalReason.CONFIRMED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.PoolRemovalReason.CROSS_LAYER_REPLACED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.PoolRemovalReason.REPLACED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.RemovedFrom.POOL;

import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.datatypes.Hash;
Expand Down Expand Up @@ -295,7 +295,9 @@ public PendingTransaction promoteFor(
if (remainingPromotionsPerType[txType.ordinal()] > 0) {
senderTxs.pollFirstEntry();
processRemove(
senderTxs, candidateTx.getTransaction(), RemovalReason.LayerMoveReason.PROMOTED);
senderTxs,
candidateTx.getTransaction(),
LayeredRemovalReason.LayerMoveReason.PROMOTED);
metrics.incrementRemoved(candidateTx, "promoted", name());

if (senderTxs.isEmpty()) {
Expand Down Expand Up @@ -386,7 +388,7 @@ protected void replaced(final PendingTransaction replacedTx) {
decreaseCounters(replacedTx);
metrics.incrementRemoved(replacedTx, REPLACED.label(), name());
internalReplaced(replacedTx);
notifyTransactionDropped(replacedTx);
notifyTransactionDropped(replacedTx, REPLACED);
}

protected abstract void internalReplaced(final PendingTransaction replacedTx);
Expand Down Expand Up @@ -415,15 +417,15 @@ private TransactionAddedResult maybeReplaceTransaction(final PendingTransaction
protected PendingTransaction processRemove(
final NavigableMap<Long, PendingTransaction> senderTxs,
final Transaction transaction,
final RemovalReason removalReason) {
final LayeredRemovalReason removalReason) {
final PendingTransaction removedTx = pendingTransactions.remove(transaction.getHash());

if (removedTx != null) {
decreaseCounters(removedTx);
metrics.incrementRemoved(removedTx, removalReason.label(), name());
internalRemove(senderTxs, removedTx, removalReason);
if (removalReason.removedFrom().equals(POOL)) {
notifyTransactionDropped(removedTx);
notifyTransactionDropped(removedTx, removalReason);
}
}
return removedTx;
Expand All @@ -432,7 +434,7 @@ protected PendingTransaction processRemove(
protected PendingTransaction processEvict(
final NavigableMap<Long, PendingTransaction> senderTxs,
final PendingTransaction evictedTx,
final RemovalReason reason) {
final LayeredRemovalReason reason) {
final PendingTransaction removedTx = pendingTransactions.remove(evictedTx.getHash());
if (removedTx != null) {
decreaseCounters(evictedTx);
Expand Down Expand Up @@ -545,7 +547,7 @@ protected abstract void internalConfirmed(
protected abstract void internalRemove(
final NavigableMap<Long, PendingTransaction> senderTxs,
final PendingTransaction pendingTransaction,
final RemovalReason removalReason);
final LayeredRemovalReason removalReason);

protected abstract PendingTransaction getEvictable();

Expand Down Expand Up @@ -606,9 +608,10 @@ protected void notifyTransactionAdded(final PendingTransaction pendingTransactio
listener -> listener.onTransactionAdded(pendingTransaction.getTransaction()));
}

protected void notifyTransactionDropped(final PendingTransaction pendingTransaction) {
protected void notifyTransactionDropped(
final PendingTransaction pendingTransaction, final LayeredRemovalReason reason) {
onDroppedListeners.forEach(
listener -> listener.onTransactionDropped(pendingTransaction.getTransaction()));
listener -> listener.onTransactionDropped(pendingTransaction.getTransaction(), reason));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
package org.hyperledger.besu.ethereum.eth.transactions.layered;

import static org.hyperledger.besu.ethereum.eth.transactions.layered.AddReason.MOVE;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.LayerMoveReason.DEMOTED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.LayerMoveReason.DEMOTED;

import org.hyperledger.besu.datatypes.Wei;
import org.hyperledger.besu.ethereum.core.BlockHeader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
*/
package org.hyperledger.besu.ethereum.eth.transactions.layered;

import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.DROPPED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.PoolRemovalReason.DROPPED;

import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.datatypes.Hash;
Expand All @@ -25,7 +25,7 @@
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionDroppedListener;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics;
import org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason;
import org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.PoolRemovalReason;
import org.hyperledger.besu.ethereum.mainnet.feemarket.FeeMarket;
import org.hyperledger.besu.util.Subscribers;

Expand Down Expand Up @@ -78,7 +78,7 @@ public List<PendingTransaction> getAll() {
@Override
public TransactionAddedResult add(
final PendingTransaction pendingTransaction, final int gap, final AddReason reason) {
notifyTransactionDropped(pendingTransaction);
notifyTransactionDropped(pendingTransaction, DROPPED);
metrics.incrementRemoved(pendingTransaction, DROPPED.label(), name());
++droppedCount;
return TransactionAddedResult.DROPPED;
Expand Down Expand Up @@ -152,9 +152,10 @@ public void unsubscribeFromDropped(final long id) {
onDroppedListeners.unsubscribe(id);
}

protected void notifyTransactionDropped(final PendingTransaction pendingTransaction) {
protected void notifyTransactionDropped(
final PendingTransaction pendingTransaction, final LayeredRemovalReason reason) {
onDroppedListeners.forEach(
listener -> listener.onTransactionDropped(pendingTransaction.getTransaction()));
listener -> listener.onTransactionDropped(pendingTransaction.getTransaction(), reason));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.INTERNAL_ERROR;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.NONCE_TOO_FAR_IN_FUTURE_FOR_SENDER;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.AddReason.NEW;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.INVALIDATED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.RECONCILED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.PoolRemovalReason.INVALIDATED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.PoolRemovalReason.RECONCILED;

import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.datatypes.Hash;
Expand Down
Loading

0 comments on commit dfbfb96

Please sign in to comment.