Skip to content

Commit

Permalink
Merge pull request #2657 from jorgemmsilva/fix/propose-batch-wait
Browse files Browse the repository at this point in the history
fix: prevent proposing batches with 1 random request
  • Loading branch information
jorgemmsilva authored Jul 3, 2023
2 parents e75ef41 + b7aa603 commit 93bc456
Show file tree
Hide file tree
Showing 9 changed files with 56 additions and 31 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,4 @@ tests_output.log
go.work
go.work.sum
TMP*
tmp.*
20 changes: 13 additions & 7 deletions packages/chain/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ type reqRefNonce struct {
nonce uint64
}

func (mpi *mempoolImpl) handleConsensusProposalForChainHead(recv *reqConsensusProposal) {
func (mpi *mempoolImpl) refsToPropose() []*isc.RequestRef {
//
// The case for matching ChainHeadAO and request BaseAO
reqRefs := []*isc.RequestRef{}
Expand Down Expand Up @@ -590,20 +590,26 @@ func (mpi *mempoolImpl) handleConsensusProposalForChainHead(recv *reqConsensusPr
}
}
}
// remove underisable requests from the proposal
// remove undesirable requests from the proposal
reqRefs = lo.Filter(reqRefs, func(x *isc.RequestRef, _ int) bool {
return !slices.Contains(doNotPropose, x)
})
}

if len(reqRefs) > 0 {
recv.Respond(reqRefs)
return reqRefs
}

