Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EVM] rebased PR #184

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions abci/client/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,9 @@ func (cli *grpcClient) Info(ctx context.Context, params *types.RequestInfo) (*ty
return cli.client.Info(ctx, types.ToRequestInfo(params).GetInfo(), grpc.WaitForReady(true))
}

func (cli *grpcClient) CheckTx(ctx context.Context, params *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
return cli.client.CheckTx(ctx, types.ToRequestCheckTx(params).GetCheckTx(), grpc.WaitForReady(true))
func (cli *grpcClient) CheckTx(ctx context.Context, params *types.RequestCheckTx) (*types.ResponseCheckTxV2, error) {
resCheckTx, err := cli.client.CheckTx(ctx, types.ToRequestCheckTx(params).GetCheckTx(), grpc.WaitForReady(true))
return &types.ResponseCheckTxV2{ResponseCheckTx: resCheckTx}, err
}

func (cli *grpcClient) Query(ctx context.Context, params *types.RequestQuery) (*types.ResponseQuery, error) {
Expand Down
8 changes: 4 additions & 4 deletions abci/client/mocks/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions abci/client/socket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,12 +257,12 @@ func (cli *socketClient) Info(ctx context.Context, req *types.RequestInfo) (*typ
return res.GetInfo(), nil
}

func (cli *socketClient) CheckTx(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
func (cli *socketClient) CheckTx(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTxV2, error) {
res, err := cli.doRequest(ctx, types.ToRequestCheckTx(req))
if err != nil {
return nil, err
}
return res.GetCheckTx(), nil
return &types.ResponseCheckTxV2{ResponseCheckTx: res.GetCheckTx()}, nil
}

func (cli *socketClient) Query(ctx context.Context, req *types.RequestQuery) (*types.ResponseQuery, error) {
Expand Down
4 changes: 2 additions & 2 deletions abci/example/kvstore/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,8 @@ func (app *Application) FinalizeBlock(_ context.Context, req *types.RequestFinal
return &types.ResponseFinalizeBlock{TxResults: respTxs, ValidatorUpdates: app.ValUpdates, AppHash: appHash}, nil
}

func (*Application) CheckTx(_ context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
return &types.ResponseCheckTx{Code: code.CodeTypeOK, GasWanted: 1}, nil
func (*Application) CheckTx(_ context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTxV2, error) {
return &types.ResponseCheckTxV2{ResponseCheckTx: &types.ResponseCheckTx{Code: code.CodeTypeOK, GasWanted: 1}}, nil
}

func (app *Application) Commit(_ context.Context) (*types.ResponseCommit, error) {
Expand Down
8 changes: 8 additions & 0 deletions abci/server/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,11 @@ func (app *gRPCApplication) Flush(_ context.Context, req *types.RequestFlush) (*
func (app *gRPCApplication) Commit(ctx context.Context, req *types.RequestCommit) (*types.ResponseCommit, error) {
return app.Application.Commit(ctx)
}

func (app *gRPCApplication) CheckTx(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
resV2, err := app.Application.CheckTx(ctx, req)
if err != nil {
return &types.ResponseCheckTx{}, err
}
return resV2.ResponseCheckTx, nil
}
6 changes: 3 additions & 3 deletions abci/types/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type Application interface {
Query(context.Context, *RequestQuery) (*ResponseQuery, error) // Query for state

// Mempool Connection
CheckTx(context.Context, *RequestCheckTx) (*ResponseCheckTx, error) // Validate a tx for the mempool
CheckTx(context.Context, *RequestCheckTx) (*ResponseCheckTxV2, error) // Validate a tx for the mempool

// Consensus Connection
InitChain(context.Context, *RequestInitChain) (*ResponseInitChain, error) // Initialize blockchain w validators/other info from TendermintCore
Expand Down Expand Up @@ -51,8 +51,8 @@ func (BaseApplication) Info(_ context.Context, req *RequestInfo) (*ResponseInfo,
return &ResponseInfo{}, nil
}

func (BaseApplication) CheckTx(_ context.Context, req *RequestCheckTx) (*ResponseCheckTx, error) {
return &ResponseCheckTx{Code: CodeTypeOK}, nil
func (BaseApplication) CheckTx(_ context.Context, req *RequestCheckTx) (*ResponseCheckTxV2, error) {
return &ResponseCheckTxV2{ResponseCheckTx: &ResponseCheckTx{Code: CodeTypeOK}}, nil
}

func (BaseApplication) Commit(_ context.Context) (*ResponseCommit, error) {
Expand Down
4 changes: 2 additions & 2 deletions abci/types/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,9 @@ func ToResponseInfo(res *ResponseInfo) *Response {
}
}

func ToResponseCheckTx(res *ResponseCheckTx) *Response {
func ToResponseCheckTx(res *ResponseCheckTxV2) *Response {
return &Response{
Value: &Response_CheckTx{res},
Value: &Response_CheckTx{res.ResponseCheckTx},
}
}

Expand Down
8 changes: 4 additions & 4 deletions abci/types/mocks/application.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions abci/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,3 +237,23 @@ func MarshalTxResults(r []*ExecTxResult) ([][]byte, error) {
}
return s, nil
}

type PendingTxCheckerResponse int

const (
Accepted PendingTxCheckerResponse = iota
Rejected
Pending
)

type PendingTxChecker func() PendingTxCheckerResponse
type ExpireTxHandler func()

// ResponseCheckTxV2 response type contains non-protobuf fields, so non-local ABCI clients will not be able
// to utilize the new fields in V2 type (but still be backwards-compatible)
type ResponseCheckTxV2 struct {
*ResponseCheckTx
IsPendingTransaction bool
Checker PendingTxChecker // must not be nil if IsPendingTransaction is true
ExpireTxHandler ExpireTxHandler
}
8 changes: 4 additions & 4 deletions internal/consensus/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,18 +308,18 @@ func (app *CounterApplication) FinalizeBlock(_ context.Context, req *abci.Reques
return res, nil
}

func (app *CounterApplication) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*abci.ResponseCheckTx, error) {
func (app *CounterApplication) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*abci.ResponseCheckTxV2, error) {
app.mu.Lock()
defer app.mu.Unlock()

txValue := txAsUint64(req.Tx)
if txValue != uint64(app.mempoolTxCount) {
return &abci.ResponseCheckTx{
return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{
Code: code.CodeTypeBadNonce,
}, nil
}}, nil
}
app.mempoolTxCount++
return &abci.ResponseCheckTx{Code: code.CodeTypeOK}, nil
return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{Code: code.CodeTypeOK}}, nil
}

func txAsUint64(tx []byte) uint64 {
Expand Down
97 changes: 65 additions & 32 deletions internal/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/internal/libs/clist"
"github.com/tendermint/tendermint/libs/log"
tmmath "github.com/tendermint/tendermint/libs/math"
"github.com/tendermint/tendermint/types"
)

Expand Down Expand Up @@ -82,6 +81,10 @@ type TxMempool struct {
// index. i.e. older transactions are first.
timestampIndex *WrappedTxList

// pendingTxs stores transactions that are not valid yet but might become valid
// if its checker returns Accepted
pendingTxs *PendingTxs

// A read/write lock is used to safe guard updates, insertions and deletions
// from the mempool. A read-lock is implicitly acquired when executing CheckTx,
// however, a caller must explicitly grab a write-lock via Lock when updating
Expand Down Expand Up @@ -121,6 +124,7 @@ func NewTxMempool(
timestampIndex: NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool {
return wtx1.timestamp.After(wtx2.timestamp) || wtx1.timestamp.Equal(wtx2.timestamp)
}),
pendingTxs: NewPendingTxs(),
failedCheckTxCounts: map[types.NodeID]uint64{},
peerManager: peerManager,
}
Expand Down Expand Up @@ -173,7 +177,9 @@ func (txmp *TxMempool) Unlock() {
// Size returns the number of valid transactions in the mempool. It is
// thread-safe.
func (txmp *TxMempool) Size() int {
return txmp.txStore.Size()
txSize := txmp.txStore.Size()
pendingSize := txmp.pendingTxs.Size()
return txSize + pendingSize
}

// SizeBytes return the total sum in bytes of all the valid transactions in the
Expand Down Expand Up @@ -285,19 +291,34 @@ func (txmp *TxMempool) CheckTx(
hash: txHash,
timestamp: time.Now().UTC(),
height: txmp.height,
expiredCallback: func(removeFromCache bool) {
if removeFromCache {
txmp.cache.Remove(tx)
}
if res.ExpireTxHandler != nil {
res.ExpireTxHandler()
}
},
}

// only add new transaction if checkTx passes
if err == nil {
err = txmp.addNewTransaction(wtx, res, txInfo)

if err != nil {
return err
// only add new transaction if checkTx passes and is not pending
if !res.IsPendingTransaction {
err = txmp.addNewTransaction(wtx, res.ResponseCheckTx, txInfo)
if err != nil {
return err
}
} else {
// otherwise add to pending txs store
if res.Checker == nil {
return errors.New("no checker available for pending transaction")
}
txmp.pendingTxs.Insert(wtx, res, txInfo)
}
}

if cb != nil {
cb(res)
cb(res.ResponseCheckTx)
}

return nil
Expand Down Expand Up @@ -420,24 +441,17 @@ func (txmp *TxMempool) ReapMaxTxs(max int) types.Txs {
txmp.mtx.RLock()
defer txmp.mtx.RUnlock()

numTxs := txmp.priorityIndex.NumTxs()
if max < 0 {
max = numTxs
}

cap := tmmath.MinInt(numTxs, max)

// wTxs contains a list of *WrappedTx retrieved from the priority queue that
// need to be re-enqueued prior to returning.
wTxs := make([]*WrappedTx, 0, cap)
txs := make([]types.Tx, 0, cap)
for txmp.priorityIndex.NumTxs() > 0 && len(txs) < max {
wtx := txmp.priorityIndex.PopTx()
wTxs := txmp.priorityIndex.PeekTxs(max)
txs := make([]types.Tx, 0, len(wTxs))
for _, wtx := range wTxs {
txs = append(txs, wtx.tx)
wTxs = append(wTxs, wtx)
}
for _, wtx := range wTxs {
txmp.priorityIndex.PushTx(wtx)
if len(txs) < max {
// retrieve more from pending txs
pending := txmp.pendingTxs.Peek(max - len(txs))
for _, ptx := range pending {
txs = append(txs, ptx.tx.tx)
}
}
return txs
}
Expand Down Expand Up @@ -486,6 +500,7 @@ func (txmp *TxMempool) Update(
}

txmp.purgeExpiredTxs(blockHeight)
txmp.handlePendingTransactions()

// If there any uncommitted transactions left in the mempool, we either
// initiate re-CheckTx per remaining transaction or notify that remaining
Expand Down Expand Up @@ -648,7 +663,7 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, res *abci.ResponseCheck
//
// This method is NOT executed for the initial CheckTx on a new transaction;
// that case is handled by addNewTransaction instead.
func (txmp *TxMempool) handleRecheckResult(tx types.Tx, res *abci.ResponseCheckTx) {
func (txmp *TxMempool) handleRecheckResult(tx types.Tx, res *abci.ResponseCheckTxV2) {
if txmp.recheckCursor == nil {
return
}
Expand Down Expand Up @@ -691,10 +706,11 @@ func (txmp *TxMempool) handleRecheckResult(tx types.Tx, res *abci.ResponseCheckT
if !txmp.txStore.IsTxRemoved(wtx.hash) {
var err error
if txmp.postCheck != nil {
err = txmp.postCheck(tx, res)
err = txmp.postCheck(tx, res.ResponseCheckTx)
}

if res.Code == abci.CodeTypeOK && err == nil {
// we will treat a transaction that turns pending in a recheck as invalid and evict it
if res.Code == abci.CodeTypeOK && err == nil && !res.IsPendingTransaction {
wtx.priority = res.Priority
} else {
txmp.logger.Debug(
Expand Down Expand Up @@ -828,14 +844,12 @@ func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool) {

atomic.AddInt64(&txmp.sizeBytes, int64(-wtx.Size()))

if removeFromCache {
txmp.cache.Remove(wtx.tx)
}
wtx.expiredCallback(removeFromCache)
}

// purgeExpiredTxs removes all transactions that have exceeded their respective
// height- and/or time-based TTLs from their respective indexes. Every expired
// transaction will be removed from the mempool, but preserved in the cache.
// transaction will be removed from the mempool, but preserved in the cache (except for pending txs).
//
// NOTE: purgeExpiredTxs must only be called during TxMempool#Update in which
// the caller has a write-lock on the mempool and so we can safely iterate over
Expand Down Expand Up @@ -879,8 +893,13 @@ func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) {
}

for _, wtx := range expiredTxs {
txmp.removeTx(wtx, false)
txmp.removeTx(wtx, !txmp.config.KeepInvalidTxsInCache)
}

// remove pending txs that have expired
txmp.pendingTxs.PurgeExpired(txmp.config.TTLNumBlocks, blockHeight, txmp.config.TTLDuration, now, func(wtx *WrappedTx) {
wtx.expiredCallback(!txmp.config.KeepInvalidTxsInCache)
})
}

func (txmp *TxMempool) notifyTxsAvailable() {
Expand Down Expand Up @@ -919,3 +938,17 @@ func (txmp *TxMempool) AppendCheckTxErr(existingLogs string, log string) string
jsonData, _ := json.Marshal(logs)
return string(jsonData)
}

func (txmp *TxMempool) handlePendingTransactions() {
accepted, rejected := txmp.pendingTxs.EvaluatePendingTransactions()
for _, tx := range accepted {
if err := txmp.addNewTransaction(tx.tx, tx.checkTxResponse.ResponseCheckTx, tx.txInfo); err != nil {
txmp.logger.Error(fmt.Sprintf("error adding pending transaction: %s", err))
}
}
if !txmp.config.KeepInvalidTxsInCache {
for _, tx := range rejected {
txmp.cache.Remove(tx.tx.tx)
}
}
}
Loading
Loading