diff --git a/infrastructure/nomad/playbooks/variables/profiles.yml b/infrastructure/nomad/playbooks/variables/profiles.yml index d72307e58..ece0ee884 100644 --- a/infrastructure/nomad/playbooks/variables/profiles.yml +++ b/infrastructure/nomad/playbooks/variables/profiles.yml @@ -1,5 +1,5 @@ datacenter: "dc1" -l1_rpc_url: "https://eth-holesky.g.alchemy.com/v2/WqNEQeeexFLQwECjxCPpdep0uvCgn8Yj" +l1_rpc_url: "https://eth-holesky.g.alchemy.com/v2/H8JN1wImnEPrxkFRVOT7cJ_gzu9x3VmB" artifacts: bidder_emulator: &bidder_emulator_artifact diff --git a/oracle/pkg/l1Listener/l1Listener.go b/oracle/pkg/l1Listener/l1Listener.go index b8f3d0e71..e590afb2c 100644 --- a/oracle/pkg/l1Listener/l1Listener.go +++ b/oracle/pkg/l1Listener/l1Listener.go @@ -27,6 +27,7 @@ type WinnerRegister interface { type EthClient interface { BlockNumber(ctx context.Context) (uint64, error) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) + BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) } type L1Listener struct { diff --git a/oracle/pkg/l1Listener/l1Listener_test.go b/oracle/pkg/l1Listener/l1Listener_test.go index db9e7b634..c85695965 100644 --- a/oracle/pkg/l1Listener/l1Listener_test.go +++ b/oracle/pkg/l1Listener/l1Listener_test.go @@ -205,6 +205,10 @@ func (t *testEthClient) HeaderByNumber(_ context.Context, number *big.Int) (*typ return hdr, nil } +func (t *testEthClient) BlockByNumber(_ context.Context, number *big.Int) (*types.Block, error) { + return nil, nil +} + func publishLog( eventManager events.EventManager, blockNum *big.Int, diff --git a/oracle/pkg/node/node.go b/oracle/pkg/node/node.go index 3cb94cd70..af7dd8003 100644 --- a/oracle/pkg/node/node.go +++ b/oracle/pkg/node/node.go @@ -106,7 +106,7 @@ func NewNode(opts *Options) (*Node, error) { monitor := txmonitor.New( owner, settlementClient, - txmonitor.NewEVMHelper(settlementClient.Client()), + txmonitor.NewEVMHelperWithLogger(settlementClient.Client(), nd.logger), st, nd.logger.With("component", "tx_monitor"), 1024, @@ -164,6 +164,8 @@ func NewNode(opts *Options) (*Node, error) { listenerL1Client = &laggerdL1Client{EthClient: listenerL1Client, amount: opts.LaggerdMode} } + listenerL1Client = &infiniteRetryL1Client{EthClient: listenerL1Client, logger: nd.logger} + blockTracker, err := blocktracker.NewBlocktrackerTransactor( opts.BlockTrackerContractAddr, settlementRPC, @@ -232,10 +234,11 @@ func NewNode(opts *Options) (*Node, error) { updtr, err := updater.NewUpdater( nd.logger.With("component", "updater"), - l1Client, + listenerL1Client, st, evtMgr, oracleTransactorSession, + txmonitor.NewEVMHelperWithLogger(l1Client.Client(), nd.logger), ) if err != nil { nd.logger.Error("failed to instantiate updater", "error", err) @@ -403,6 +406,61 @@ func (w *winnerOverrideL1Client) HeaderByNumber(ctx context.Context, number *big return hdr, nil } +type infiniteRetryL1Client struct { + l1Listener.EthClient + logger *slog.Logger +} + +func (i *infiniteRetryL1Client) BlockNumber(ctx context.Context) (uint64, error) { + var blkNum uint64 + var err error + for retries := 50; retries > 0; retries-- { + blkNum, err = i.EthClient.BlockNumber(ctx) + if err == nil { + break + } + i.logger.Error("failed to get block number, retrying...", "error", err) + time.Sleep(2 * time.Second) + } + if err != nil { + return 0, err + } + return blkNum, nil +} + +func (i *infiniteRetryL1Client) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { + var hdr *types.Header + var err error + for retries := 50; retries > 0; retries-- { + hdr, err = i.EthClient.HeaderByNumber(ctx, number) + if err == nil { + break + } + i.logger.Error("failed to get header by number, retrying...", "error", err) + time.Sleep(2 * time.Second) + } + if err != nil { + return nil, err + } + return hdr, nil +} + +func (i *infiniteRetryL1Client) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) { + var blk *types.Block + var err error + for retries := 50; retries > 0; retries-- { + blk, err = i.EthClient.BlockByNumber(ctx, number) + if err == nil { + break + } + i.logger.Error("failed to get block by number, retrying...", "error", err) + time.Sleep(2 * time.Second) + } + if err != nil { + return nil, err + } + return blk, nil +} func setBuilderMapping( ctx context.Context, bt *blocktracker.BlocktrackerTransactorSession, diff --git a/oracle/pkg/updater/metrics.go b/oracle/pkg/updater/metrics.go index ab1b8d9b7..46363c5b5 100644 --- a/oracle/pkg/updater/metrics.go +++ b/oracle/pkg/updater/metrics.go @@ -8,19 +8,21 @@ const ( ) type metrics struct { - CommitmentsReceivedCount prometheus.Counter - CommitmentsProcessedCount prometheus.Counter - CommitmentsTooOldCount prometheus.Counter - DuplicateCommitmentsCount prometheus.Counter - RewardsCount prometheus.Counter - SlashesCount prometheus.Counter - EncryptedCommitmentsCount prometheus.Counter - NoWinnerCount prometheus.Counter - BlockTxnCacheHits prometheus.Counter - BlockTxnCacheMisses prometheus.Counter - BlockTimeCacheHits prometheus.Counter - BlockTimeCacheMisses prometheus.Counter - LastSentNonce prometheus.Gauge + CommitmentsReceivedCount prometheus.Counter + CommitmentsProcessedCount prometheus.Counter + CommitmentsTooOldCount prometheus.Counter + DuplicateCommitmentsCount prometheus.Counter + RewardsCount prometheus.Counter + SlashesCount prometheus.Counter + EncryptedCommitmentsCount prometheus.Counter + NoWinnerCount prometheus.Counter + BlockTxnCacheHits prometheus.Counter + BlockTxnCacheMisses prometheus.Counter + BlockTimeCacheHits prometheus.Counter + BlockTimeCacheMisses prometheus.Counter + LastSentNonce prometheus.Gauge + TxnReceiptRequestDuration prometheus.Histogram + TxnReceiptRequestBlockDuration prometheus.Histogram } func newMetrics() *metrics { @@ -129,6 +131,22 @@ func newMetrics() *metrics { Help: "Last nonce sent to for settlement", }, ) + m.TxnReceiptRequestDuration = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: defaultNamespace, + Subsystem: subsystem, + Name: "txn_receipt_request_duration", + Help: "Duration of transaction receipt requests", + }, + ) + m.TxnReceiptRequestBlockDuration = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: defaultNamespace, + Subsystem: subsystem, + Name: "txn_receipt_request_block_duration", + Help: "Duration of transaction receipt requests", + }, + ) return m } @@ -147,5 +165,7 @@ func (m *metrics) Collectors() []prometheus.Collector { m.BlockTimeCacheHits, m.BlockTimeCacheMisses, m.LastSentNonce, + m.TxnReceiptRequestDuration, + m.TxnReceiptRequestBlockDuration, } } diff --git a/oracle/pkg/updater/updater.go b/oracle/pkg/updater/updater.go index 55a8eb855..1f1ae92e5 100644 --- a/oracle/pkg/updater/updater.go +++ b/oracle/pkg/updater/updater.go @@ -9,7 +9,9 @@ import ( "math" "math/big" "strings" + "sync" "sync/atomic" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -18,12 +20,18 @@ import ( blocktracker "github.com/primev/mev-commit/contracts-abi/clients/BlockTracker" preconf "github.com/primev/mev-commit/contracts-abi/clients/PreConfCommitmentStore" "github.com/primev/mev-commit/x/contracts/events" + "github.com/primev/mev-commit/x/contracts/txmonitor" "github.com/prometheus/client_golang/prometheus" "golang.org/x/sync/errgroup" ) type SettlementType string +type TxMetadata struct { + PosInBlock int + Succeeded bool +} + const ( SettlementTypeReward SettlementType = "reward" SettlementTypeSlash SettlementType = "slash" @@ -92,11 +100,12 @@ type Updater struct { winnerRegister WinnerRegister oracle Oracle evtMgr events.EventManager - l1BlockCache *lru.Cache[uint64, map[string]int] + l1BlockCache *lru.Cache[uint64, map[string]TxMetadata] encryptedCmts chan *preconf.PreconfcommitmentstoreEncryptedCommitmentStored openedCmts chan *preconf.PreconfcommitmentstoreCommitmentStored currentWindow atomic.Int64 metrics *metrics + receiptBatcher txmonitor.BatchReceiptGetter } func NewUpdater( @@ -105,8 +114,9 @@ func NewUpdater( winnerRegister WinnerRegister, evtMgr events.EventManager, oracle Oracle, + receiptBatcher txmonitor.BatchReceiptGetter, ) (*Updater, error) { - l1BlockCache, err := lru.New[uint64, map[string]int](1024) + l1BlockCache, err := lru.New[uint64, map[string]TxMetadata](1024) if err != nil { return nil, fmt.Errorf("failed to create L1 block cache: %w", err) } @@ -117,6 +127,7 @@ func NewUpdater( winnerRegister: winnerRegister, evtMgr: evtMgr, oracle: oracle, + receiptBatcher: receiptBatcher, metrics: newMetrics(), openedCmts: make(chan *preconf.PreconfcommitmentstoreCommitmentStored), encryptedCmts: make(chan *preconf.PreconfcommitmentstoreEncryptedCommitmentStored), @@ -205,7 +216,8 @@ func (u *Updater) Start(ctx context.Context) <-chan struct{} { go func() { defer close(doneChan) if err := eg.Wait(); err != nil { - u.logger.Error("failed to start updater", "error", err) + u.logger.Error("updater failed, exiting", "error", err) + panic(err) } }() @@ -316,7 +328,6 @@ func (u *Updater) handleOpenedCommitment( ) return err } - // Compute the decay percentage decayPercentage := u.computeDecayPercentage( update.DecayStartTimeStamp, @@ -327,17 +338,19 @@ func (u *Updater) handleOpenedCommitment( commitmentTxnHashes := strings.Split(update.TxnHash, ",") // Ensure Bundle is atomic and present in the block for i := 0; i < len(commitmentTxnHashes); i++ { - posInBlock, found := txns[commitmentTxnHashes[i]] - if !found || posInBlock != txns[commitmentTxnHashes[0]]+i { + txnDetails, found := txns[commitmentTxnHashes[i]] + if !found || txnDetails.PosInBlock != (txns[commitmentTxnHashes[0]].PosInBlock)+i || !txnDetails.Succeeded { u.logger.Info( - "bundle is not atomic", + "bundle does not satsify commited requirements", "commitmentIdx", common.Bytes2Hex(update.CommitmentIndex[:]), "txnHash", update.TxnHash, "blockNumber", update.BlockNumber, "found", found, - "posInBlock", posInBlock, - "expectedPosInBlock", txns[commitmentTxnHashes[0]]+i, + "posInBlock", txnDetails.PosInBlock, + "succeeded", txnDetails.Succeeded, + "expectedPosInBlock", txns[commitmentTxnHashes[0]].PosInBlock+i, ) + // The committer did not include the transactions in the block // correctly, so this is a slash to be processed return u.settle( @@ -450,28 +463,95 @@ func (u *Updater) addSettlement( return nil } - -func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]int, error) { +func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]TxMetadata, error) { txns, ok := u.l1BlockCache.Get(blockNum) if ok { u.metrics.BlockTxnCacheHits.Inc() + u.logger.Info("cache hit for block transactions", "blockNum", blockNum) return txns, nil } u.metrics.BlockTxnCacheMisses.Inc() + u.logger.Info("cache miss for block transactions", "blockNum", blockNum) - blk, err := u.l1Client.BlockByNumber(ctx, big.NewInt(0).SetUint64(blockNum)) + block, err := u.l1Client.BlockByNumber(ctx, big.NewInt(0).SetUint64(blockNum)) if err != nil { + u.logger.Error("failed to get block by number", "blockNum", blockNum, "error", err) return nil, fmt.Errorf("failed to get block by number: %w", err) } - txnsInBlock := make(map[string]int) - for posInBlock, tx := range blk.Transactions() { - txnsInBlock[strings.TrimPrefix(tx.Hash().Hex(), "0x")] = posInBlock + u.logger.Info("retrieved block", "blockNum", blockNum, "blockHash", block.Hash().Hex()) + + var txnReceipts sync.Map + eg, ctx := errgroup.WithContext(ctx) + + txnsArray := make([]common.Hash, len(block.Transactions())) + for i, tx := range block.Transactions() { + txnsArray[i] = tx.Hash() } - _ = u.l1BlockCache.Add(blockNum, txnsInBlock) + const bucketSize = 25 // Arbitrary number for bucket size + + numBuckets := (len(txnsArray) + bucketSize - 1) / bucketSize // Calculate the number of buckets needed, rounding up + buckets := make([][]common.Hash, numBuckets) + for i := 0; i < numBuckets; i++ { + start := i * bucketSize + end := start + bucketSize + if end > len(txnsArray) { + end = len(txnsArray) + } + buckets[i] = txnsArray[start:end] + } + + blockStart := time.Now() + + for _, bucket := range buckets { + eg.Go(func() error { + start := time.Now() + u.logger.Info("requesting batch receipts", "bucketSize", len(bucket)) + results, err := u.receiptBatcher.BatchReceipts(ctx, bucket) + if err != nil { + u.logger.Error("failed to get batch receipts", "error", err) + return fmt.Errorf("failed to get batch receipts: %w", err) + } + u.metrics.TxnReceiptRequestDuration.Observe(time.Since(start).Seconds()) + u.logger.Info("received batch receipts", "duration", time.Since(start).Seconds()) + for _, result := range results { + if result.Err != nil { + u.logger.Error("failed to get receipt for txn", "txnHash", result.Receipt.TxHash.Hex(), "error", result.Err) + return fmt.Errorf("failed to get receipt for txn: %s", result.Err) + } + + txnReceipts.Store(result.Receipt.TxHash.Hex(), result.Receipt) + u.logger.Info("stored receipt", "txnHash", result.Receipt.TxHash.Hex()) + } + + return nil + }) + } + + if err := eg.Wait(); err != nil { + u.logger.Error("error while waiting for batch receipts", "error", err) + return nil, err + } + + u.metrics.TxnReceiptRequestBlockDuration.Observe(time.Since(blockStart).Seconds()) + u.logger.Info("completed batch receipt requests for block", "blockNum", blockNum, "duration", time.Since(blockStart).Seconds()) + + txnsMap := make(map[string]TxMetadata) + for i, tx := range txnsArray { + receipt, ok := txnReceipts.Load(tx.Hex()) + if !ok { + u.logger.Error("receipt not found for txn", "txnHash", tx.Hex()) + return nil, fmt.Errorf("receipt not found for txn: %s", tx) + } + txnsMap[strings.TrimPrefix(tx.Hex(), "0x")] = TxMetadata{PosInBlock: i, Succeeded: receipt.(*types.Receipt).Status == types.ReceiptStatusSuccessful} + u.logger.Info("added txn to map", "txnHash", tx.Hex(), "posInBlock", i, "succeeded", receipt.(*types.Receipt).Status == types.ReceiptStatusSuccessful) + } + + _ = u.l1BlockCache.Add(blockNum, txnsMap) + u.logger.Info("added block transactions to cache", "blockNum", blockNum) - return txnsInBlock, nil + return txnsMap, nil } // computeDecayPercentage takes startTimestamp, endTimestamp, commitTimestamp and computes a linear decay percentage diff --git a/oracle/pkg/updater/updater_test.go b/oracle/pkg/updater/updater_test.go index 566cf867f..12176098d 100644 --- a/oracle/pkg/updater/updater_test.go +++ b/oracle/pkg/updater/updater_test.go @@ -22,6 +22,7 @@ import ( preconf "github.com/primev/mev-commit/contracts-abi/clients/PreConfCommitmentStore" "github.com/primev/mev-commit/oracle/pkg/updater" "github.com/primev/mev-commit/x/contracts/events" + "github.com/primev/mev-commit/x/contracts/txmonitor" "github.com/primev/mev-commit/x/util" "golang.org/x/crypto/sha3" ) @@ -32,6 +33,28 @@ func getIdxBytes(idx int64) [32]byte { return idxBytes } +type testBatcher struct { + failedReceipts map[common.Hash]bool +} + +func (t *testBatcher) BatchReceipts(ctx context.Context, txns []common.Hash) ([]txmonitor.Result, error) { + var results []txmonitor.Result + for _, txn := range txns { + status := types.ReceiptStatusSuccessful + if t.failedReceipts[txn] { + status = types.ReceiptStatusFailed + } + results = append(results, txmonitor.Result{ + Receipt: &types.Receipt{ + TxHash: txn, + Status: status, + }, + Err: nil, + }) + } + return results, nil +} + type testHasher struct { hasher hash.Hash } @@ -76,7 +99,7 @@ func TestUpdater(t *testing.T) { signer := types.NewLondonSigner(big.NewInt(5)) var txns []*types.Transaction - for i := 0; i < 10; i++ { + for i := range 10 { txns = append(txns, types.MustSignNewTx(key, signer, &types.DynamicFeeTx{ Nonce: uint64(i + 1), Gas: 1000000, @@ -124,7 +147,7 @@ func TestUpdater(t *testing.T) { } // constructing bundles - for i := 0; i < 10; i++ { + for i := range 10 { idxBytes := getIdxBytes(int64(i + 10)) bundle := strings.TrimPrefix(txns[i].Hash().Hex(), "0x") @@ -173,6 +196,14 @@ func TestUpdater(t *testing.T) { blocks: map[int64]*types.Block{ 5: types.NewBlock(&types.Header{}, txns, nil, nil, NewHasher()), }, + receipts: make(map[string]*types.Receipt), + } + for _, txn := range txns { + receipt := &types.Receipt{ + Status: types.ReceiptStatusSuccessful, + TxHash: txn.Hash(), + } + l1Client.receipts[txn.Hash().Hex()] = receipt } pcABI, err := abi.JSON(strings.NewReader(preconf.PreconfcommitmentstoreABI)) @@ -201,6 +232,7 @@ func TestUpdater(t *testing.T) { register, evtMgr, oracle, + &testBatcher{}, ) if err != nil { t.Fatal(err) @@ -310,6 +342,273 @@ func TestUpdater(t *testing.T) { } } +func TestUpdaterRevertedTxns(t *testing.T) { + t.Parallel() + + // timestamp of the First block commitment is X + startTimestamp := time.UnixMilli(1615195200000) + midTimestamp := startTimestamp.Add(time.Duration(2.5 * float64(time.Second))) + endTimestamp := startTimestamp.Add(5 * time.Second) + + key, err := crypto.GenerateKey() + if err != nil { + t.Fatal(err) + } + + builderAddr := common.HexToAddress("0xabcd") + otherBuilderAddr := common.HexToAddress("0xabdc") + + signer := types.NewLondonSigner(big.NewInt(5)) + var txns []*types.Transaction + for i := range 10 { + txns = append(txns, types.MustSignNewTx(key, signer, &types.DynamicFeeTx{ + Nonce: uint64(i + 1), + Gas: 1000000, + Value: big.NewInt(1), + GasTipCap: big.NewInt(500), + GasFeeCap: big.NewInt(500), + })) + } + + encCommitments := make([]preconf.PreconfcommitmentstoreEncryptedCommitmentStored, 0) + commitments := make([]preconf.PreconfcommitmentstoreCommitmentStored, 0) + + for i, txn := range txns { + idxBytes := getIdxBytes(int64(i)) + + encCommitment := preconf.PreconfcommitmentstoreEncryptedCommitmentStored{ + CommitmentIndex: idxBytes, + CommitmentDigest: common.HexToHash(fmt.Sprintf("0x%02d", i)), + CommitmentSignature: []byte("signature"), + DispatchTimestamp: uint64(midTimestamp.UnixMilli()), + } + commitment := preconf.PreconfcommitmentstoreCommitmentStored{ + CommitmentIndex: idxBytes, + TxnHash: strings.TrimPrefix(txn.Hash().Hex(), "0x"), + Bid: big.NewInt(10), + BlockNumber: 5, + CommitmentHash: common.HexToHash(fmt.Sprintf("0x%02d", i)), + CommitmentSignature: []byte("signature"), + DecayStartTimeStamp: uint64(startTimestamp.UnixMilli()), + DecayEndTimeStamp: uint64(endTimestamp.UnixMilli()), + DispatchTimestamp: uint64(midTimestamp.UnixMilli()), + } + + if i%2 == 0 { + encCommitment.Commiter = builderAddr + commitment.Commiter = builderAddr + encCommitments = append(encCommitments, encCommitment) + commitments = append(commitments, commitment) + } else { + encCommitment.Commiter = otherBuilderAddr + commitment.Commiter = otherBuilderAddr + encCommitments = append(encCommitments, encCommitment) + commitments = append(commitments, commitment) + } + } + + // constructing bundles + for i := range 10 { + idxBytes := getIdxBytes(int64(i + 10)) + + bundle := strings.TrimPrefix(txns[i].Hash().Hex(), "0x") + for j := i + 1; j < 10; j++ { + bundle += "," + strings.TrimPrefix(txns[j].Hash().Hex(), "0x") + } + + encCommitment := preconf.PreconfcommitmentstoreEncryptedCommitmentStored{ + CommitmentIndex: idxBytes, + Commiter: builderAddr, + CommitmentDigest: common.HexToHash(fmt.Sprintf("0x%02d", i)), + CommitmentSignature: []byte("signature"), + DispatchTimestamp: uint64(midTimestamp.UnixMilli()), + } + commitment := preconf.PreconfcommitmentstoreCommitmentStored{ + CommitmentIndex: idxBytes, + Commiter: builderAddr, + Bid: big.NewInt(10), + TxnHash: bundle, + BlockNumber: 5, + CommitmentHash: common.HexToHash(fmt.Sprintf("0x%02d", i)), + CommitmentSignature: []byte("signature"), + DecayStartTimeStamp: uint64(startTimestamp.UnixMilli()), + DecayEndTimeStamp: uint64(endTimestamp.UnixMilli()), + DispatchTimestamp: uint64(midTimestamp.UnixMilli()), + } + encCommitments = append(encCommitments, encCommitment) + commitments = append(commitments, commitment) + } + + register := &testWinnerRegister{ + winners: []testWinner{ + { + blockNum: 5, + winner: updater.Winner{ + Winner: builderAddr.Bytes(), + Window: 1, + }, + }, + }, + settlements: make(chan testSettlement, 1), + encCommit: make(chan testEncryptedCommitment, 1), + } + + l1Client := &testEVMClient{ + blocks: map[int64]*types.Block{ + 5: types.NewBlock(&types.Header{}, txns, nil, nil, NewHasher()), + }, + receipts: make(map[string]*types.Receipt), + } + for _, txn := range txns { + receipt := &types.Receipt{ + Status: types.ReceiptStatusFailed, + TxHash: txn.Hash(), + } + l1Client.receipts[txn.Hash().Hex()] = receipt + } + + pcABI, err := abi.JSON(strings.NewReader(preconf.PreconfcommitmentstoreABI)) + if err != nil { + t.Fatal(err) + } + + btABI, err := abi.JSON(strings.NewReader(blocktracker.BlocktrackerABI)) + if err != nil { + t.Fatal(err) + } + + evtMgr := events.NewListener( + util.NewTestLogger(io.Discard), + &btABI, + &pcABI, + ) + + oracle := &testOracle{ + commitments: make(chan processedCommitment, 1), + } + testBatcher := &testBatcher{ + failedReceipts: make(map[common.Hash]bool), + } + for _, txn := range txns { + testBatcher.failedReceipts[txn.Hash()] = true + } + + updtr, err := updater.NewUpdater( + slog.New(slog.NewTextHandler(io.Discard, nil)), + l1Client, + register, + evtMgr, + oracle, + testBatcher, + ) + if err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithCancel(context.Background()) + done := updtr.Start(ctx) + + w := blocktracker.BlocktrackerNewWindow{ + Window: big.NewInt(1), + } + publishNewWindow(evtMgr, &btABI, w) + + for _, ec := range encCommitments { + if err := publishEncCommitment(evtMgr, &pcABI, ec); err != nil { + t.Fatal(err) + } + + select { + case <-time.After(5 * time.Second): + t.Fatal("timeout") + case enc := <-register.encCommit: + if !bytes.Equal(enc.commitmentIdx, ec.CommitmentIndex[:]) { + t.Fatal("wrong commitment index") + } + if !bytes.Equal(enc.committer, ec.Commiter.Bytes()) { + t.Fatal("wrong committer") + } + if !bytes.Equal(enc.commitmentHash, ec.CommitmentDigest[:]) { + t.Fatal("wrong commitment hash") + } + if !bytes.Equal(enc.commitmentSignature, ec.CommitmentSignature) { + t.Fatal("wrong commitment signature") + } + if enc.dispatchTimestamp != ec.DispatchTimestamp { + t.Fatal("wrong dispatch timestamp") + } + } + } + + for _, c := range commitments { + if err := publishCommitment(evtMgr, &pcABI, c); err != nil { + t.Fatal(err) + } + + if c.Commiter.Cmp(otherBuilderAddr) == 0 { + continue + } + + select { + case <-time.After(5 * time.Second): + t.Fatal("timeout") + case commitment := <-oracle.commitments: + if !bytes.Equal(commitment.commitmentIdx[:], c.CommitmentIndex[:]) { + t.Fatal("wrong commitment index") + } + if commitment.blockNum.Cmp(big.NewInt(5)) != 0 { + t.Fatal("wrong block number") + } + if commitment.builder != c.Commiter { + t.Fatal("wrong builder") + } + if !commitment.isSlash { + t.Fatal("wrong isSlash") + } + if commitment.residualDecay.Cmp(big.NewInt(50)) != 0 { + t.Fatal("wrong residual decay") + } + } + + select { + case <-time.After(5 * time.Second): + t.Fatal("timeout") + case settlement := <-register.settlements: + if !bytes.Equal(settlement.commitmentIdx, c.CommitmentIndex[:]) { + t.Fatal("wrong commitment index") + } + if settlement.txHash != c.TxnHash { + t.Fatal("wrong txn hash") + } + if settlement.blockNum != 5 { + t.Fatal("wrong block number") + } + if !bytes.Equal(settlement.builder, c.Commiter.Bytes()) { + t.Fatal("wrong builder") + } + if settlement.amount.Uint64() != 10 { + t.Fatal("wrong amount") + } + if settlement.settlementType != updater.SettlementTypeSlash { + t.Fatal("wrong settlement type") + } + if settlement.decayPercentage != 50 { + t.Fatal("wrong decay percentage") + } + if settlement.window != 1 { + t.Fatal("wrong window") + } + } + } + + cancel() + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("timeout") + } +} + func TestUpdaterBundlesFailure(t *testing.T) { t.Parallel() @@ -326,7 +625,7 @@ func TestUpdaterBundlesFailure(t *testing.T) { signer := types.NewLondonSigner(big.NewInt(5)) var txns []*types.Transaction - for i := 0; i < 10; i++ { + for i := range 10 { txns = append(txns, types.MustSignNewTx(key, signer, &types.DynamicFeeTx{ Nonce: uint64(i + 1), Gas: 1000000, @@ -380,6 +679,14 @@ func TestUpdaterBundlesFailure(t *testing.T) { blocks: map[int64]*types.Block{ 5: types.NewBlock(&types.Header{}, txns, nil, nil, NewHasher()), }, + receipts: make(map[string]*types.Receipt), + } + for _, txn := range txns { + receipt := &types.Receipt{ + Status: types.ReceiptStatusSuccessful, + TxHash: txn.Hash(), + } + l1Client.receipts[txn.Hash().Hex()] = receipt } oracle := &testOracle{ @@ -408,6 +715,7 @@ func TestUpdaterBundlesFailure(t *testing.T) { register, evtMgr, oracle, + &testBatcher{}, ) if err != nil { t.Fatal(err) @@ -503,7 +811,7 @@ func TestUpdaterIgnoreCommitments(t *testing.T) { signer := types.NewLondonSigner(big.NewInt(5)) var txns []*types.Transaction - for i := 0; i < 10; i++ { + for i := range 10 { txns = append(txns, types.MustSignNewTx(key, signer, &types.DynamicFeeTx{ Nonce: uint64(i + 1), Gas: 1000000, @@ -576,6 +884,14 @@ func TestUpdaterIgnoreCommitments(t *testing.T) { 8: types.NewBlock(&types.Header{}, txns, nil, nil, NewHasher()), 10: types.NewBlock(&types.Header{}, txns, nil, nil, NewHasher()), }, + receipts: make(map[string]*types.Receipt), + } + for _, txn := range txns { + receipt := &types.Receipt{ + Status: types.ReceiptStatusSuccessful, + TxHash: txn.Hash(), + } + l1Client.receipts[txn.Hash().Hex()] = receipt } pcABI, err := abi.JSON(strings.NewReader(preconf.PreconfcommitmentstoreABI)) @@ -604,6 +920,7 @@ func TestUpdaterIgnoreCommitments(t *testing.T) { register, evtMgr, oracle, + &testBatcher{}, ) if err != nil { t.Fatal(err) @@ -798,7 +1115,8 @@ func (t *testWinnerRegister) AddEncryptedCommitment( } type testEVMClient struct { - blocks map[int64]*types.Block + blocks map[int64]*types.Block + receipts map[string]*types.Receipt } func (t *testEVMClient) BlockByNumber(ctx context.Context, blkNum *big.Int) (*types.Block, error) { diff --git a/p2p/pkg/node/node.go b/p2p/pkg/node/node.go index 7500c5727..51d0286c3 100644 --- a/p2p/pkg/node/node.go +++ b/p2p/pkg/node/node.go @@ -185,7 +185,7 @@ func NewNode(opts *Options) (*Node, error) { monitor := txmonitor.New( opts.KeySigner.GetAddress(), contractRPC, - txmonitor.NewEVMHelper(contractRPC.Client()), + txmonitor.NewEVMHelperWithLogger(contractRPC.Client(), opts.Logger.With("component", "txmonitor")), store, opts.Logger.With("component", "txmonitor"), 1024, @@ -337,7 +337,7 @@ func NewNode(opts *Options) (*Node, error) { evtMgr, store, commitmentDA, - txmonitor.NewEVMHelper(contractRPC.Client()), + txmonitor.NewEVMHelperWithLogger(contractRPC.Client(), opts.Logger.With("component", "evm_helper")), optsGetter, opts.Logger.With("component", "tracker"), ) diff --git a/x/contracts/txmonitor/eth_helper.go b/x/contracts/txmonitor/eth_helper.go index ca15db555..4826c08b8 100644 --- a/x/contracts/txmonitor/eth_helper.go +++ b/x/contracts/txmonitor/eth_helper.go @@ -2,6 +2,8 @@ package txmonitor import ( "context" + "log/slog" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -76,11 +78,11 @@ type BatchReceiptGetter interface { type evmHelper struct { client *rpc.Client + logger *slog.Logger } -// NewEVMHelper creates a new EVMHelper instance. -func NewEVMHelper(client *rpc.Client) *evmHelper { - return &evmHelper{client} +func NewEVMHelperWithLogger(client *rpc.Client, logger *slog.Logger) *evmHelper { + return &evmHelper{client, logger} } // TraceTransaction implements Debugger.TraceTransaction interface. @@ -101,9 +103,11 @@ func (e *evmHelper) TraceTransaction(ctx context.Context, txHash common.Hash) (* // BatchReceipts retrieves multiple receipts for a list of transaction hashes. func (e *evmHelper) BatchReceipts(ctx context.Context, txHashes []common.Hash) ([]Result, error) { + e.logger.Info("Starting BatchReceipts", "txHashes", txHashes) batch := make([]rpc.BatchElem, len(txHashes)) for i, hash := range txHashes { + e.logger.Debug("Preparing batch element", "index", i, "hash", hash.Hex()) batch[i] = rpc.BatchElem{ Method: "eth_getTransactionReceipt", Args: []interface{}{hash}, @@ -111,19 +115,34 @@ func (e *evmHelper) BatchReceipts(ctx context.Context, txHashes []common.Hash) ( } } - // Execute the batch request - err := e.client.BatchCallContext(ctx, batch) + var receipts []Result + var err error + for attempts := 0; attempts < 50; attempts++ { + e.logger.Debug("Attempting batch call", "attempt", attempts+1) + // Execute the batch request + err = e.client.BatchCallContext(context.Background(), batch) + if err != nil { + e.logger.Error("Batch call attempt failed", "attempt", attempts+1, "error", err) + time.Sleep(1 * time.Second) + } else { + e.logger.Info("Batch call attempt succeeded", "attempt", attempts+1) + break + } + } + if err != nil { + e.logger.Error("All batch call attempts failed", "error", err) return nil, err } - - receipts := make([]Result, len(batch)) + receipts = make([]Result, len(batch)) for i, elem := range batch { + e.logger.Debug("Processing batch element", "index", i, "result", elem.Result, "error", elem.Error) receipts[i].Receipt = elem.Result.(*types.Receipt) if elem.Error != nil { receipts[i].Err = elem.Error } } + e.logger.Info("BatchReceipts completed successfully", "receipts", receipts) return receipts, nil }