func (mpi *mempoolImpl) handleConsensusProposalForChainHead(recv *reqConsensusProposal) {
refs := mpi.refsToPropose()
if len(refs) > 0 {
recv.Respond(refs)
return
}

//
// Wait for any request.
mpi.waitReq.WaitAny(recv.ctx, func(req isc.Request) {
recv.Respond([]*isc.RequestRef{isc.RequestRefFromRequest(req)})
mpi.waitReq.WaitAny(recv.ctx, func(_ isc.Request) {
mpi.handleConsensusProposalForChainHead(recv)
})
}

Expand Down Expand Up @@ -725,7 +731,7 @@ func (mpi *mempoolImpl) handleTangleTimeUpdated(tangleTime time.Time) {
// Notify existing on-ledger requests if that's first time update.
if oldTangleTime.IsZero() {
mpi.onLedgerPool.Filter(func(request isc.OnLedgerRequest, ts time.Time) bool {
mpi.waitReq.Have(request)
mpi.waitReq.MarkAvailable(request)
return true
})
}
Expand Down
2 changes: 1 addition & 1 deletion packages/chain/mempool/typed_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (olp *typedPool[V]) Add(request V) {
olp.log.Debugf("ADD %v as key=%v", request.ID(), refKey)
olp.sizeMetric(olp.requests.Size())
}
olp.waitReq.Have(request)
olp.waitReq.MarkAvailable(request)
}

func (olp *typedPool[V]) Remove(request V) {
Expand Down
13 changes: 7 additions & 6 deletions packages/chain/mempool/wait_req.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
type WaitReq interface {
WaitMany(ctx context.Context, reqRefs []*isc.RequestRef, cb func(req isc.Request)) // Called per block.
WaitAny(ctx context.Context, cb func(req isc.Request)) // Called per block.
Have(req isc.Request) // Called often, per request.
MarkAvailable(req isc.Request) // Called often, per request.
}

type waitReq struct {
Expand Down Expand Up @@ -55,14 +55,15 @@ func (wr *waitReq) WaitAny(ctx context.Context, cb func(req isc.Request)) {
wr.maybeCleanup()
}

func (wr *waitReq) Have(req isc.Request) {
func (wr *waitReq) MarkAvailable(req isc.Request) {
if len(wr.any) > 0 {
for i := range wr.any {
if wr.any[i].ctx.Err() == nil {
wr.any[i].cb(req)
awaiting := wr.any // copy before resetting, so that if any of the callbacks tries to await, it doesn't get squashed
wr.any = wr.any[:0]
for i := range awaiting {
if awaiting[i].ctx.Err() == nil {
awaiting[i].cb(req)
}
}
wr.any = wr.any[:0]
}
reqRefKey := isc.RequestRefFromRequest(req).AsKey()
if cbs, exists := wr.reqs.Get(reqRefKey); exists {
Expand Down
6 changes: 3 additions & 3 deletions packages/chain/mempool/wait_req_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ func TestWaitReq(t *testing.T) {
wr.WaitMany(ctxM, []*isc.RequestRef{ref0, ref1, ref2}, func(req isc.Request) {
recvMany = append(recvMany, req)
})
wr.Have(req0)
wr.Have(req1)
wr.Have(req2)
wr.MarkAvailable(req0)
wr.MarkAvailable(req1)
wr.MarkAvailable(req2)
require.NotNil(t, recvAny)
require.Len(t, recvMany, 3)
require.Contains(t, recvMany, req0)
Expand Down
33 changes: 21 additions & 12 deletions packages/evm/jsonrpc/jsonrpctest/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"crypto/ecdsa"
"errors"
"math"
"math/big"
"strings"
"testing"
Expand All @@ -18,7 +19,6 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -323,21 +323,30 @@ func (e *Env) TestRPCGetLogs() {
}

func (e *Env) TestRPCInvalidNonce() {
from, fromAddress := e.NewAccountWithL2Funds()
from, _ := e.NewAccountWithL2Funds()
_, toAddress := e.NewAccountWithL2Funds()
value := big.NewInt(0)
nonce := e.NonceAt(fromAddress) + 1
gasLimit := params.TxGas
tx, err := types.SignTx(
types.NewTransaction(nonce, toAddress, value, gasLimit, evm.GasPrice, nil),
e.Signer(),
from,
)
// try sending correct nonces in invalid order 1,2, then 0 - this should succeed
createTx := func(nonce uint64) *types.Transaction {
tx, err := types.SignTx(
types.NewTransaction(nonce, toAddress, big.NewInt(0), math.MaxUint64, evm.GasPrice, nil),
e.Signer(),
from,
)
require.NoError(e.T, err)
return tx
}

err := e.Client.SendTransaction(context.Background(), createTx(1))
require.NoError(e.T, err)
err = e.Client.SendTransaction(context.Background(), createTx(2))
require.NoError(e.T, err)
_, err = e.SendTransactionAndWait(createTx(0))
require.NoError(e.T, err)

_, err = e.SendTransactionAndWait(tx)
// try sending nonce 0 again
_, err = e.SendTransactionAndWait(createTx(0))
require.Error(e.T, err)
require.Regexp(e.T, `invalid transaction nonce: got 1, want 0`, err.Error())
require.Regexp(e.T, `invalid transaction nonce: got 0, want 3`, err.Error())
_, ok := err.(*isc.VMError)
require.False(e.T, ok)
}
Expand Down
4 changes: 3 additions & 1 deletion packages/evm/jsonrpc/waspevmbackend.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ func (b *WaspEVMBackend) EVMSendTransaction(tx *types.Transaction) error {
return err
}
b.chain.Log().Debugf("EVMSendTransaction, evm.tx.nonce=%v, evm.tx.hash=%v => isc.req.id=%v", tx.Nonce(), tx.Hash().Hex(), req.ID())
b.chain.ReceiveOffLedgerRequest(req, b.nodePubKey)
if !b.chain.ReceiveOffLedgerRequest(req, b.nodePubKey) {
return fmt.Errorf("tx not added to the mempool")
}

// store the request ID so that the user can query it later (if the
// Ethereum tx fails, the Ethereum receipt is never generated).
Expand Down
6 changes: 6 additions & 0 deletions packages/vm/core/evm/evmimpl/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ func createEmulator(ctx isc.Sandbox, l2Balance *l2Balance) *emulator.EVMEmulator

// IMPORTANT: Must only be called from the ISC VM (when the request is done executing)
func AddFailedTx(ctx isc.Sandbox, tx *types.Transaction, receipt *types.Receipt) {
if tx == nil {
panic("nil tx")
}
if receipt == nil {
panic("nil receipt")
}
emu := getBlockContext(ctx).emu
emu.BlockchainDB().AddTransaction(tx, receipt)
// we must also increment the nonce manually since the original request was reverted
Expand Down
2 changes: 1 addition & 1 deletion tools/cluster/tests/evm_jsonrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func newClusterTestEnv(t *testing.T, env *ChainEnv, nodeIndex int) *clusterTestE
}
receipt, _, err := c.WaspClient.ChainsApi.
WaitForRequest(context.Background(), env.Chain.ChainID.String(), reqID.String()).
TimeoutSeconds(60).
TimeoutSeconds(10).
Execute()
if err != nil {
return err
Expand Down

0 comments on commit 93bc456

Please sign in to comment.