Skip to content

Commit

Permalink
feat(transactor, txmonitor): add tx management (#33)
Browse files Browse the repository at this point in the history
  • Loading branch information
aloknerurkar authored Apr 25, 2024
1 parent d012193 commit b2eff5b
Show file tree
Hide file tree
Showing 5 changed files with 932 additions and 0 deletions.
98 changes: 98 additions & 0 deletions x/contracts/transactor/transactor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package transactor

import (
"context"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)

// Watcher is an interface that is used to manage the lifecycle of a transaction.
// The Allow method is used to determine if a transaction should be sent. The context
// is passed to the method so that the watcher can determine this based on the context.
// The Sent method is is used to notify the watcher that the transaction has been sent.
type Watcher interface {
Allow(ctx context.Context, nonce uint64) bool
Sent(ctx context.Context, tx *types.Transaction)
}

// Transactor is a wrapper around a bind.ContractTransactor that ensures that
// transactions are sent in nonce order and that the nonce is updated correctly.
// It also uses rate-limiting to ensure that the transactions are sent at a
// reasonable rate. The Watcher is used to manage the tx lifecycle. It is used to
// determine if a transaction should be sent and to notify the watcher when a
// transaction is sent.
// The purpose of this type is to use the abi generated code to interact with the
// contract and to manage the nonce and rate-limiting. To understand the synchronization
// better, the abi generated code calls the PendingNonceAt method to get the nonce
// and then calls SendTransaction to send the transaction. The PendingNonceAt method
// and the SendTransaction method are both called in the same goroutine. So this ensures
// that the nonce is updated correctly and that the transactions are sent in order. In case
// of an error, the nonce is put back into the channel so that it can be reused.
type Transactor struct {
bind.ContractTransactor
nonceChan chan uint64
watcher Watcher
}

func NewTransactor(
backend bind.ContractTransactor,
watcher Watcher,
) *Transactor {
nonceChan := make(chan uint64, 1)
// We need to send a value to the channel so that the first transaction
// can be sent. The value is not important as the first transaction will
// get the nonce from the blockchain.
nonceChan <- 1
return &Transactor{
ContractTransactor: backend,
watcher: watcher,
nonceChan: nonceChan,
}
}

func (t *Transactor) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) {
select {
case <-ctx.Done():
return 0, ctx.Err()
case nonce := <-t.nonceChan:
pendingNonce, err := t.ContractTransactor.PendingNonceAt(ctx, account)
if err != nil {
// this naked write is safe as only the SendTransaction writes to
// the channel. The goroutine which is trying to send the transaction
// won't be calling SendTransaction if there is an error here.
t.nonceChan <- nonce
return 0, err
}
if pendingNonce > nonce {
return pendingNonce, nil
}
return nonce, nil
}
}

func (t *Transactor) SendTransaction(ctx context.Context, tx *types.Transaction) (retErr error) {
defer func() {
if retErr != nil {
// If the transaction fails, we need to put the nonce back into the channel
// so that it can be reused.
t.nonceChan <- tx.Nonce()
}
}()

if !t.watcher.Allow(ctx, tx.Nonce()) {
return ctx.Err()
}

if err := t.ContractTransactor.SendTransaction(ctx, tx); err != nil {
return err
}

// If the transaction is successful, we need to update the nonce and notify the
// watcher.
t.watcher.Sent(ctx, tx)
t.nonceChan <- tx.Nonce() + 1

return nil
}
174 changes: 174 additions & 0 deletions x/contracts/transactor/transactor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package transactor_test

import (
"context"
"errors"
"testing"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/primevprotocol/mev-commit/x/contracts/transactor"
)

