Skip to content

Commit

Permalink
Improve action pool (#4030)
Browse files Browse the repository at this point in the history
* fix Dockerfile

* [api] support "input" field in web3 request params (#3971)

* support input/data field in web3 request

* add test

---------

Co-authored-by: CoderZhi <[email protected]>

* set gprc MaxConnectionIdle to 5 min (#4023)

* draft

* [actpool] only expire tx with nonce > pendingNonce

* implement replacing logic

* fix accountpool update

* unittest

* delete account

* place empty queue at the end

* add unittest

* address comments

* Update actpool.go

* address comments

* Rebase queueworker.go

---------

Co-authored-by: Dustin Xie <[email protected]>
Co-authored-by: Chen Chen <[email protected]>
Co-authored-by: envestcc <[email protected]>
  • Loading branch information
4 people authored Jan 16, 2024
1 parent 943660b commit 8313aeb
Show file tree
Hide file tree
Showing 9 changed files with 628 additions and 172 deletions.
164 changes: 164 additions & 0 deletions actpool/accountpool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Copyright (c) 2019 IoTeX Foundation
// This source code is provided 'as is' and no warranties are given as to title or non-infringement, merchantability
// or fitness for purpose and, to the extent permitted by law, all liability for your use of the code is disclaimed.
// This source code is governed by Apache License 2.0 that can be found in the LICENSE file.

package actpool

import (
"container/heap"
"math/big"
"time"

"github.com/iotexproject/iotex-core/action"
)

type (
accountItem struct {
index int
actQueue ActQueue
}

accountPriorityQueue []*accountItem

accountPool struct {
accounts map[string]*accountItem
priorityQueue accountPriorityQueue
}
)

func newAccountPool() *accountPool {
ap := &accountPool{
priorityQueue: accountPriorityQueue{},
accounts: map[string]*accountItem{},
}
heap.Init(&ap.priorityQueue)

return ap
}

func (ap *accountPool) Account(addr string) ActQueue {
if account, ok := ap.accounts[addr]; ok {
return account.actQueue
}
return nil
}

func (ap *accountPool) PopAccount(addr string) ActQueue {
if account, ok := ap.accounts[addr]; ok {
heap.Remove(&ap.priorityQueue, account.index)
delete(ap.accounts, addr)
return account.actQueue
}

return nil
}

func (ap *accountPool) PutAction(
addr string,
actpool *actPool,
pendingNonce uint64,
confirmedBalance *big.Int,
expiry time.Duration,
act action.SealedEnvelope,
) error {
account, ok := ap.accounts[addr]
if !ok {
queue := NewActQueue(
actpool,
addr,
pendingNonce,
confirmedBalance,
WithTimeOut(expiry),
)
if err := queue.Put(act); err != nil {
return err
}
ap.accounts[addr] = &accountItem{
index: len(ap.accounts),
actQueue: queue,
}
heap.Push(&ap.priorityQueue, ap.accounts[addr])
return nil
}

if err := account.actQueue.Put(act); err != nil {
return err
}
heap.Fix(&ap.priorityQueue, account.index)

return nil
}

func (ap *accountPool) PopPeek() *action.SealedEnvelope {
if len(ap.accounts) == 0 {
return nil
}
act := ap.priorityQueue[0].actQueue.PopActionWithLargestNonce()
heap.Fix(&ap.priorityQueue, 0)

return act
}

func (ap *accountPool) Range(callback func(addr string, acct ActQueue)) {
for addr, account := range ap.accounts {
callback(addr, account.actQueue)
}
heap.Init(&ap.priorityQueue)
}

func (ap *accountPool) DeleteIfEmpty(addr string) {
account, ok := ap.accounts[addr]
if !ok {
return
}
if account.actQueue.Empty() {
heap.Remove(&ap.priorityQueue, account.index)
delete(ap.accounts, addr)
}
}

func (aq accountPriorityQueue) Len() int { return len(aq) }
func (aq accountPriorityQueue) Less(i, j int) bool {
is, igp := aq[i].actQueue.NextAction()
js, jgp := aq[j].actQueue.NextAction()
if jgp == nil {
return true
}
if igp == nil {
return false
}
if !is && js {
return true
}
if !js && is {
return false
}

return igp.Cmp(jgp) < 0
}

func (aq accountPriorityQueue) Swap(i, j int) {
aq[i], aq[j] = aq[j], aq[i]
aq[i].index = i
aq[j].index = j
}

func (aq *accountPriorityQueue) Push(x interface{}) {
if in, ok := x.(*accountItem); ok {
in.index = len(*aq)
*aq = append(*aq, in)
}
}

func (aq *accountPriorityQueue) Pop() interface{} {
old := *aq
n := len(old)
if n == 0 {
return nil
}
x := old[n-1]
old[n-1] = nil // avoid memory leak
*aq = old[0 : n-1]
return x
}
194 changes: 194 additions & 0 deletions actpool/accountpool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
package actpool

import (
"math/big"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/iotexproject/iotex-core/action"
)

var (
_balance = big.NewInt(0).SetBytes([]byte("100000000000000000000000"))
_expireTime = time.Hour
)

func TestAccountPool_PopPeek(t *testing.T) {
r := require.New(t)
t.Run("empty pool", func(t *testing.T) {
ap := newAccountPool()
r.Nil(ap.PopPeek())
})
t.Run("one action", func(t *testing.T) {
ap := newAccountPool()
tsf1, err := action.SignedTransfer(_addr2, _priKey1, 1, big.NewInt(100), nil, uint64(0), big.NewInt(1))
r.NoError(err)
r.NoError(ap.PutAction(_addr1, nil, 0, _balance, _expireTime, tsf1))
r.Equal(&tsf1, ap.PopPeek())
r.Equal(0, ap.Account(_addr1).Len())
})
t.Run("multiple actions in one account", func(t *testing.T) {
ap := newAccountPool()
tsf1, err := action.SignedTransfer(_addr1, _priKey1, 1, big.NewInt(100), nil, uint64(0), big.NewInt(1))
r.NoError(err)
tsf2, err := action.SignedTransfer(_addr1, _priKey1, 2, big.NewInt(100), nil, uint64(0), big.NewInt(1))
r.NoError(err)
r.NoError(ap.PutAction(_addr1, nil, 1, _balance, _expireTime, tsf1))
r.NoError(ap.PutAction(_addr1, nil, 1, _balance, _expireTime, tsf2))
r.Equal(&tsf2, ap.PopPeek())
r.Equal(&tsf1, ap.PopPeek())
r.Nil(ap.PopPeek())
r.Equal(0, ap.Account(_addr1).Len())
})
t.Run("peek with pending nonce", func(t *testing.T) {
ap := newAccountPool()
tsf1, err := action.SignedTransfer(_addr1, _priKey1, 1, big.NewInt(100), nil, uint64(0), big.NewInt(1))
r.NoError(err)
tsf2, err := action.SignedTransfer(_addr2, _priKey2, 2, big.NewInt(100), nil, uint64(0), big.NewInt(1))
r.NoError(err)
r.NoError(ap.PutAction(_addr1, nil, 1, _balance, _expireTime, tsf1))
r.NoError(ap.PutAction(_addr2, nil, 1, _balance, _expireTime, tsf2))
r.Equal(&tsf2, ap.PopPeek())
t.Run("even if with higher price", func(t *testing.T) {
tsf2, err := action.SignedTransfer(_addr2, _priKey2, 2, big.NewInt(100), nil, uint64(0), big.NewInt(2))
r.NoError(err)
r.NoError(ap.PutAction(_addr2, nil, 1, _balance, _expireTime, tsf2))
r.Equal(&tsf2, ap.PopPeek())
})
})
t.Run("peek with lower gas price", func(t *testing.T) {
ap := newAccountPool()
tsf1, err := action.SignedTransfer(_addr1, _priKey1, 2, big.NewInt(100), nil, uint64(0), big.NewInt(2))
r.NoError(err)
tsf2, err := action.SignedTransfer(_addr2, _priKey2, 2, big.NewInt(100), nil, uint64(0), big.NewInt(1))
r.NoError(err)
r.NoError(ap.PutAction(_addr1, nil, 1, _balance, _expireTime, tsf1))
r.NoError(ap.PutAction(_addr2, nil, 1, _balance, _expireTime, tsf2))
r.Equal(&tsf2, ap.PopPeek())
r.Equal(&tsf1, ap.PopPeek())
t.Run("peek with pending nonce even if has higher price ", func(t *testing.T) {
tsf1, err = action.SignedTransfer(_addr1, _priKey1, 1, big.NewInt(100), nil, uint64(0), big.NewInt(2))
r.NoError(err)
tsf2, err = action.SignedTransfer(_addr1, _priKey2, 1, big.NewInt(100), nil, uint64(0), big.NewInt(1))
r.NoError(err)
r.NoError(ap.PutAction(_addr1, nil, 1, _balance, _expireTime, tsf1))
r.NoError(ap.PutAction(_addr2, nil, 1, _balance, _expireTime, tsf2))
r.Equal(&tsf2, ap.PopPeek())
r.Equal(&tsf1, ap.PopPeek())
})
})
t.Run("multiple actions in multiple accounts", func(t *testing.T) {
ap := newAccountPool()
tsf1, err := action.SignedTransfer(_addr1, _priKey1, 1, big.NewInt(100), nil, uint64(0), big.NewInt(2))
r.NoError(err)
tsf2, err := action.SignedTransfer(_addr2, _priKey1, 2, big.NewInt(100), nil, uint64(0), big.NewInt(1))
r.NoError(err)
tsf3, err := action.SignedTransfer(_addr2, _priKey2, 2, big.NewInt(100), nil, uint64(0), big.NewInt(1))
r.NoError(err)
tsf4, err := action.SignedTransfer(_addr2, _priKey2, 3, big.NewInt(100), nil, uint64(0), big.NewInt(1))
r.NoError(err)
tsf5, err := action.SignedTransfer(_addr2, _priKey3, 1, big.NewInt(100), nil, uint64(0), big.NewInt(1))
r.NoError(err)
tsf6, err := action.SignedTransfer(_addr2, _priKey3, 3, big.NewInt(100), nil, uint64(0), big.NewInt(1))
r.NoError(err)
tsf7, err := action.SignedTransfer(_addr2, _priKey4, 2, big.NewInt(100), nil, uint64(0), big.NewInt(1))
r.NoError(err)
tsf8, err := action.SignedTransfer(_addr2, _priKey4, 3, big.NewInt(100), nil, uint64(0), big.NewInt(1))
r.NoError(err)
r.NoError(ap.PutAction(_addr1, nil, 1, _balance, _expireTime, tsf1))
r.NoError(ap.PutAction(_addr1, nil, 1, _balance, _expireTime, tsf2))
r.NoError(ap.PutAction(_addr2, nil, 1, _balance, _expireTime, tsf3))
r.NoError(ap.PutAction(_addr2, nil, 1, _balance, _expireTime, tsf4))
r.NoError(ap.PutAction(_addr3, nil, 1, _balance, _expireTime, tsf5))
r.NoError(ap.PutAction(_addr3, nil, 1, _balance, _expireTime, tsf6))
r.NoError(ap.PutAction(_addr4, nil, 1, _balance, _expireTime, tsf7))
r.NoError(ap.PutAction(_addr4, nil, 1, _balance, _expireTime, tsf8))
r.Equal(&tsf4, ap.PopPeek())
r.Equal(&tsf3, ap.PopPeek())
r.Equal(&tsf8, ap.PopPeek())
r.Equal(&tsf7, ap.PopPeek())
r.Equal(&tsf6, ap.PopPeek())
r.Equal(&tsf5, ap.PopPeek())
r.Equal(&tsf2, ap.PopPeek())
r.Equal(&tsf1, ap.PopPeek())
r.Nil(ap.PopPeek())
})
}

func TestAccountPool_PopAccount(t *testing.T) {
r := require.New(t)
ap := newAccountPool()

// Create a sample account
tsf1, err := action.SignedTransfer(_addr1, _priKey1, 1, big.NewInt(100), nil, uint64(0), big.NewInt(1))
r.NoError(err)
r.NoError(ap.PutAction(_addr1, nil, 1, _balance, _expireTime, tsf1))

// Test when the account exists
result := ap.PopAccount(_addr1)
r.Equal(1, result.Len())
r.Equal(tsf1, result.AllActs()[0])
r.Nil(ap.Account(_addr1))

// Test when the account does not exist
result = ap.PopAccount("nonExistentAddress")
r.Nil(result)
}

func TestAccountPool_Range(t *testing.T) {
r := require.New(t)
ap := newAccountPool()

// Create a sample account
tsf1, err := action.SignedTransfer(_addr1, _priKey1, 1, big.NewInt(100), nil, uint64(0), big.NewInt(1))
r.NoError(err)
tsf2, err := action.SignedTransfer(_addr1, _priKey1, 2, big.NewInt(100), nil, uint64(0), big.NewInt(1))
r.NoError(err)
tsf3, err := action.SignedTransfer(_addr1, _priKey2, 2, big.NewInt(100), nil, uint64(0), big.NewInt(1))
r.NoError(err)
tsf4, err := action.SignedTransfer(_addr1, _priKey2, 1, big.NewInt(100), nil, uint64(0), big.NewInt(2))
r.NoError(err)
r.NoError(ap.PutAction(_addr1, nil, 1, _balance, _expireTime, tsf1))
r.NoError(ap.PutAction(_addr1, nil, 1, _balance, _expireTime, tsf2))
r.NoError(ap.PutAction(_addr2, nil, 1, _balance, _expireTime, tsf3))
r.Equal(2, ap.Account(_addr1).Len())
r.Equal(1, ap.Account(_addr2).Len())
// Define a callback function
callback := func(addr string, acct ActQueue) {
if addr == _addr2 {
r.NoError(acct.Put(tsf4))
} else if addr == _addr1 {
acct.PopActionWithLargestNonce()
}
}
// Call the Range method
ap.Range(callback)
// Verify the results
r.Equal(1, ap.Account(_addr1).Len())
r.Equal(2, ap.Account(_addr2).Len())
r.Equal(&tsf1, ap.PopPeek())
r.Equal(&tsf3, ap.PopPeek())
r.Equal(&tsf4, ap.PopPeek())
r.Nil(ap.PopPeek())
}

func TestAccountPool_DeleteIfEmpty(t *testing.T) {
r := require.New(t)
ap := newAccountPool()

// Create a sample account
tsf1, err := action.SignedTransfer(_addr1, _priKey1, 1, big.NewInt(100), nil, uint64(0), big.NewInt(1))
r.NoError(err)
r.NoError(ap.PutAction(_addr1, nil, 1, _balance, _expireTime, tsf1))

// Test when the account is not empty
ap.DeleteIfEmpty(_addr1)
r.NotNil(ap.Account(_addr1))

// Test when the account is empty
ap.PopPeek()
ap.DeleteIfEmpty(_addr1)
r.Nil(ap.Account(_addr1))
}
Loading

0 comments on commit 8313aeb

Please sign in to comment.