diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index 6acdcf408..aff14a1c7 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -665,19 +665,11 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, res *abci.ResponseCheck txInfo.SenderID: {}, } - replaced, shouldDrop := txmp.priorityIndex.TryReplacement(wtx) - if shouldDrop { + if txmp.isInMempool(wtx.tx) { return nil } - txmp.metrics.TxSizeBytes.Observe(float64(wtx.Size())) - txmp.metrics.Size.Set(float64(txmp.SizeWithoutPending())) - txmp.metrics.PendingSize.Set(float64(txmp.PendingSize())) - - if replaced != nil { - txmp.removeTx(replaced, true, false, false) - } - if txmp.insertTx(wtx, replaced == nil) { + if txmp.insertTx(wtx) { txmp.logger.Debug( "inserted good transaction", "priority", wtx.priority, @@ -871,15 +863,20 @@ func (txmp *TxMempool) canAddPendingTx(wtx *WrappedTx) error { return nil } -func (txmp *TxMempool) insertTx(wtx *WrappedTx, updatePriorityIndex bool) bool { - if txmp.isInMempool(wtx.tx) { +func (txmp *TxMempool) insertTx(wtx *WrappedTx) bool { + replacedTx, inserted := txmp.priorityIndex.PushTx(wtx) + if !inserted { return false } + txmp.metrics.TxSizeBytes.Observe(float64(wtx.Size())) + txmp.metrics.Size.Set(float64(txmp.SizeWithoutPending())) + txmp.metrics.PendingSize.Set(float64(txmp.PendingSize())) - txmp.txStore.SetTx(wtx) - if updatePriorityIndex { - txmp.priorityIndex.PushTx(wtx) + if replacedTx != nil { + txmp.removeTx(replacedTx, true, false, false) } + + txmp.txStore.SetTx(wtx) txmp.heightIndex.Insert(wtx) txmp.timestampIndex.Insert(wtx) diff --git a/internal/mempool/priority_queue.go b/internal/mempool/priority_queue.go index 6b420bb67..ee9bc4873 100644 --- a/internal/mempool/priority_queue.go +++ b/internal/mempool/priority_queue.go @@ -71,12 +71,10 @@ func (pq *TxPriorityQueue) getTxWithSameNonceUnsafe(tx *WrappedTx) (*WrappedTx, return nil, -1 } -func (pq *TxPriorityQueue) TryReplacement(tx *WrappedTx) (replaced *WrappedTx, shouldDrop bool) { +func (pq *TxPriorityQueue) tryReplacementUnsafe(tx *WrappedTx) (replaced *WrappedTx, shouldDrop bool) { if !tx.isEVM { return nil, false } - pq.mtx.Lock() - defer pq.mtx.Unlock() queue, ok := pq.evmQueue[tx.evmAddress] if ok && len(queue) > 0 { existing, idx := pq.getTxWithSameNonceUnsafe(tx) @@ -338,11 +336,25 @@ func (pq *TxPriorityQueue) pushTxUnsafe(tx *WrappedTx) { //} // PushTx adds a valid transaction to the priority queue. It is thread safe. -func (pq *TxPriorityQueue) PushTx(tx *WrappedTx) { +func (pq *TxPriorityQueue) PushTx(tx *WrappedTx) (*WrappedTx, bool) { pq.mtx.Lock() defer pq.mtx.Unlock() + replacedTx, shouldDrop := pq.tryReplacementUnsafe(tx) + + // tx was not inserted, and nothing was replaced + if shouldDrop { + return nil, false + } + + // tx replaced an existing transaction + if replacedTx != nil { + return replacedTx, true + } + + // tx was not inserted yet, so insert it pq.pushTxUnsafe(tx) + return nil, true } func (pq *TxPriorityQueue) popTxUnsafe() *WrappedTx { diff --git a/internal/mempool/priority_queue_test.go b/internal/mempool/priority_queue_test.go index a319e9888..0c28b4fa3 100644 --- a/internal/mempool/priority_queue_test.go +++ b/internal/mempool/priority_queue_test.go @@ -70,7 +70,7 @@ func TestTxPriorityQueue_PriorityAndNonceOrdering(t *testing.T) { {sender: "3", isEVM: true, evmAddress: "0xabc", evmNonce: 3, priority: 9}, {sender: "2", isEVM: true, evmAddress: "0xabc", evmNonce: 1, priority: 7}, }, - expectedOutput: []int64{1, 2, 3}, + expectedOutput: []int64{1, 3}, }, { name: "PriorityWithEVMAndNonEVMDuplicateNonce", @@ -380,17 +380,23 @@ func TestTxPriorityQueue_TryReplacement(t *testing.T) { expectedQueue []*WrappedTx expectedHeap []*WrappedTx }{ - {&WrappedTx{isEVM: false}, []*WrappedTx{}, false, false, []*WrappedTx{}, []*WrappedTx{}}, - {&WrappedTx{isEVM: true, evmAddress: "addr1"}, []*WrappedTx{}, false, false, []*WrappedTx{}, []*WrappedTx{}}, + // non-evm transaction is inserted into empty queue + {&WrappedTx{isEVM: false}, []*WrappedTx{}, false, false, []*WrappedTx{{isEVM: false}}, []*WrappedTx{{isEVM: false}}}, + // evm transaction is inserted into empty queue + {&WrappedTx{isEVM: true, evmAddress: "addr1"}, []*WrappedTx{}, false, false, []*WrappedTx{{isEVM: true, evmAddress: "addr1"}}, []*WrappedTx{{isEVM: true, evmAddress: "addr1"}}}, + // evm transaction (new nonce) is inserted into queue with existing tx (lower nonce) { &WrappedTx{isEVM: true, evmAddress: "addr1", evmNonce: 1, priority: 100, tx: []byte("abc")}, []*WrappedTx{ {isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")}, }, false, false, []*WrappedTx{ {isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")}, + {isEVM: true, evmAddress: "addr1", evmNonce: 1, priority: 100, tx: []byte("abc")}, }, []*WrappedTx{ {isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")}, + {isEVM: true, evmAddress: "addr1", evmNonce: 1, priority: 100, tx: []byte("abc")}, }, }, + // evm transaction (new nonce) is not inserted because it's a duplicate nonce and same priority { &WrappedTx{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("abc")}, []*WrappedTx{ {isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")}, @@ -400,6 +406,7 @@ func TestTxPriorityQueue_TryReplacement(t *testing.T) { {isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")}, }, }, + // evm transaction (new nonce) replaces the existing nonce transaction because its priority is higher { &WrappedTx{isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 101, tx: []byte("abc")}, []*WrappedTx{ {isEVM: true, evmAddress: "addr1", evmNonce: 0, priority: 100, tx: []byte("def")}, @@ -425,13 +432,13 @@ func TestTxPriorityQueue_TryReplacement(t *testing.T) { for _, e := range test.existing { pq.PushTx(e) } - replaced, dropped := pq.TryReplacement(test.tx) + replaced, inserted := pq.PushTx(test.tx) if test.expectedReplaced { require.NotNil(t, replaced) } else { require.Nil(t, replaced) } - require.Equal(t, test.expectedDropped, dropped) + require.Equal(t, test.expectedDropped, !inserted) for i, q := range pq.evmQueue[test.tx.evmAddress] { require.Equal(t, test.expectedQueue[i].tx.Key(), q.tx.Key()) require.Equal(t, test.expectedQueue[i].priority, q.priority) diff --git a/internal/mempool/reactor_test.go b/internal/mempool/reactor_test.go index 342910d56..9ab4fbc70 100644 --- a/internal/mempool/reactor_test.go +++ b/internal/mempool/reactor_test.go @@ -167,7 +167,7 @@ func TestReactorBroadcastDoesNotPanic(t *testing.T) { secondaryReactor.observePanic = observePanic firstTx := &WrappedTx{} - primaryMempool.insertTx(firstTx, true) + primaryMempool.insertTx(firstTx) // run the router rts.start(ctx, t) @@ -180,7 +180,7 @@ func TestReactorBroadcastDoesNotPanic(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - primaryMempool.insertTx(next, true) + primaryMempool.insertTx(next) }() }