diff --git a/x/contracts/txmonitor/eth_helper.go b/x/contracts/txmonitor/eth_helper.go new file mode 100644 index 000000000..ca15db555 --- /dev/null +++ b/x/contracts/txmonitor/eth_helper.go @@ -0,0 +1,129 @@ +package txmonitor + +import ( + "context" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/rpc" +) + +// StructLog represents a single operation (op code) executed during +// a transaction. It is part of the execution trace and provides detailed +// information about each step of the transaction execution. +type StructLog struct { + // Pc indicates the program counter at the point of execution. + Pc uint64 `json:"pc,omitempty"` + // Op is the name of the opcode that was executed. + Op string `json:"op,omitempty"` + // Gas represents the gas available at this step of execution. + Gas uint64 `json:"gas,omitempty"` + // GasCost is the amount of gas used by this operation. + GasCost uint64 `json:"gasCost,omitempty"` + // Depth indicates the call depth of the current operation. + Depth int `json:"depth,omitempty"` + // Error reports any exceptions or errors encountered during the + // execution of the corresponding operation (op code) in the EVM. + Error string `json:"error,omitempty"` + // Stack is the state of the EVM stack at this point of execution. + Stack []string `json:"stack,omitempty"` + // Memory represents the contents of the EVM memory at this point. + Memory []string `json:"memory,omitempty"` + // Storage is a map representing the contract storage state after this + // operation. The keys are storage slots and the values are the data + // stored at those slots. + Storage map[string]string `json:"storage,omitempty"` +} + +// TransactionTrace encapsulates the result of tracing an Ethereum transaction, +// providing a high-level overview of the transaction's execution and its +// effects on the EVM state. +type TransactionTrace struct { + // Failed indicates whether the transaction was successful. + Failed bool `json:"failed,omitempty"` + // Gas indicates the total gas used by the transaction. + Gas uint64 `json:"gas,omitempty"` + // ReturnValue is the data returned by the transaction if it was a call + // to a contract. + ReturnValue string `json:"returnValue,omitempty"` + // StructLogs is a slice of StructLog entries, each representing a step + // in the transaction's execution. These logs provide a detailed trace + // of the EVM operations, allowing for deep inspection of transaction + // behavior. + StructLogs []StructLog `json:"structLogs,omitempty"` +} + +// Debugger defines an interface for EVM debugging tools. +type Debugger interface { + // TraceTransaction takes a transaction hash and returns + // a detailed trace of the transaction's execution. + TraceTransaction(ctx context.Context, txHash common.Hash) (*TransactionTrace, error) +} + +// Result represents the result of a transaction receipt retrieval operation. +type Result struct { + // Receipt is the transaction receipt. + Receipt *types.Receipt + // Err is the error encountered during the operation. + Err error +} + +// BatchReceiptGetter is an interface for retrieving multiple receipts +type BatchReceiptGetter interface { + // BatchReceipts retrieves multiple receipts for a list of transaction hashes. + BatchReceipts(ctx context.Context, txHashes []common.Hash) ([]Result, error) +} + +type evmHelper struct { + client *rpc.Client +} + +// NewEVMHelper creates a new EVMHelper instance. +func NewEVMHelper(client *rpc.Client) *evmHelper { + return &evmHelper{client} +} + +// TraceTransaction implements Debugger.TraceTransaction interface. +func (e *evmHelper) TraceTransaction(ctx context.Context, txHash common.Hash) (*TransactionTrace, error) { + result := new(TransactionTrace) + traceOptions := map[string]interface{}{} // Empty map for default options. + if err := e.client.CallContext( + ctx, + result, + "debug_traceTransaction", + txHash, + traceOptions, + ); err != nil { + return nil, err + } + return result, nil +} + +// BatchReceipts retrieves multiple receipts for a list of transaction hashes. +func (e *evmHelper) BatchReceipts(ctx context.Context, txHashes []common.Hash) ([]Result, error) { + batch := make([]rpc.BatchElem, len(txHashes)) + + for i, hash := range txHashes { + batch[i] = rpc.BatchElem{ + Method: "eth_getTransactionReceipt", + Args: []interface{}{hash}, + Result: new(types.Receipt), + } + } + + // Execute the batch request + err := e.client.BatchCallContext(ctx, batch) + if err != nil { + return nil, err + } + + receipts := make([]Result, len(batch)) + for i, elem := range batch { + receipts[i].Receipt = elem.Result.(*types.Receipt) + if elem.Error != nil { + receipts[i].Err = elem.Error + } + } + + return receipts, nil +} diff --git a/x/contracts/txmonitor/txmonitor.go b/x/contracts/txmonitor/txmonitor.go new file mode 100644 index 000000000..459d47ab3 --- /dev/null +++ b/x/contracts/txmonitor/txmonitor.go @@ -0,0 +1,331 @@ +package txmonitor + +import ( + "context" + "errors" + "fmt" + "log/slog" + "math/big" + "sync" + "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +var ( + batchSize int = 64 +) + +var ( + ErrTxnCancelled = errors.New("transaction was cancelled") + ErrMonitorClosed = errors.New("monitor was closed") +) + +type Saver interface { + Save(ctx context.Context, txHash common.Hash, nonce uint64) error + Update(ctx context.Context, txHash common.Hash, status string) error +} + +type EVMHelper interface { + BatchReceiptGetter + Debugger +} + +type EVM interface { + BlockNumber(ctx context.Context) (uint64, error) + NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) +} + +type waitCheck struct { + nonce uint64 + block uint64 +} + +type Monitor struct { + owner common.Address + mtx sync.Mutex + waitMap map[uint64]map[common.Hash][]chan Result + client EVM + helper EVMHelper + saver Saver + newTxAdded chan struct{} + nonceUpdate chan struct{} + blockUpdate chan waitCheck + logger *slog.Logger + lastConfirmedNonce atomic.Uint64 + maxPendingTxs uint64 +} + +func New( + owner common.Address, + client EVM, + helper EVMHelper, + saver Saver, + logger *slog.Logger, + maxPendingTxs uint64, +) *Monitor { + if saver == nil { + saver = noopSaver{} + } + m := &Monitor{ + owner: owner, + client: client, + logger: logger, + helper: helper, + saver: saver, + maxPendingTxs: maxPendingTxs, + waitMap: make(map[uint64]map[common.Hash][]chan Result), + newTxAdded: make(chan struct{}), + nonceUpdate: make(chan struct{}), + blockUpdate: make(chan waitCheck), + } + + return m +} + +func (m *Monitor) Start(ctx context.Context) <-chan struct{} { + wg := sync.WaitGroup{} + done := make(chan struct{}) + + wg.Add(2) + go func() { + defer wg.Done() + + queryTicker := time.NewTicker(500 * time.Millisecond) + defer queryTicker.Stop() + + defer func() { + m.mtx.Lock() + defer m.mtx.Unlock() + + for _, v := range m.waitMap { + for _, c := range v { + for _, c := range c { + c <- Result{nil, ErrMonitorClosed} + close(c) + } + } + } + }() + + lastBlock := uint64(0) + for { + newTx := false + select { + case <-ctx.Done(): + return + case <-m.newTxAdded: + newTx = true + case <-queryTicker.C: + } + + currentBlock, err := m.client.BlockNumber(ctx) + if err != nil { + m.logger.Error("failed to get block number", "err", err) + continue + } + + if currentBlock <= lastBlock && !newTx { + continue + } + + lastNonce, err := m.client.NonceAt( + ctx, + m.owner, + new(big.Int).SetUint64(currentBlock), + ) + if err != nil { + m.logger.Error("failed to get nonce", "err", err) + continue + } + + m.lastConfirmedNonce.Store(lastNonce) + m.triggerNonceUpdate() + + select { + case m.blockUpdate <- waitCheck{lastNonce, currentBlock}: + default: + } + lastBlock = currentBlock + } + }() + + go func() { + defer wg.Done() + + for { + select { + case <-ctx.Done(): + return + case check := <-m.blockUpdate: + m.check(ctx, check.block, check.nonce) + } + } + }() + + go func() { + wg.Wait() + close(done) + }() + + return done +} + +func (m *Monitor) Allow(ctx context.Context, nonce uint64) bool { + for { + if nonce <= m.lastConfirmedNonce.Load()+m.maxPendingTxs { + return true + } + select { + case <-ctx.Done(): + return false + case <-m.nonceUpdate: + } + } +} + +func (m *Monitor) Sent(ctx context.Context, tx *types.Transaction) { + if err := m.saver.Save(ctx, tx.Hash(), tx.Nonce()); err != nil { + m.logger.Error("failed to save transaction", "err", err) + } + + res := m.WatchTx(tx.Hash(), tx.Nonce()) + go func() { + r := <-res + status := "success" + if r.Err != nil { + m.logger.Error("transaction failed", "err", r.Err) + status = fmt.Sprintf("failed: %v", r.Err) + } + if err := m.saver.Update(context.Background(), tx.Hash(), status); err != nil { + m.logger.Error("failed to update transaction", "err", err) + } + }() +} + +func (m *Monitor) WatchTx(txHash common.Hash, nonce uint64) <-chan Result { + m.mtx.Lock() + defer m.mtx.Unlock() + + if m.waitMap[nonce] == nil { + m.waitMap[nonce] = make(map[common.Hash][]chan Result) + } + + c := make(chan Result, 1) + m.waitMap[nonce][txHash] = append(m.waitMap[nonce][txHash], c) + + m.triggerNewTx() + return c +} + +func (m *Monitor) triggerNewTx() { + select { + case m.newTxAdded <- struct{}{}: + default: + } +} + +func (m *Monitor) triggerNonceUpdate() { + select { + case m.nonceUpdate <- struct{}{}: + default: + } +} + +func (m *Monitor) getOlderTxns(nonce uint64) map[uint64][]common.Hash { + m.mtx.Lock() + defer m.mtx.Unlock() + + txnMap := make(map[uint64][]common.Hash) + for k, v := range m.waitMap { + if k >= nonce { + continue + } + + for h := range v { + txnMap[k] = append(txnMap[k], h) + } + } + + return txnMap +} + +func (m *Monitor) notify( + nonce uint64, + txn common.Hash, + res Result, +) { + m.mtx.Lock() + defer m.mtx.Unlock() + + waiters := 0 + for _, c := range m.waitMap[nonce][txn] { + c <- res + waiters++ + close(c) + } + delete(m.waitMap[nonce], txn) + if len(m.waitMap[nonce]) == 0 { + delete(m.waitMap, nonce) + } +} + +func (m *Monitor) check(ctx context.Context, newBlock uint64, lastNonce uint64) { + checkTxns := m.getOlderTxns(lastNonce) + nonceMap := make(map[common.Hash]uint64) + + if len(checkTxns) == 0 { + return + } + + txHashes := make([]common.Hash, 0, len(checkTxns)) + for n, txns := range checkTxns { + for _, txn := range txns { + txHashes = append(txHashes, txn) + nonceMap[txn] = n + } + } + + for start := 0; start < len(txHashes); start += batchSize { + end := start + batchSize + if end > len(txHashes) { + end = len(txHashes) + } + + receipts, err := m.helper.BatchReceipts(ctx, txHashes[start:end]) + if err != nil { + m.logger.Error("failed to get receipts", "err", err) + return + } + + for i, r := range receipts { + nonce := nonceMap[txHashes[start+i]] + if r.Err != nil { + if errors.Is(r.Err, ethereum.NotFound) { + m.notify(nonce, txHashes[start+i], Result{nil, ErrTxnCancelled}) + continue + } + tt, err := m.helper.TraceTransaction(ctx, txHashes[start+i]) + if err != nil { + m.logger.Error("retrieving transaction trace failed", "error", err) + } + m.logger.Error("failed to get receipt", "error", r.Err, "transaction_trace", tt) + continue + } + m.notify(nonce, txHashes[start+i], Result{r.Receipt, nil}) + } + } +} + +type noopSaver struct{} + +func (noopSaver) Save(ctx context.Context, txHash common.Hash, nonce uint64) error { + return nil +} + +func (noopSaver) Update(ctx context.Context, txHash common.Hash, status string) error { + return nil +} diff --git a/x/contracts/txmonitor/txmonitor_test.go b/x/contracts/txmonitor/txmonitor_test.go new file mode 100644 index 000000000..d33defcdd --- /dev/null +++ b/x/contracts/txmonitor/txmonitor_test.go @@ -0,0 +1,186 @@ +package txmonitor_test + +import ( + "context" + "io" + "math/big" + "sync" + "testing" + "time" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/primevprotocol/mev-commit/x/contracts/txmonitor" + "github.com/primevprotocol/mev-commit/x/util" +) + +func TestTxMonitor(t *testing.T) { + t.Parallel() + + key, err := crypto.GenerateKey() + if err != nil { + t.Fatal(err) + } + + txns := make([]*types.Transaction, 0, 10) + for i := 1; i <= 10; i++ { + txns = append(txns, types.MustSignNewTx( + key, + types.NewLondonSigner(big.NewInt(1)), + &types.DynamicFeeTx{ + ChainID: big.NewInt(1), + Nonce: uint64(i), + GasFeeCap: big.NewInt(1), + GasTipCap: big.NewInt(1), + To: &common.Address{}, + }, + )) + } + + results := make(map[common.Hash]txmonitor.Result) + for _, tx := range txns { + results[tx.Hash()] = txmonitor.Result{Receipt: &types.Receipt{Status: 1}} + } + + evm := &testEVM{ + blockNumC: make(chan uint64), + nonceC: make(chan uint64), + } + + saver := &testSaver{status: make(map[common.Hash]string)} + + monitor := txmonitor.New( + common.Address{}, + evm, + &testEVMHelper{receipts: results}, + saver, + util.NewTestLogger(io.Discard), + 10, + ) + + ctx, cancel := context.WithCancel(context.Background()) + done := monitor.Start(ctx) + + for _, tx := range txns { + if allow := monitor.Allow(ctx, tx.Nonce()); !allow { + t.Fatal("tx should be allowed") + } + monitor.Sent(ctx, tx) + } + + allowCtx, cancelAllow := context.WithTimeout(ctx, 200*time.Millisecond) + if allow := monitor.Allow(allowCtx, 11); allow { + t.Fatal("tx should not be allowed") + } + cancelAllow() + + // tx with same nonce to simulate cancellation + cancelledWait := monitor.WatchTx(common.HexToHash("0x1"), 2) + + evm.blockNumC <- 1 + evm.nonceC <- 5 + + for { + if saver.count() == 4 { + break + } + time.Sleep(10 * time.Millisecond) + } + + // tx with same nonce should be cancelled + res := <-cancelledWait + if res.Err != txmonitor.ErrTxnCancelled { + t.Fatal("tx should be cancelled") + } + + duplicateListener := monitor.WatchTx(txns[9].Hash(), 10) + closedListener := monitor.WatchTx(common.HexToHash("0x12"), 11) + + evm.blockNumC <- 2 + evm.nonceC <- 11 + + for { + if saver.count() == 10 { + break + } + time.Sleep(10 * time.Millisecond) + } + + // duplicate listener should be notified + res = <-duplicateListener + if res.Err != nil { + t.Fatal("tx should not have error") + } + + cancel() + <-done + + // closed listener should be notified + res = <-closedListener + if res.Err != txmonitor.ErrMonitorClosed { + t.Fatal("error should be monitor closed") + } + + for _, status := range saver.status { + if status != "success" { + t.Fatal("tx should be successful") + } + } +} + +type testEVM struct { + blockNumC chan uint64 + nonceC chan uint64 +} + +func (t *testEVM) BlockNumber(ctx context.Context) (uint64, error) { + return <-t.blockNumC, nil +} + +func (t *testEVM) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) { + return <-t.nonceC, nil +} + +type testEVMHelper struct { + receipts map[common.Hash]txmonitor.Result +} + +func (t *testEVMHelper) BatchReceipts(ctx context.Context, txns []common.Hash) ([]txmonitor.Result, error) { + results := make([]txmonitor.Result, 0, len(txns)) + for _, tx := range txns { + if _, ok := t.receipts[tx]; !ok { + results = append(results, txmonitor.Result{Err: ethereum.NotFound}) + continue + } + results = append(results, t.receipts[tx]) + } + return results, nil +} + +func (t *testEVMHelper) TraceTransaction(ctx context.Context, tx common.Hash) (*txmonitor.TransactionTrace, error) { + return nil, nil +} + +type testSaver struct { + mu sync.Mutex + status map[common.Hash]string +} + +func (t *testSaver) count() int { + t.mu.Lock() + defer t.mu.Unlock() + return len(t.status) +} + +func (t *testSaver) Save(ctx context.Context, txHash common.Hash, nonce uint64) error { + return nil +} + +func (t *testSaver) Update(ctx context.Context, txHash common.Hash, status string) error { + t.mu.Lock() + t.status[txHash] = status + t.mu.Unlock() + return nil +}