Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(transactor, txmonitor): add tx management #33

Merged
merged 7 commits into from
Apr 25, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 98 additions & 0 deletions x/contracts/transactor/transactor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package transactor

import (
"context"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)

// Watcher is an interface that is used to manage the lifecycle of a transaction.
// The Allow method is used to determine if a transaction should be sent. The context
// is passed to the method so that the watcher can determine this based on the context.
// The Sent method is is used to notify the watcher that the transaction has been sent.
type Watcher interface {
Allow(ctx context.Context, nonce uint64) bool
Sent(ctx context.Context, tx *types.Transaction)
}

// Transactor is a wrapper around a bind.ContractTransactor that ensures that
// transactions are sent in nonce order and that the nonce is updated correctly.
// It also uses rate-limiting to ensure that the transactions are sent at a
// reasonable rate. The Watcher is used to manage the tx lifecycle. It is used to
// determine if a transaction should be sent and to notify the watcher when a
// transaction is sent.
// The purpose of this type is to use the abi generated code to interact with the
// contract and to manage the nonce and rate-limiting. To understand the synchronization
// better, the abi generated code calls the PendingNonceAt method to get the nonce
// and then calls SendTransaction to send the transaction. The PendingNonceAt method
// and the SendTransaction method are both called in the same goroutine. So this ensures
// that the nonce is updated correctly and that the transactions are sent in order. In case
// of an error, the nonce is put back into the channel so that it can be reused.
type Transactor struct {
bind.ContractTransactor
nonceChan chan uint64
watcher Watcher
}

func NewTransactor(
backend bind.ContractTransactor,
watcher Watcher,
) *Transactor {
nonceChan := make(chan uint64, 1)
// We need to send a value to the channel so that the first transaction
// can be sent. The value is not important as the first transaction will
// get the nonce from the blockchain.
nonceChan <- 1
return &Transactor{
ContractTransactor: backend,
watcher: watcher,
nonceChan: nonceChan,
}
}

func (t *Transactor) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) {
select {
case <-ctx.Done():
return 0, ctx.Err()
case nonce := <-t.nonceChan:
pendingNonce, err := t.ContractTransactor.PendingNonceAt(ctx, account)
if err != nil {
// this naked write is safe as only the SendTransaction writes to
// the channel. The goroutine which is trying to send the transaction
// won't be calling SendTransaction if there is an error here.
t.nonceChan <- nonce
return 0, err
}
if pendingNonce > nonce {
return pendingNonce, nil
}
return nonce, nil
}
}

func (t *Transactor) SendTransaction(ctx context.Context, tx *types.Transaction) (retErr error) {
defer func() {
if retErr != nil {
// If the transaction fails, we need to put the nonce back into the channel
// so that it can be reused.
t.nonceChan <- tx.Nonce()
}
}()

if !t.watcher.Allow(ctx, tx.Nonce()) {
return ctx.Err()
}

if err := t.ContractTransactor.SendTransaction(ctx, tx); err != nil {
return err
}

// If the transaction is successful, we need to update the nonce and notify the
// watcher.
t.watcher.Sent(ctx, tx)
t.nonceChan <- tx.Nonce() + 1

return nil
}
174 changes: 174 additions & 0 deletions x/contracts/transactor/transactor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package transactor_test

import (
"context"
"errors"
"testing"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/primevprotocol/mev-commit/x/contracts/transactor"
)

func TestTrasactor(t *testing.T) {
t.Parallel()

backend := &testBackend{
nonce: 5,
errNonce: 6,
}
watcher := &testWatcher{
allowChan: make(chan uint64),
txnChan: make(chan *types.Transaction, 1),
}
txnSender := transactor.NewTransactor(backend, watcher)

nonce, err := txnSender.PendingNonceAt(context.Background(), common.Address{})
if err != nil {
t.Fatal(err)
}

if nonce != 5 {
t.Errorf("expected nonce to be 5, got %d", nonce)
}

// If the transaction was not sent, the PendingNonceAt should block until the
// context is canceled.
ctx, cancel := context.WithCancel(context.Background())
errC := make(chan error)
go func() {
_, err := txnSender.PendingNonceAt(ctx, common.Address{})
errC <- err
}()
cancel()

err = <-errC
if !errors.Is(err, context.Canceled) {
t.Errorf("expected context.Canceled error, got %v", err)
}

go func() {
nonce := <-watcher.allowChan
if nonce != 5 {
t.Errorf("expected nonce to be 5, got %d", nonce)
}
}()

err = txnSender.SendTransaction(context.Background(), types.NewTransaction(nonce, common.Address{}, nil, 0, nil, nil))
if err != nil {
t.Fatal(err)
}

select {
case txn := <-watcher.txnChan:
if txn.Nonce() != 5 {
t.Errorf("expected nonce to be 5, got %d", txn.Nonce())
}
case <-time.After(1 * time.Second):
t.Error("timed out waiting for transaction")
}

nonce, err = txnSender.PendingNonceAt(context.Background(), common.Address{})
if err != nil {
t.Fatal(err)
}

if nonce != 6 {
t.Errorf("expected nonce to be 6, got %d", nonce)
}

type nonceResult struct {
nonce uint64
err error
}
nonceChan := make(chan nonceResult, 1)
go func() {
nonce, err := txnSender.PendingNonceAt(context.Background(), common.Address{})
nonceChan <- nonceResult{nonce, err}
}()

go func() {
nonce := <-watcher.allowChan
if nonce != 6 {
t.Errorf("expected nonce to be 6, got %d", nonce)
}
}()
err = txnSender.SendTransaction(context.Background(), types.NewTransaction(nonce, common.Address{}, nil, 0, nil, nil))
if err == nil {
t.Error("expected error, got nil")
}

result := <-nonceChan
if result.err != nil {
t.Fatal(result.err)
}

if result.nonce != 6 {
t.Errorf("expected nonce to be 6, got %d", result.nonce)
}

ctx, cancel = context.WithCancel(context.Background())
cancel()
backend.errNonce = 7
err = txnSender.SendTransaction(ctx, types.NewTransaction(6, common.Address{}, nil, 0, nil, nil))
if !errors.Is(err, context.Canceled) {
t.Errorf("expected context.Canceled error, got %v", err)
}

backend.pendingNonceErr = errors.New("nonce error")
_, err = txnSender.PendingNonceAt(context.Background(), common.Address{})
if err == nil {
t.Error("expected error, got nil")
}

backend.pendingNonceErr = nil
nonce, err = txnSender.PendingNonceAt(context.Background(), common.Address{})
if err != nil {
t.Fatal(err)
}

if nonce != 6 {
t.Errorf("expected nonce to be 6, got %d", nonce)
}
}

type testWatcher struct {
allowChan chan uint64
txnChan chan *types.Transaction
}

func (w *testWatcher) Allow(ctx context.Context, nonce uint64) bool {
select {
case <-ctx.Done():
return false
case w.allowChan <- nonce:
}
return true
}

func (w *testWatcher) Sent(ctx context.Context, tx *types.Transaction) {
w.txnChan <- tx
}

type testBackend struct {
bind.ContractTransactor
nonce uint64
errNonce uint64
pendingNonceErr error
}

func (b *testBackend) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) {
if b.pendingNonceErr != nil {
return 0, b.pendingNonceErr
}
return b.nonce, nil
}

func (b *testBackend) SendTransaction(ctx context.Context, tx *types.Transaction) error {
if b.errNonce == tx.Nonce() {
return errors.New("nonce error")
}
return nil
}
Loading
Loading