diff --git a/x/contracts/transactor/transactor.go b/x/contracts/transactor/transactor.go new file mode 100644 index 000000000..5791481ad --- /dev/null +++ b/x/contracts/transactor/transactor.go @@ -0,0 +1,98 @@ +package transactor + +import ( + "context" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +// Watcher is an interface that is used to manage the lifecycle of a transaction. +// The Allow method is used to determine if a transaction should be sent. The context +// is passed to the method so that the watcher can determine this based on the context. +// The Sent method is is used to notify the watcher that the transaction has been sent. +type Watcher interface { + Allow(ctx context.Context, nonce uint64) bool + Sent(ctx context.Context, tx *types.Transaction) +} + +// Transactor is a wrapper around a bind.ContractTransactor that ensures that +// transactions are sent in nonce order and that the nonce is updated correctly. +// It also uses rate-limiting to ensure that the transactions are sent at a +// reasonable rate. The Watcher is used to manage the tx lifecycle. It is used to +// determine if a transaction should be sent and to notify the watcher when a +// transaction is sent. +// The purpose of this type is to use the abi generated code to interact with the +// contract and to manage the nonce and rate-limiting. To understand the synchronization +// better, the abi generated code calls the PendingNonceAt method to get the nonce +// and then calls SendTransaction to send the transaction. The PendingNonceAt method +// and the SendTransaction method are both called in the same goroutine. So this ensures +// that the nonce is updated correctly and that the transactions are sent in order. In case +// of an error, the nonce is put back into the channel so that it can be reused. +type Transactor struct { + bind.ContractTransactor + nonceChan chan uint64 + watcher Watcher +} + +func NewTransactor( + backend bind.ContractTransactor, + watcher Watcher, +) *Transactor { + nonceChan := make(chan uint64, 1) + // We need to send a value to the channel so that the first transaction + // can be sent. The value is not important as the first transaction will + // get the nonce from the blockchain. + nonceChan <- 1 + return &Transactor{ + ContractTransactor: backend, + watcher: watcher, + nonceChan: nonceChan, + } +} + +func (t *Transactor) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) { + select { + case <-ctx.Done(): + return 0, ctx.Err() + case nonce := <-t.nonceChan: + pendingNonce, err := t.ContractTransactor.PendingNonceAt(ctx, account) + if err != nil { + // this naked write is safe as only the SendTransaction writes to + // the channel. The goroutine which is trying to send the transaction + // won't be calling SendTransaction if there is an error here. + t.nonceChan <- nonce + return 0, err + } + if pendingNonce > nonce { + return pendingNonce, nil + } + return nonce, nil + } +} + +func (t *Transactor) SendTransaction(ctx context.Context, tx *types.Transaction) (retErr error) { + defer func() { + if retErr != nil { + // If the transaction fails, we need to put the nonce back into the channel + // so that it can be reused. + t.nonceChan <- tx.Nonce() + } + }() + + if !t.watcher.Allow(ctx, tx.Nonce()) { + return ctx.Err() + } + + if err := t.ContractTransactor.SendTransaction(ctx, tx); err != nil { + return err + } + + // If the transaction is successful, we need to update the nonce and notify the + // watcher. + t.watcher.Sent(ctx, tx) + t.nonceChan <- tx.Nonce() + 1 + + return nil +} diff --git a/x/contracts/transactor/transactor_test.go b/x/contracts/transactor/transactor_test.go new file mode 100644 index 000000000..d4b150f86 --- /dev/null +++ b/x/contracts/transactor/transactor_test.go @@ -0,0 +1,174 @@ +package transactor_test + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/primevprotocol/mev-commit/x/contracts/transactor" +) + +func TestTrasactor(t *testing.T) { + t.Parallel() + + backend := &testBackend{ + nonce: 5, + errNonce: 6, + } + watcher := &testWatcher{ + allowChan: make(chan uint64), + txnChan: make(chan *types.Transaction, 1), + } + txnSender := transactor.NewTransactor(backend, watcher) + + nonce, err := txnSender.PendingNonceAt(context.Background(), common.Address{}) + if err != nil { + t.Fatal(err) + } + + if nonce != 5 { + t.Errorf("expected nonce to be 5, got %d", nonce) + } + + // If the transaction was not sent, the PendingNonceAt should block until the + // context is canceled. + ctx, cancel := context.WithCancel(context.Background()) + errC := make(chan error) + go func() { + _, err := txnSender.PendingNonceAt(ctx, common.Address{}) + errC <- err + }() + cancel() + + err = <-errC + if !errors.Is(err, context.Canceled) { + t.Errorf("expected context.Canceled error, got %v", err) + } + + go func() { + nonce := <-watcher.allowChan + if nonce != 5 { + t.Errorf("expected nonce to be 5, got %d", nonce) + } + }() + + err = txnSender.SendTransaction(context.Background(), types.NewTransaction(nonce, common.Address{}, nil, 0, nil, nil)) + if err != nil { + t.Fatal(err) + } + + select { + case txn := <-watcher.txnChan: + if txn.Nonce() != 5 { + t.Errorf("expected nonce to be 5, got %d", txn.Nonce()) + } + case <-time.After(1 * time.Second): + t.Error("timed out waiting for transaction") + } + + nonce, err = txnSender.PendingNonceAt(context.Background(), common.Address{}) + if err != nil { + t.Fatal(err) + } + + if nonce != 6 { + t.Errorf("expected nonce to be 6, got %d", nonce) + } + + type nonceResult struct { + nonce uint64 + err error + } + nonceChan := make(chan nonceResult, 1) + go func() { + nonce, err := txnSender.PendingNonceAt(context.Background(), common.Address{}) + nonceChan <- nonceResult{nonce, err} + }() + + go func() { + nonce := <-watcher.allowChan + if nonce != 6 { + t.Errorf("expected nonce to be 6, got %d", nonce) + } + }() + err = txnSender.SendTransaction(context.Background(), types.NewTransaction(nonce, common.Address{}, nil, 0, nil, nil)) + if err == nil { + t.Error("expected error, got nil") + } + + result := <-nonceChan + if result.err != nil { + t.Fatal(result.err) + } + + if result.nonce != 6 { + t.Errorf("expected nonce to be 6, got %d", result.nonce) + } + + ctx, cancel = context.WithCancel(context.Background()) + cancel() + backend.errNonce = 7 + err = txnSender.SendTransaction(ctx, types.NewTransaction(6, common.Address{}, nil, 0, nil, nil)) + if !errors.Is(err, context.Canceled) { + t.Errorf("expected context.Canceled error, got %v", err) + } + + backend.pendingNonceErr = errors.New("nonce error") + _, err = txnSender.PendingNonceAt(context.Background(), common.Address{}) + if err == nil { + t.Error("expected error, got nil") + } + + backend.pendingNonceErr = nil + nonce, err = txnSender.PendingNonceAt(context.Background(), common.Address{}) + if err != nil { + t.Fatal(err) + } + + if nonce != 6 { + t.Errorf("expected nonce to be 6, got %d", nonce) + } +} + +type testWatcher struct { + allowChan chan uint64 + txnChan chan *types.Transaction +} + +func (w *testWatcher) Allow(ctx context.Context, nonce uint64) bool { + select { + case <-ctx.Done(): + return false + case w.allowChan <- nonce: + } + return true +} + +func (w *testWatcher) Sent(ctx context.Context, tx *types.Transaction) { + w.txnChan <- tx +} + +type testBackend struct { + bind.ContractTransactor + nonce uint64 + errNonce uint64 + pendingNonceErr error +} + +func (b *testBackend) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) { + if b.pendingNonceErr != nil { + return 0, b.pendingNonceErr + } + return b.nonce, nil +} + +func (b *testBackend) SendTransaction(ctx context.Context, tx *types.Transaction) error { + if b.errNonce == tx.Nonce() { + return errors.New("nonce error") + } + return nil +} diff --git a/x/contracts/txmonitor/eth_helper.go b/x/contracts/txmonitor/eth_helper.go new file mode 100644 index 000000000..ca15db555 --- /dev/null +++ b/x/contracts/txmonitor/eth_helper.go @@ -0,0 +1,129 @@ +package txmonitor + +import ( + "context" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/rpc" +) + +// StructLog represents a single operation (op code) executed during +// a transaction. It is part of the execution trace and provides detailed +// information about each step of the transaction execution. +type StructLog struct { + // Pc indicates the program counter at the point of execution. + Pc uint64 `json:"pc,omitempty"` + // Op is the name of the opcode that was executed. + Op string `json:"op,omitempty"` + // Gas represents the gas available at this step of execution. + Gas uint64 `json:"gas,omitempty"` + // GasCost is the amount of gas used by this operation. + GasCost uint64 `json:"gasCost,omitempty"` + // Depth indicates the call depth of the current operation. + Depth int `json:"depth,omitempty"` + // Error reports any exceptions or errors encountered during the + // execution of the corresponding operation (op code) in the EVM. + Error string `json:"error,omitempty"` + // Stack is the state of the EVM stack at this point of execution. + Stack []string `json:"stack,omitempty"` + // Memory represents the contents of the EVM memory at this point. + Memory []string `json:"memory,omitempty"` + // Storage is a map representing the contract storage state after this + // operation. The keys are storage slots and the values are the data + // stored at those slots. + Storage map[string]string `json:"storage,omitempty"` +} + +// TransactionTrace encapsulates the result of tracing an Ethereum transaction, +// providing a high-level overview of the transaction's execution and its +// effects on the EVM state. +type TransactionTrace struct { + // Failed indicates whether the transaction was successful. + Failed bool `json:"failed,omitempty"` + // Gas indicates the total gas used by the transaction. + Gas uint64 `json:"gas,omitempty"` + // ReturnValue is the data returned by the transaction if it was a call + // to a contract. + ReturnValue string `json:"returnValue,omitempty"` + // StructLogs is a slice of StructLog entries, each representing a step + // in the transaction's execution. These logs provide a detailed trace + // of the EVM operations, allowing for deep inspection of transaction + // behavior. + StructLogs []StructLog `json:"structLogs,omitempty"` +} + +// Debugger defines an interface for EVM debugging tools. +type Debugger interface { + // TraceTransaction takes a transaction hash and returns + // a detailed trace of the transaction's execution. + TraceTransaction(ctx context.Context, txHash common.Hash) (*TransactionTrace, error) +} + +// Result represents the result of a transaction receipt retrieval operation. +type Result struct { + // Receipt is the transaction receipt. + Receipt *types.Receipt + // Err is the error encountered during the operation. + Err error +} + +// BatchReceiptGetter is an interface for retrieving multiple receipts +type BatchReceiptGetter interface { + // BatchReceipts retrieves multiple receipts for a list of transaction hashes. + BatchReceipts(ctx context.Context, txHashes []common.Hash) ([]Result, error) +} + +type evmHelper struct { + client *rpc.Client +} + +// NewEVMHelper creates a new EVMHelper instance. +func NewEVMHelper(client *rpc.Client) *evmHelper { + return &evmHelper{client} +} + +// TraceTransaction implements Debugger.TraceTransaction interface. +func (e *evmHelper) TraceTransaction(ctx context.Context, txHash common.Hash) (*TransactionTrace, error) { + result := new(TransactionTrace) + traceOptions := map[string]interface{}{} // Empty map for default options. + if err := e.client.CallContext( + ctx, + result, + "debug_traceTransaction", + txHash, + traceOptions, + ); err != nil { + return nil, err + } + return result, nil +} + +// BatchReceipts retrieves multiple receipts for a list of transaction hashes. +func (e *evmHelper) BatchReceipts(ctx context.Context, txHashes []common.Hash) ([]Result, error) { + batch := make([]rpc.BatchElem, len(txHashes)) + + for i, hash := range txHashes { + batch[i] = rpc.BatchElem{ + Method: "eth_getTransactionReceipt", + Args: []interface{}{hash}, + Result: new(types.Receipt), + } + } + + // Execute the batch request + err := e.client.BatchCallContext(ctx, batch) + if err != nil { + return nil, err + } + + receipts := make([]Result, len(batch)) + for i, elem := range batch { + receipts[i].Receipt = elem.Result.(*types.Receipt) + if elem.Error != nil { + receipts[i].Err = elem.Error + } + } + + return receipts, nil +} diff --git a/x/contracts/txmonitor/txmonitor.go b/x/contracts/txmonitor/txmonitor.go new file mode 100644 index 000000000..c43869ecf --- /dev/null +++ b/x/contracts/txmonitor/txmonitor.go @@ -0,0 +1,345 @@ +package txmonitor + +import ( + "context" + "errors" + "fmt" + "log/slog" + "math/big" + "sync" + "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +var ( + batchSize int = 64 +) + +var ( + ErrTxnCancelled = errors.New("transaction was cancelled") + ErrMonitorClosed = errors.New("monitor was closed") +) + +type Saver interface { + Save(ctx context.Context, txHash common.Hash, nonce uint64) error + Update(ctx context.Context, txHash common.Hash, status string) error +} + +type EVMHelper interface { + BatchReceiptGetter + Debugger +} + +type EVM interface { + BlockNumber(ctx context.Context) (uint64, error) + NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) +} + +type waitCheck struct { + nonce uint64 + block uint64 +} + +// Monitor is a transaction monitor that keeps track of the transactions sent by the owner +// and waits for the receipt to be available to query them in batches. The monitor also +// ensures rate-limiting by waiting for the nonce to be confirmed by the client before +// allowing the transaction to be sent. The alternative is that each client keeps +// polling the backend for receipts which is inefficient. +type Monitor struct { + owner common.Address + mtx sync.Mutex + waitMap map[uint64]map[common.Hash][]chan Result + client EVM + helper EVMHelper + saver Saver + newTxAdded chan struct{} + nonceUpdate chan struct{} + blockUpdate chan waitCheck + logger *slog.Logger + lastConfirmedNonce atomic.Uint64 + maxPendingTxs uint64 +} + +func New( + owner common.Address, + client EVM, + helper EVMHelper, + saver Saver, + logger *slog.Logger, + maxPendingTxs uint64, +) *Monitor { + if saver == nil { + saver = noopSaver{} + } + m := &Monitor{ + owner: owner, + client: client, + logger: logger, + helper: helper, + saver: saver, + maxPendingTxs: maxPendingTxs, + waitMap: make(map[uint64]map[common.Hash][]chan Result), + newTxAdded: make(chan struct{}), + nonceUpdate: make(chan struct{}), + blockUpdate: make(chan waitCheck), + } + + return m +} + +func (m *Monitor) Start(ctx context.Context) <-chan struct{} { + wg := sync.WaitGroup{} + done := make(chan struct{}) + + wg.Add(2) + go func() { + defer wg.Done() + + queryTicker := time.NewTicker(500 * time.Millisecond) + defer queryTicker.Stop() + + defer func() { + m.mtx.Lock() + defer m.mtx.Unlock() + + for _, v := range m.waitMap { + for _, c := range v { + for _, c := range c { + c <- Result{nil, ErrMonitorClosed} + close(c) + } + } + } + }() + + lastBlock := uint64(0) + for { + newTx := false + select { + case <-ctx.Done(): + return + case <-m.newTxAdded: + newTx = true + case <-queryTicker.C: + } + + currentBlock, err := m.client.BlockNumber(ctx) + if err != nil { + m.logger.Error("failed to get block number", "err", err) + continue + } + + if currentBlock <= lastBlock && !newTx { + continue + } + + lastNonce, err := m.client.NonceAt( + ctx, + m.owner, + new(big.Int).SetUint64(currentBlock), + ) + if err != nil { + m.logger.Error("failed to get nonce", "err", err) + continue + } + + m.lastConfirmedNonce.Store(lastNonce) + m.triggerNonceUpdate() + + select { + case m.blockUpdate <- waitCheck{lastNonce, currentBlock}: + default: + } + lastBlock = currentBlock + } + }() + + go func() { + defer wg.Done() + + for { + select { + case <-ctx.Done(): + return + case check := <-m.blockUpdate: + m.check(ctx, check.block, check.nonce) + } + } + }() + + go func() { + wg.Wait() + close(done) + }() + + return done +} + +// Allow waits until a sufficiently high nonce is confirmed by the client. If +// the context is cancelled, the function returns false. +func (m *Monitor) Allow(ctx context.Context, nonce uint64) bool { + for { + if nonce <= m.lastConfirmedNonce.Load()+m.maxPendingTxs { + return true + } + select { + case <-ctx.Done(): + return false + case <-m.nonceUpdate: + } + } +} + +// Sent saves the transaction and starts monitoring it for the receipt. The Saver +// is used to save the transaction details in the storage backend of choice. +func (m *Monitor) Sent(ctx context.Context, tx *types.Transaction) { + if err := m.saver.Save(ctx, tx.Hash(), tx.Nonce()); err != nil { + m.logger.Error("failed to save transaction", "err", err) + } + + res := m.WatchTx(tx.Hash(), tx.Nonce()) + go func() { + r := <-res + status := "success" + if r.Err != nil { + m.logger.Error("transaction failed", "err", r.Err) + status = fmt.Sprintf("failed: %v", r.Err) + } + if err := m.saver.Update(context.Background(), tx.Hash(), status); err != nil { + m.logger.Error("failed to update transaction", "err", err) + } + }() +} + +func (m *Monitor) WatchTx(txHash common.Hash, nonce uint64) <-chan Result { + m.mtx.Lock() + defer m.mtx.Unlock() + + if m.waitMap[nonce] == nil { + m.waitMap[nonce] = make(map[common.Hash][]chan Result) + } + + c := make(chan Result, 1) + m.waitMap[nonce][txHash] = append(m.waitMap[nonce][txHash], c) + + m.triggerNewTx() + return c +} + +func (m *Monitor) triggerNewTx() { + select { + case m.newTxAdded <- struct{}{}: + default: + } +} + +func (m *Monitor) triggerNonceUpdate() { + select { + case m.nonceUpdate <- struct{}{}: + default: + } +} + +// waitMap holds the transactions that are waiting for the receipt with the nonce as the key. +// The passed nonce is the last nonce that was confirmed by the client, so any transactions +// with a nonce less than this value are supposed to be confirmed and waiting for the receipt. +func (m *Monitor) getOlderTxns(nonce uint64) map[uint64][]common.Hash { + m.mtx.Lock() + defer m.mtx.Unlock() + + txnMap := make(map[uint64][]common.Hash) + for k, v := range m.waitMap { + if k >= nonce { + continue + } + + for h := range v { + txnMap[k] = append(txnMap[k], h) + } + } + + return txnMap +} + +func (m *Monitor) notify( + nonce uint64, + txn common.Hash, + res Result, +) { + m.mtx.Lock() + defer m.mtx.Unlock() + + waiters := 0 + for _, c := range m.waitMap[nonce][txn] { + c <- res + waiters++ + close(c) + } + delete(m.waitMap[nonce], txn) + if len(m.waitMap[nonce]) == 0 { + delete(m.waitMap, nonce) + } +} + +// check retrieves the receipts for the transactions with nonce less than the lastNonce +// and notifies the waiting clients. +func (m *Monitor) check(ctx context.Context, newBlock uint64, lastNonce uint64) { + checkTxns := m.getOlderTxns(lastNonce) + nonceMap := make(map[common.Hash]uint64) + + if len(checkTxns) == 0 { + return + } + + txHashes := make([]common.Hash, 0, len(checkTxns)) + for n, txns := range checkTxns { + for _, txn := range txns { + txHashes = append(txHashes, txn) + nonceMap[txn] = n + } + } + + for start := 0; start < len(txHashes); start += batchSize { + end := start + batchSize + if end > len(txHashes) { + end = len(txHashes) + } + + receipts, err := m.helper.BatchReceipts(ctx, txHashes[start:end]) + if err != nil { + m.logger.Error("failed to get receipts", "err", err) + return + } + + for i, r := range receipts { + nonce := nonceMap[txHashes[start+i]] + if r.Err != nil { + if errors.Is(r.Err, ethereum.NotFound) { + m.notify(nonce, txHashes[start+i], Result{nil, ErrTxnCancelled}) + continue + } + tt, err := m.helper.TraceTransaction(ctx, txHashes[start+i]) + if err != nil { + m.logger.Error("retrieving transaction trace failed", "error", err) + } + m.logger.Error("failed to get receipt", "error", r.Err, "transaction_trace", tt) + continue + } + m.notify(nonce, txHashes[start+i], Result{r.Receipt, nil}) + } + } +} + +type noopSaver struct{} + +func (noopSaver) Save(ctx context.Context, txHash common.Hash, nonce uint64) error { + return nil +} + +func (noopSaver) Update(ctx context.Context, txHash common.Hash, status string) error { + return nil +} diff --git a/x/contracts/txmonitor/txmonitor_test.go b/x/contracts/txmonitor/txmonitor_test.go new file mode 100644 index 000000000..d33defcdd --- /dev/null +++ b/x/contracts/txmonitor/txmonitor_test.go @@ -0,0 +1,186 @@ +package txmonitor_test + +import ( + "context" + "io" + "math/big" + "sync" + "testing" + "time" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/primevprotocol/mev-commit/x/contracts/txmonitor" + "github.com/primevprotocol/mev-commit/x/util" +) + +func TestTxMonitor(t *testing.T) { + t.Parallel() + + key, err := crypto.GenerateKey() + if err != nil { + t.Fatal(err) + } + + txns := make([]*types.Transaction, 0, 10) + for i := 1; i <= 10; i++ { + txns = append(txns, types.MustSignNewTx( + key, + types.NewLondonSigner(big.NewInt(1)), + &types.DynamicFeeTx{ + ChainID: big.NewInt(1), + Nonce: uint64(i), + GasFeeCap: big.NewInt(1), + GasTipCap: big.NewInt(1), + To: &common.Address{}, + }, + )) + } + + results := make(map[common.Hash]txmonitor.Result) + for _, tx := range txns { + results[tx.Hash()] = txmonitor.Result{Receipt: &types.Receipt{Status: 1}} + } + + evm := &testEVM{ + blockNumC: make(chan uint64), + nonceC: make(chan uint64), + } + + saver := &testSaver{status: make(map[common.Hash]string)} + + monitor := txmonitor.New( + common.Address{}, + evm, + &testEVMHelper{receipts: results}, + saver, + util.NewTestLogger(io.Discard), + 10, + ) + + ctx, cancel := context.WithCancel(context.Background()) + done := monitor.Start(ctx) + + for _, tx := range txns { + if allow := monitor.Allow(ctx, tx.Nonce()); !allow { + t.Fatal("tx should be allowed") + } + monitor.Sent(ctx, tx) + } + + allowCtx, cancelAllow := context.WithTimeout(ctx, 200*time.Millisecond) + if allow := monitor.Allow(allowCtx, 11); allow { + t.Fatal("tx should not be allowed") + } + cancelAllow() + + // tx with same nonce to simulate cancellation + cancelledWait := monitor.WatchTx(common.HexToHash("0x1"), 2) + + evm.blockNumC <- 1 + evm.nonceC <- 5 + + for { + if saver.count() == 4 { + break + } + time.Sleep(10 * time.Millisecond) + } + + // tx with same nonce should be cancelled + res := <-cancelledWait + if res.Err != txmonitor.ErrTxnCancelled { + t.Fatal("tx should be cancelled") + } + + duplicateListener := monitor.WatchTx(txns[9].Hash(), 10) + closedListener := monitor.WatchTx(common.HexToHash("0x12"), 11) + + evm.blockNumC <- 2 + evm.nonceC <- 11 + + for { + if saver.count() == 10 { + break + } + time.Sleep(10 * time.Millisecond) + } + + // duplicate listener should be notified + res = <-duplicateListener + if res.Err != nil { + t.Fatal("tx should not have error") + } + + cancel() + <-done + + // closed listener should be notified + res = <-closedListener + if res.Err != txmonitor.ErrMonitorClosed { + t.Fatal("error should be monitor closed") + } + + for _, status := range saver.status { + if status != "success" { + t.Fatal("tx should be successful") + } + } +} + +type testEVM struct { + blockNumC chan uint64 + nonceC chan uint64 +} + +func (t *testEVM) BlockNumber(ctx context.Context) (uint64, error) { + return <-t.blockNumC, nil +} + +func (t *testEVM) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) { + return <-t.nonceC, nil +} + +type testEVMHelper struct { + receipts map[common.Hash]txmonitor.Result +} + +func (t *testEVMHelper) BatchReceipts(ctx context.Context, txns []common.Hash) ([]txmonitor.Result, error) { + results := make([]txmonitor.Result, 0, len(txns)) + for _, tx := range txns { + if _, ok := t.receipts[tx]; !ok { + results = append(results, txmonitor.Result{Err: ethereum.NotFound}) + continue + } + results = append(results, t.receipts[tx]) + } + return results, nil +} + +func (t *testEVMHelper) TraceTransaction(ctx context.Context, tx common.Hash) (*txmonitor.TransactionTrace, error) { + return nil, nil +} + +type testSaver struct { + mu sync.Mutex + status map[common.Hash]string +} + +func (t *testSaver) count() int { + t.mu.Lock() + defer t.mu.Unlock() + return len(t.status) +} + +func (t *testSaver) Save(ctx context.Context, txHash common.Hash, nonce uint64) error { + return nil +} + +func (t *testSaver) Update(ctx context.Context, txHash common.Hash, status string) error { + t.mu.Lock() + t.status[txHash] = status + t.mu.Unlock() + return nil +}