func TestTrasactor(t *testing.T) {
t.Parallel()

backend := &testBackend{
nonce: 5,
errNonce: 6,
}
watcher := &testWatcher{
allowChan: make(chan uint64),
txnChan: make(chan *types.Transaction, 1),
}
txnSender := transactor.NewTransactor(backend, watcher)

nonce, err := txnSender.PendingNonceAt(context.Background(), common.Address{})
if err != nil {
t.Fatal(err)
}

if nonce != 5 {
t.Errorf("expected nonce to be 5, got %d", nonce)
}

// If the transaction was not sent, the PendingNonceAt should block until the
// context is canceled.
ctx, cancel := context.WithCancel(context.Background())
errC := make(chan error)
go func() {
_, err := txnSender.PendingNonceAt(ctx, common.Address{})
errC <- err
}()
cancel()

err = <-errC
if !errors.Is(err, context.Canceled) {
t.Errorf("expected context.Canceled error, got %v", err)
}

go func() {
nonce := <-watcher.allowChan
if nonce != 5 {
t.Errorf("expected nonce to be 5, got %d", nonce)
}
}()

err = txnSender.SendTransaction(context.Background(), types.NewTransaction(nonce, common.Address{}, nil, 0, nil, nil))
if err != nil {
t.Fatal(err)
}

select {
case txn := <-watcher.txnChan:
if txn.Nonce() != 5 {
t.Errorf("expected nonce to be 5, got %d", txn.Nonce())
}
case <-time.After(1 * time.Second):
t.Error("timed out waiting for transaction")
}

nonce, err = txnSender.PendingNonceAt(context.Background(), common.Address{})
if err != nil {
t.Fatal(err)
}

if nonce != 6 {
t.Errorf("expected nonce to be 6, got %d", nonce)
}

type nonceResult struct {
nonce uint64
err error
}
nonceChan := make(chan nonceResult, 1)
go func() {
nonce, err := txnSender.PendingNonceAt(context.Background(), common.Address{})
nonceChan <- nonceResult{nonce, err}
}()

go func() {
nonce := <-watcher.allowChan
if nonce != 6 {
t.Errorf("expected nonce to be 6, got %d", nonce)
}
}()
err = txnSender.SendTransaction(context.Background(), types.NewTransaction(nonce, common.Address{}, nil, 0, nil, nil))
if err == nil {
t.Error("expected error, got nil")
}

result := <-nonceChan
if result.err != nil {
t.Fatal(result.err)
}

if result.nonce != 6 {
t.Errorf("expected nonce to be 6, got %d", result.nonce)
}

ctx, cancel = context.WithCancel(context.Background())
cancel()
backend.errNonce = 7
err = txnSender.SendTransaction(ctx, types.NewTransaction(6, common.Address{}, nil, 0, nil, nil))
if !errors.Is(err, context.Canceled) {
t.Errorf("expected context.Canceled error, got %v", err)
}

backend.pendingNonceErr = errors.New("nonce error")
_, err = txnSender.PendingNonceAt(context.Background(), common.Address{})
if err == nil {
t.Error("expected error, got nil")
}

backend.pendingNonceErr = nil
nonce, err = txnSender.PendingNonceAt(context.Background(), common.Address{})
if err != nil {
t.Fatal(err)
}

if nonce != 6 {
t.Errorf("expected nonce to be 6, got %d", nonce)
}
}

type testWatcher struct {
allowChan chan uint64
txnChan chan *types.Transaction
}

func (w *testWatcher) Allow(ctx context.Context, nonce uint64) bool {
select {
case <-ctx.Done():
return false
case w.allowChan <- nonce:
}
return true
}

func (w *testWatcher) Sent(ctx context.Context, tx *types.Transaction) {
w.txnChan <- tx
}

type testBackend struct {
bind.ContractTransactor
nonce uint64
errNonce uint64
pendingNonceErr error
}

func (b *testBackend) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) {
if b.pendingNonceErr != nil {
return 0, b.pendingNonceErr
}
return b.nonce, nil
}

func (b *testBackend) SendTransaction(ctx context.Context, tx *types.Transaction) error {
if b.errNonce == tx.Nonce() {
return errors.New("nonce error")
}
return nil
}
Loading

0 comments on commit b2eff5b

Please sign in to comment.