From 921d4c837c329a3b317372491113ece23d17e86f Mon Sep 17 00:00:00 2001 From: Kartik Chopra Date: Thu, 20 Jun 2024 15:17:56 +0200 Subject: [PATCH 01/32] feat: adds more txn metadata to block cache --- oracle/pkg/updater/updater.go | 33 +++++++++++++++++++++--------- oracle/pkg/updater/updater_test.go | 4 ++++ 2 files changed, 27 insertions(+), 10 deletions(-) diff --git a/oracle/pkg/updater/updater.go b/oracle/pkg/updater/updater.go index 55a8eb855..29a09d7f3 100644 --- a/oracle/pkg/updater/updater.go +++ b/oracle/pkg/updater/updater.go @@ -24,6 +24,12 @@ import ( type SettlementType string +type TxMetadata struct { + PosInBlock int + Succeeded bool + // Add other metadata fields here as needed +} + const ( SettlementTypeReward SettlementType = "reward" SettlementTypeSlash SettlementType = "slash" @@ -84,6 +90,7 @@ type Oracle interface { type EVMClient interface { BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) + TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) } type Updater struct { @@ -92,7 +99,7 @@ type Updater struct { winnerRegister WinnerRegister oracle Oracle evtMgr events.EventManager - l1BlockCache *lru.Cache[uint64, map[string]int] + l1BlockCache *lru.Cache[uint64, map[string]TxMetadata] encryptedCmts chan *preconf.PreconfcommitmentstoreEncryptedCommitmentStored openedCmts chan *preconf.PreconfcommitmentstoreCommitmentStored currentWindow atomic.Int64 @@ -106,7 +113,7 @@ func NewUpdater( evtMgr events.EventManager, oracle Oracle, ) (*Updater, error) { - l1BlockCache, err := lru.New[uint64, map[string]int](1024) + l1BlockCache, err := lru.New[uint64, map[string]TxMetadata](1024) if err != nil { return nil, fmt.Errorf("failed to create L1 block cache: %w", err) } @@ -327,16 +334,17 @@ func (u *Updater) handleOpenedCommitment( commitmentTxnHashes := strings.Split(update.TxnHash, ",") // Ensure Bundle is atomic and present in the block for i := 0; i < len(commitmentTxnHashes); i++ { - posInBlock, found := txns[commitmentTxnHashes[i]] - if !found || posInBlock != txns[commitmentTxnHashes[0]]+i { + txnDetails, found := txns[commitmentTxnHashes[i]] + if !found || txnDetails.PosInBlock != (txns[commitmentTxnHashes[0]].PosInBlock)+i { u.logger.Info( "bundle is not atomic", "commitmentIdx", common.Bytes2Hex(update.CommitmentIndex[:]), "txnHash", update.TxnHash, "blockNumber", update.BlockNumber, "found", found, - "posInBlock", posInBlock, - "expectedPosInBlock", txns[commitmentTxnHashes[0]]+i, + "posInBlock", txnDetails.PosInBlock, + "succeeded", txnDetails.Succeeded, + "expectedPosInBlock", txns[commitmentTxnHashes[0]].PosInBlock+i, ) // The committer did not include the transactions in the block // correctly, so this is a slash to be processed @@ -450,8 +458,7 @@ func (u *Updater) addSettlement( return nil } - -func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]int, error) { +func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]TxMetadata, error) { txns, ok := u.l1BlockCache.Get(blockNum) if ok { u.metrics.BlockTxnCacheHits.Inc() @@ -465,9 +472,15 @@ func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]in return nil, fmt.Errorf("failed to get block by number: %w", err) } - txnsInBlock := make(map[string]int) + txnsInBlock := make(map[string]TxMetadata) for posInBlock, tx := range blk.Transactions() { - txnsInBlock[strings.TrimPrefix(tx.Hash().Hex(), "0x")] = posInBlock + receipt, err := u.l1Client.TransactionReceipt(ctx, tx.Hash()) + if err != nil { + u.logger.Error("failed to get transaction receipt", "txHash", tx.Hash().Hex(), "error", err) + return nil, fmt.Errorf("failed to get transaction receipt: %w", err) + } + txSucceeded := receipt.Status == 1 + txnsInBlock[strings.TrimPrefix(tx.Hash().Hex(), "0x")] = TxMetadata{PosInBlock: posInBlock, Succeeded: txSucceeded} } _ = u.l1BlockCache.Add(blockNum, txnsInBlock) diff --git a/oracle/pkg/updater/updater_test.go b/oracle/pkg/updater/updater_test.go index 566cf867f..aea31ea62 100644 --- a/oracle/pkg/updater/updater_test.go +++ b/oracle/pkg/updater/updater_test.go @@ -809,6 +809,10 @@ func (t *testEVMClient) BlockByNumber(ctx context.Context, blkNum *big.Int) (*ty return blk, nil } +func (t *testEVMClient) TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) { + return &types.Receipt{Status: 1}, nil +} + type processedCommitment struct { commitmentIdx [32]byte blockNum *big.Int From eed7508abc17fb2499f18b6bc2270697913a4a3f Mon Sep 17 00:00:00 2001 From: Kartik Chopra Date: Thu, 20 Jun 2024 15:19:27 +0200 Subject: [PATCH 02/32] feat: slash on non-successful transactions --- oracle/pkg/updater/updater.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/oracle/pkg/updater/updater.go b/oracle/pkg/updater/updater.go index 29a09d7f3..15dca19ae 100644 --- a/oracle/pkg/updater/updater.go +++ b/oracle/pkg/updater/updater.go @@ -335,7 +335,7 @@ func (u *Updater) handleOpenedCommitment( // Ensure Bundle is atomic and present in the block for i := 0; i < len(commitmentTxnHashes); i++ { txnDetails, found := txns[commitmentTxnHashes[i]] - if !found || txnDetails.PosInBlock != (txns[commitmentTxnHashes[0]].PosInBlock)+i { + if !found || txnDetails.PosInBlock != (txns[commitmentTxnHashes[0]].PosInBlock)+i || !txnDetails.Succeeded { u.logger.Info( "bundle is not atomic", "commitmentIdx", common.Bytes2Hex(update.CommitmentIndex[:]), From 08eb0fa0553097f8dd940f145ab3ec0f9e2b83a5 Mon Sep 17 00:00:00 2001 From: Kartik Chopra Date: Thu, 20 Jun 2024 15:20:17 +0200 Subject: [PATCH 03/32] chore: remove redundant log --- oracle/pkg/updater/updater.go | 1 - 1 file changed, 1 deletion(-) diff --git a/oracle/pkg/updater/updater.go b/oracle/pkg/updater/updater.go index 15dca19ae..2376e367c 100644 --- a/oracle/pkg/updater/updater.go +++ b/oracle/pkg/updater/updater.go @@ -27,7 +27,6 @@ type SettlementType string type TxMetadata struct { PosInBlock int Succeeded bool - // Add other metadata fields here as needed } const ( From 52cd04608ee2c2971159a847e6bc30603d872dbe Mon Sep 17 00:00:00 2001 From: Kartik Chopra Date: Thu, 20 Jun 2024 15:32:52 +0200 Subject: [PATCH 04/32] feat: request receipts concurrently --- oracle/pkg/updater/updater.go | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/oracle/pkg/updater/updater.go b/oracle/pkg/updater/updater.go index 2376e367c..e42cc5bf1 100644 --- a/oracle/pkg/updater/updater.go +++ b/oracle/pkg/updater/updater.go @@ -9,6 +9,7 @@ import ( "math" "math/big" "strings" + "sync" "sync/atomic" "github.com/ethereum/go-ethereum/common" @@ -472,15 +473,24 @@ func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]Tx } txnsInBlock := make(map[string]TxMetadata) + var wg sync.WaitGroup + var mu sync.Mutex for posInBlock, tx := range blk.Transactions() { - receipt, err := u.l1Client.TransactionReceipt(ctx, tx.Hash()) - if err != nil { - u.logger.Error("failed to get transaction receipt", "txHash", tx.Hash().Hex(), "error", err) - return nil, fmt.Errorf("failed to get transaction receipt: %w", err) - } - txSucceeded := receipt.Status == 1 - txnsInBlock[strings.TrimPrefix(tx.Hash().Hex(), "0x")] = TxMetadata{PosInBlock: posInBlock, Succeeded: txSucceeded} + wg.Add(1) + go func(posInBlock int, tx *types.Transaction) { + defer wg.Done() + receipt, err := u.l1Client.TransactionReceipt(ctx, tx.Hash()) + if err != nil { + u.logger.Error("failed to get transaction receipt", "txHash", tx.Hash().Hex(), "error", err) + return + } + txSucceeded := receipt.Status == 1 + mu.Lock() + txnsInBlock[strings.TrimPrefix(tx.Hash().Hex(), "0x")] = TxMetadata{PosInBlock: posInBlock, Succeeded: txSucceeded} + mu.Unlock() + }(posInBlock, tx) } + wg.Wait() _ = u.l1BlockCache.Add(blockNum, txnsInBlock) return txnsInBlock, nil From ccf1ff1465cf5acad3534fed47a1dd0e24d3b715 Mon Sep 17 00:00:00 2001 From: Kartik Chopra Date: Thu, 20 Jun 2024 15:38:06 +0200 Subject: [PATCH 05/32] feat: cleanup code --- oracle/pkg/updater/updater.go | 40 +++++++++++++++++++++++------------ 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/oracle/pkg/updater/updater.go b/oracle/pkg/updater/updater.go index e42cc5bf1..bdded527a 100644 --- a/oracle/pkg/updater/updater.go +++ b/oracle/pkg/updater/updater.go @@ -458,6 +458,7 @@ func (u *Updater) addSettlement( return nil } + func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]TxMetadata, error) { txns, ok := u.l1BlockCache.Get(blockNum) if ok { @@ -467,7 +468,7 @@ func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]Tx u.metrics.BlockTxnCacheMisses.Inc() - blk, err := u.l1Client.BlockByNumber(ctx, big.NewInt(0).SetUint64(blockNum)) + block, err := u.l1Client.BlockByNumber(ctx, big.NewInt(0).SetUint64(blockNum)) if err != nil { return nil, fmt.Errorf("failed to get block by number: %w", err) } @@ -475,22 +476,35 @@ func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]Tx txnsInBlock := make(map[string]TxMetadata) var wg sync.WaitGroup var mu sync.Mutex - for posInBlock, tx := range blk.Transactions() { - wg.Add(1) - go func(posInBlock int, tx *types.Transaction) { - defer wg.Done() - receipt, err := u.l1Client.TransactionReceipt(ctx, tx.Hash()) - if err != nil { - u.logger.Error("failed to get transaction receipt", "txHash", tx.Hash().Hex(), "error", err) - return - } - txSucceeded := receipt.Status == 1 + var receiptErr error + + processTransactionMetadata := func(posInBlock int, tx *types.Transaction) { + defer wg.Done() + receipt, err := u.l1Client.TransactionReceipt(ctx, tx.Hash()) + if err != nil { + u.logger.Error("failed to get transaction receipt", "txHash", tx.Hash().Hex(), "error", err) mu.Lock() - txnsInBlock[strings.TrimPrefix(tx.Hash().Hex(), "0x")] = TxMetadata{PosInBlock: posInBlock, Succeeded: txSucceeded} + receiptErr = err mu.Unlock() - }(posInBlock, tx) + return + } + txSucceeded := receipt.Status == 1 + mu.Lock() + txnsInBlock[strings.TrimPrefix(tx.Hash().Hex(), "0x")] = TxMetadata{PosInBlock: posInBlock, Succeeded: txSucceeded} + mu.Unlock() + } + + for posInBlock, tx := range block.Transactions() { + wg.Add(1) + go processTransactionMetadata(posInBlock, tx) } + wg.Wait() + + if receiptErr != nil { + return nil, receiptErr + } + _ = u.l1BlockCache.Add(blockNum, txnsInBlock) return txnsInBlock, nil From 4565a4207ab5ae1feb13c93cf761088e9be94637 Mon Sep 17 00:00:00 2001 From: Kartik Chopra Date: Thu, 20 Jun 2024 16:25:15 +0200 Subject: [PATCH 06/32] feat: adds txn receipts to tests --- oracle/pkg/updater/updater_test.go | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/oracle/pkg/updater/updater_test.go b/oracle/pkg/updater/updater_test.go index aea31ea62..1faed8368 100644 --- a/oracle/pkg/updater/updater_test.go +++ b/oracle/pkg/updater/updater_test.go @@ -576,6 +576,14 @@ func TestUpdaterIgnoreCommitments(t *testing.T) { 8: types.NewBlock(&types.Header{}, txns, nil, nil, NewHasher()), 10: types.NewBlock(&types.Header{}, txns, nil, nil, NewHasher()), }, + receipts: make(map[string]*types.Receipt), + } + for _, txn := range txns { + receipt := &types.Receipt{ + Status: types.ReceiptStatusSuccessful, + TxHash: txn.Hash(), + } + l1Client.receipts[txn.Hash().Hex()] = receipt } pcABI, err := abi.JSON(strings.NewReader(preconf.PreconfcommitmentstoreABI)) @@ -798,7 +806,8 @@ func (t *testWinnerRegister) AddEncryptedCommitment( } type testEVMClient struct { - blocks map[int64]*types.Block + blocks map[int64]*types.Block + receipts map[string]*types.Receipt } func (t *testEVMClient) BlockByNumber(ctx context.Context, blkNum *big.Int) (*types.Block, error) { @@ -810,7 +819,11 @@ func (t *testEVMClient) BlockByNumber(ctx context.Context, blkNum *big.Int) (*ty } func (t *testEVMClient) TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) { - return &types.Receipt{Status: 1}, nil + receipt, found := t.receipts[txHash.Hex()] + if !found { + return nil, fmt.Errorf("receipt for transaction hash %s not found", txHash.Hex()) + } + return receipt, nil } type processedCommitment struct { From 22e4f89a976f8494a507c3a72506e3aa492c9136 Mon Sep 17 00:00:00 2001 From: Kartik Chopra Date: Thu, 20 Jun 2024 16:38:02 +0200 Subject: [PATCH 07/32] chore: updates test stub to include txn receipts --- oracle/pkg/updater/updater_test.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/oracle/pkg/updater/updater_test.go b/oracle/pkg/updater/updater_test.go index 1faed8368..e837676ba 100644 --- a/oracle/pkg/updater/updater_test.go +++ b/oracle/pkg/updater/updater_test.go @@ -173,6 +173,14 @@ func TestUpdater(t *testing.T) { blocks: map[int64]*types.Block{ 5: types.NewBlock(&types.Header{}, txns, nil, nil, NewHasher()), }, + receipts: make(map[string]*types.Receipt), + } + for _, txn := range txns { + receipt := &types.Receipt{ + Status: types.ReceiptStatusSuccessful, + TxHash: txn.Hash(), + } + l1Client.receipts[txn.Hash().Hex()] = receipt } pcABI, err := abi.JSON(strings.NewReader(preconf.PreconfcommitmentstoreABI)) @@ -380,6 +388,14 @@ func TestUpdaterBundlesFailure(t *testing.T) { blocks: map[int64]*types.Block{ 5: types.NewBlock(&types.Header{}, txns, nil, nil, NewHasher()), }, + receipts: make(map[string]*types.Receipt), + } + for _, txn := range txns { + receipt := &types.Receipt{ + Status: types.ReceiptStatusSuccessful, + TxHash: txn.Hash(), + } + l1Client.receipts[txn.Hash().Hex()] = receipt } oracle := &testOracle{ From e80a568b8467de9e6e7480b8dfb2c1c9b7e58fda Mon Sep 17 00:00:00 2001 From: Kartik Chopra Date: Thu, 20 Jun 2024 16:43:52 +0200 Subject: [PATCH 08/32] feat: adds a revert check in tests --- oracle/pkg/updater/updater_test.go | 260 +++++++++++++++++++++++++++++ 1 file changed, 260 insertions(+) diff --git a/oracle/pkg/updater/updater_test.go b/oracle/pkg/updater/updater_test.go index e837676ba..c9ec3f736 100644 --- a/oracle/pkg/updater/updater_test.go +++ b/oracle/pkg/updater/updater_test.go @@ -318,6 +318,266 @@ func TestUpdater(t *testing.T) { } } +func TestUpdaterRevertedTxns(t *testing.T) { + t.Parallel() + + // timestamp of the First block commitment is X + startTimestamp := time.UnixMilli(1615195200000) + midTimestamp := startTimestamp.Add(time.Duration(2.5 * float64(time.Second))) + endTimestamp := startTimestamp.Add(5 * time.Second) + + key, err := crypto.GenerateKey() + if err != nil { + t.Fatal(err) + } + + builderAddr := common.HexToAddress("0xabcd") + otherBuilderAddr := common.HexToAddress("0xabdc") + + signer := types.NewLondonSigner(big.NewInt(5)) + var txns []*types.Transaction + for i := 0; i < 10; i++ { + txns = append(txns, types.MustSignNewTx(key, signer, &types.DynamicFeeTx{ + Nonce: uint64(i + 1), + Gas: 1000000, + Value: big.NewInt(1), + GasTipCap: big.NewInt(500), + GasFeeCap: big.NewInt(500), + })) + } + + encCommitments := make([]preconf.PreconfcommitmentstoreEncryptedCommitmentStored, 0) + commitments := make([]preconf.PreconfcommitmentstoreCommitmentStored, 0) + + for i, txn := range txns { + idxBytes := getIdxBytes(int64(i)) + + encCommitment := preconf.PreconfcommitmentstoreEncryptedCommitmentStored{ + CommitmentIndex: idxBytes, + CommitmentDigest: common.HexToHash(fmt.Sprintf("0x%02d", i)), + CommitmentSignature: []byte("signature"), + DispatchTimestamp: uint64(midTimestamp.UnixMilli()), + } + commitment := preconf.PreconfcommitmentstoreCommitmentStored{ + CommitmentIndex: idxBytes, + TxnHash: strings.TrimPrefix(txn.Hash().Hex(), "0x"), + Bid: big.NewInt(10), + BlockNumber: 5, + CommitmentHash: common.HexToHash(fmt.Sprintf("0x%02d", i)), + CommitmentSignature: []byte("signature"), + DecayStartTimeStamp: uint64(startTimestamp.UnixMilli()), + DecayEndTimeStamp: uint64(endTimestamp.UnixMilli()), + DispatchTimestamp: uint64(midTimestamp.UnixMilli()), + } + + if i%2 == 0 { + encCommitment.Commiter = builderAddr + commitment.Commiter = builderAddr + encCommitments = append(encCommitments, encCommitment) + commitments = append(commitments, commitment) + } else { + encCommitment.Commiter = otherBuilderAddr + commitment.Commiter = otherBuilderAddr + encCommitments = append(encCommitments, encCommitment) + commitments = append(commitments, commitment) + } + } + + // constructing bundles + for i := 0; i < 10; i++ { + idxBytes := getIdxBytes(int64(i + 10)) + + bundle := strings.TrimPrefix(txns[i].Hash().Hex(), "0x") + for j := i + 1; j < 10; j++ { + bundle += "," + strings.TrimPrefix(txns[j].Hash().Hex(), "0x") + } + + encCommitment := preconf.PreconfcommitmentstoreEncryptedCommitmentStored{ + CommitmentIndex: idxBytes, + Commiter: builderAddr, + CommitmentDigest: common.HexToHash(fmt.Sprintf("0x%02d", i)), + CommitmentSignature: []byte("signature"), + DispatchTimestamp: uint64(midTimestamp.UnixMilli()), + } + commitment := preconf.PreconfcommitmentstoreCommitmentStored{ + CommitmentIndex: idxBytes, + Commiter: builderAddr, + Bid: big.NewInt(10), + TxnHash: bundle, + BlockNumber: 5, + CommitmentHash: common.HexToHash(fmt.Sprintf("0x%02d", i)), + CommitmentSignature: []byte("signature"), + DecayStartTimeStamp: uint64(startTimestamp.UnixMilli()), + DecayEndTimeStamp: uint64(endTimestamp.UnixMilli()), + DispatchTimestamp: uint64(midTimestamp.UnixMilli()), + } + encCommitments = append(encCommitments, encCommitment) + commitments = append(commitments, commitment) + } + + register := &testWinnerRegister{ + winners: []testWinner{ + { + blockNum: 5, + winner: updater.Winner{ + Winner: builderAddr.Bytes(), + Window: 1, + }, + }, + }, + settlements: make(chan testSettlement, 1), + encCommit: make(chan testEncryptedCommitment, 1), + } + + l1Client := &testEVMClient{ + blocks: map[int64]*types.Block{ + 5: types.NewBlock(&types.Header{}, txns, nil, nil, NewHasher()), + }, + receipts: make(map[string]*types.Receipt), + } + for _, txn := range txns { + receipt := &types.Receipt{ + Status: types.ReceiptStatusFailed, + TxHash: txn.Hash(), + } + l1Client.receipts[txn.Hash().Hex()] = receipt + } + + pcABI, err := abi.JSON(strings.NewReader(preconf.PreconfcommitmentstoreABI)) + if err != nil { + t.Fatal(err) + } + + btABI, err := abi.JSON(strings.NewReader(blocktracker.BlocktrackerABI)) + if err != nil { + t.Fatal(err) + } + + evtMgr := events.NewListener( + util.NewTestLogger(io.Discard), + &btABI, + &pcABI, + ) + + oracle := &testOracle{ + commitments: make(chan processedCommitment, 1), + } + + updtr, err := updater.NewUpdater( + slog.New(slog.NewTextHandler(io.Discard, nil)), + l1Client, + register, + evtMgr, + oracle, + ) + if err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithCancel(context.Background()) + done := updtr.Start(ctx) + + w := blocktracker.BlocktrackerNewWindow{ + Window: big.NewInt(1), + } + publishNewWindow(evtMgr, &btABI, w) + + for _, ec := range encCommitments { + if err := publishEncCommitment(evtMgr, &pcABI, ec); err != nil { + t.Fatal(err) + } + + select { + case <-time.After(5 * time.Second): + t.Fatal("timeout") + case enc := <-register.encCommit: + if !bytes.Equal(enc.commitmentIdx, ec.CommitmentIndex[:]) { + t.Fatal("wrong commitment index") + } + if !bytes.Equal(enc.committer, ec.Commiter.Bytes()) { + t.Fatal("wrong committer") + } + if !bytes.Equal(enc.commitmentHash, ec.CommitmentDigest[:]) { + t.Fatal("wrong commitment hash") + } + if !bytes.Equal(enc.commitmentSignature, ec.CommitmentSignature) { + t.Fatal("wrong commitment signature") + } + if enc.dispatchTimestamp != ec.DispatchTimestamp { + t.Fatal("wrong dispatch timestamp") + } + } + } + + for _, c := range commitments { + if err := publishCommitment(evtMgr, &pcABI, c); err != nil { + t.Fatal(err) + } + + if c.Commiter.Cmp(otherBuilderAddr) == 0 { + continue + } + + select { + case <-time.After(5 * time.Second): + t.Fatal("timeout") + case commitment := <-oracle.commitments: + if !bytes.Equal(commitment.commitmentIdx[:], c.CommitmentIndex[:]) { + t.Fatal("wrong commitment index") + } + if commitment.blockNum.Cmp(big.NewInt(5)) != 0 { + t.Fatal("wrong block number") + } + if commitment.builder != c.Commiter { + t.Fatal("wrong builder") + } + if !commitment.isSlash { + t.Fatal("wrong isSlash") + } + if commitment.residualDecay.Cmp(big.NewInt(50)) != 0 { + t.Fatal("wrong residual decay") + } + } + + select { + case <-time.After(5 * time.Second): + t.Fatal("timeout") + case settlement := <-register.settlements: + if !bytes.Equal(settlement.commitmentIdx, c.CommitmentIndex[:]) { + t.Fatal("wrong commitment index") + } + if settlement.txHash != c.TxnHash { + t.Fatal("wrong txn hash") + } + if settlement.blockNum != 5 { + t.Fatal("wrong block number") + } + if !bytes.Equal(settlement.builder, c.Commiter.Bytes()) { + t.Fatal("wrong builder") + } + if settlement.amount.Uint64() != 10 { + t.Fatal("wrong amount") + } + if settlement.settlementType != updater.SettlementTypeSlash { + t.Fatal("wrong settlement type") + } + if settlement.decayPercentage != 50 { + t.Fatal("wrong decay percentage") + } + if settlement.window != 1 { + t.Fatal("wrong window") + } + } + } + + cancel() + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("timeout") + } +} + func TestUpdaterBundlesFailure(t *testing.T) { t.Parallel() From a8bee61f835e5f17e03f5353135b1744cee24300 Mon Sep 17 00:00:00 2001 From: Kartik Chopra Date: Mon, 1 Jul 2024 15:33:20 -0400 Subject: [PATCH 09/32] feat: use errgroup and syncmap --- oracle/pkg/updater/updater.go | 42 +++++++++++++++++------------------ 1 file changed, 20 insertions(+), 22 deletions(-) diff --git a/oracle/pkg/updater/updater.go b/oracle/pkg/updater/updater.go index bdded527a..0d19a0dfe 100644 --- a/oracle/pkg/updater/updater.go +++ b/oracle/pkg/updater/updater.go @@ -472,42 +472,40 @@ func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]Tx if err != nil { return nil, fmt.Errorf("failed to get block by number: %w", err) } + var txnsInBlock sync.Map + eg, ctx := errgroup.WithContext(ctx) - txnsInBlock := make(map[string]TxMetadata) - var wg sync.WaitGroup - var mu sync.Mutex - var receiptErr error - - processTransactionMetadata := func(posInBlock int, tx *types.Transaction) { - defer wg.Done() + processTransactionMetadata := func(posInBlock int, tx *types.Transaction) error { receipt, err := u.l1Client.TransactionReceipt(ctx, tx.Hash()) if err != nil { u.logger.Error("failed to get transaction receipt", "txHash", tx.Hash().Hex(), "error", err) - mu.Lock() - receiptErr = err - mu.Unlock() - return + return err } txSucceeded := receipt.Status == 1 - mu.Lock() - txnsInBlock[strings.TrimPrefix(tx.Hash().Hex(), "0x")] = TxMetadata{PosInBlock: posInBlock, Succeeded: txSucceeded} - mu.Unlock() + txnsInBlock.Store(strings.TrimPrefix(tx.Hash().Hex(), "0x"), TxMetadata{PosInBlock: posInBlock, Succeeded: txSucceeded}) + return nil } for posInBlock, tx := range block.Transactions() { - wg.Add(1) - go processTransactionMetadata(posInBlock, tx) + posInBlock, tx := posInBlock, tx // capture loop variables + eg.Go(func() error { + return processTransactionMetadata(posInBlock, tx) + }) } - wg.Wait() - - if receiptErr != nil { - return nil, receiptErr + if err := eg.Wait(); err != nil { + return nil, err } - _ = u.l1BlockCache.Add(blockNum, txnsInBlock) + txnsMap := make(map[string]TxMetadata) + txnsInBlock.Range(func(key, value interface{}) bool { + txnsMap[key.(string)] = value.(TxMetadata) + return true + }) + + _ = u.l1BlockCache.Add(blockNum, txnsMap) - return txnsInBlock, nil + return txnsMap, nil } // computeDecayPercentage takes startTimestamp, endTimestamp, commitTimestamp and computes a linear decay percentage From af546ea1a54a14766ba78a7025d6186044f3ba5e Mon Sep 17 00:00:00 2001 From: Kartik Chopra Date: Mon, 1 Jul 2024 18:25:37 -0400 Subject: [PATCH 10/32] feat: introduces batching --- oracle/pkg/node/node.go | 1 + oracle/pkg/updater/updater.go | 61 +++++++++++++++++++++--------- oracle/pkg/updater/updater_test.go | 33 ++++++++++++++++ 3 files changed, 77 insertions(+), 18 deletions(-) diff --git a/oracle/pkg/node/node.go b/oracle/pkg/node/node.go index 3cb94cd70..74b0b2a9f 100644 --- a/oracle/pkg/node/node.go +++ b/oracle/pkg/node/node.go @@ -236,6 +236,7 @@ func NewNode(opts *Options) (*Node, error) { st, evtMgr, oracleTransactorSession, + txmonitor.NewEVMHelper(l1Client.Client()), ) if err != nil { nd.logger.Error("failed to instantiate updater", "error", err) diff --git a/oracle/pkg/updater/updater.go b/oracle/pkg/updater/updater.go index 0d19a0dfe..0ebd63cff 100644 --- a/oracle/pkg/updater/updater.go +++ b/oracle/pkg/updater/updater.go @@ -19,6 +19,7 @@ import ( blocktracker "github.com/primev/mev-commit/contracts-abi/clients/BlockTracker" preconf "github.com/primev/mev-commit/contracts-abi/clients/PreConfCommitmentStore" "github.com/primev/mev-commit/x/contracts/events" + "github.com/primev/mev-commit/x/contracts/txmonitor" "github.com/prometheus/client_golang/prometheus" "golang.org/x/sync/errgroup" ) @@ -104,6 +105,7 @@ type Updater struct { openedCmts chan *preconf.PreconfcommitmentstoreCommitmentStored currentWindow atomic.Int64 metrics *metrics + receiptBatcher txmonitor.BatchReceiptGetter } func NewUpdater( @@ -112,6 +114,7 @@ func NewUpdater( winnerRegister WinnerRegister, evtMgr events.EventManager, oracle Oracle, + receiptBatcher txmonitor.BatchReceiptGetter, ) (*Updater, error) { l1BlockCache, err := lru.New[uint64, map[string]TxMetadata](1024) if err != nil { @@ -124,6 +127,7 @@ func NewUpdater( winnerRegister: winnerRegister, evtMgr: evtMgr, oracle: oracle, + receiptBatcher: receiptBatcher, metrics: newMetrics(), openedCmts: make(chan *preconf.PreconfcommitmentstoreCommitmentStored), encryptedCmts: make(chan *preconf.PreconfcommitmentstoreEncryptedCommitmentStored), @@ -323,7 +327,6 @@ func (u *Updater) handleOpenedCommitment( ) return err } - // Compute the decay percentage decayPercentage := u.computeDecayPercentage( update.DecayStartTimeStamp, @@ -337,7 +340,7 @@ func (u *Updater) handleOpenedCommitment( txnDetails, found := txns[commitmentTxnHashes[i]] if !found || txnDetails.PosInBlock != (txns[commitmentTxnHashes[0]].PosInBlock)+i || !txnDetails.Succeeded { u.logger.Info( - "bundle is not atomic", + "bundle does not satsify commited requirements", "commitmentIdx", common.Bytes2Hex(update.CommitmentIndex[:]), "txnHash", update.TxnHash, "blockNumber", update.BlockNumber, @@ -346,6 +349,7 @@ func (u *Updater) handleOpenedCommitment( "succeeded", txnDetails.Succeeded, "expectedPosInBlock", txns[commitmentTxnHashes[0]].PosInBlock+i, ) + // The committer did not include the transactions in the block // correctly, so this is a slash to be processed return u.settle( @@ -472,24 +476,42 @@ func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]Tx if err != nil { return nil, fmt.Errorf("failed to get block by number: %w", err) } - var txnsInBlock sync.Map + + var txnReceipts sync.Map eg, ctx := errgroup.WithContext(ctx) - processTransactionMetadata := func(posInBlock int, tx *types.Transaction) error { - receipt, err := u.l1Client.TransactionReceipt(ctx, tx.Hash()) - if err != nil { - u.logger.Error("failed to get transaction receipt", "txHash", tx.Hash().Hex(), "error", err) - return err + txnsArray := make([]common.Hash, len(block.Transactions())) + for i, tx := range block.Transactions() { + txnsArray[i] = tx.Hash() + } + + bucketSize := (len(txnsArray) + 3) / 4 // Calculate the size of each bucket, rounding up + buckets := make([][]common.Hash, 4) + for i := 0; i < 4; i++ { + start := i * bucketSize + end := start + bucketSize + if end > len(txnsArray) { + end = len(txnsArray) } - txSucceeded := receipt.Status == 1 - txnsInBlock.Store(strings.TrimPrefix(tx.Hash().Hex(), "0x"), TxMetadata{PosInBlock: posInBlock, Succeeded: txSucceeded}) - return nil + buckets[i] = txnsArray[start:end] } - for posInBlock, tx := range block.Transactions() { - posInBlock, tx := posInBlock, tx // capture loop variables + for _, bucket := range buckets { + bucket := bucket // closure for each errorgroup eg.Go(func() error { - return processTransactionMetadata(posInBlock, tx) + results, err := u.receiptBatcher.BatchReceipts(ctx, bucket) + if err != nil { + return fmt.Errorf("failed to get batch receipts: %w", err) + } + for _, result := range results { + if result.Err != nil { + return fmt.Errorf("failed to get receipt for txn: %s", result.Err) + } + + txnReceipts.Store(result.Receipt.TxHash.Hex(), *result.Receipt) + } + + return nil }) } @@ -498,10 +520,13 @@ func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]Tx } txnsMap := make(map[string]TxMetadata) - txnsInBlock.Range(func(key, value interface{}) bool { - txnsMap[key.(string)] = value.(TxMetadata) - return true - }) + for i, tx := range txnsArray { + receipt, ok := txnReceipts.Load(tx.Hex()) + if !ok { + return nil, fmt.Errorf("receipt not found for txn: %s", tx) + } + txnsMap[strings.TrimPrefix(tx.Hex(), "0x")] = TxMetadata{PosInBlock: i, Succeeded: receipt.(types.Receipt).Status == types.ReceiptStatusSuccessful} + } _ = u.l1BlockCache.Add(blockNum, txnsMap) diff --git a/oracle/pkg/updater/updater_test.go b/oracle/pkg/updater/updater_test.go index c9ec3f736..2db12e209 100644 --- a/oracle/pkg/updater/updater_test.go +++ b/oracle/pkg/updater/updater_test.go @@ -22,6 +22,7 @@ import ( preconf "github.com/primev/mev-commit/contracts-abi/clients/PreConfCommitmentStore" "github.com/primev/mev-commit/oracle/pkg/updater" "github.com/primev/mev-commit/x/contracts/events" + "github.com/primev/mev-commit/x/contracts/txmonitor" "github.com/primev/mev-commit/x/util" "golang.org/x/crypto/sha3" ) @@ -32,6 +33,28 @@ func getIdxBytes(idx int64) [32]byte { return idxBytes } +type testBatcher struct { + failedReceipts map[common.Hash]bool +} + +func (t *testBatcher) BatchReceipts(ctx context.Context, txns []common.Hash) ([]txmonitor.Result, error) { + var results []txmonitor.Result + for _, txn := range txns { + status := types.ReceiptStatusSuccessful + if t.failedReceipts[txn] { + status = types.ReceiptStatusFailed + } + results = append(results, txmonitor.Result{ + Receipt: &types.Receipt{ + TxHash: txn, + Status: status, + }, + Err: nil, + }) + } + return results, nil +} + type testHasher struct { hasher hash.Hash } @@ -209,6 +232,7 @@ func TestUpdater(t *testing.T) { register, evtMgr, oracle, + &testBatcher{}, ) if err != nil { t.Fatal(err) @@ -462,6 +486,12 @@ func TestUpdaterRevertedTxns(t *testing.T) { oracle := &testOracle{ commitments: make(chan processedCommitment, 1), } + testBatcher := &testBatcher{ + failedReceipts: make(map[common.Hash]bool), + } + for _, txn := range txns { + testBatcher.failedReceipts[txn.Hash()] = true + } updtr, err := updater.NewUpdater( slog.New(slog.NewTextHandler(io.Discard, nil)), @@ -469,6 +499,7 @@ func TestUpdaterRevertedTxns(t *testing.T) { register, evtMgr, oracle, + testBatcher, ) if err != nil { t.Fatal(err) @@ -684,6 +715,7 @@ func TestUpdaterBundlesFailure(t *testing.T) { register, evtMgr, oracle, + &testBatcher{}, ) if err != nil { t.Fatal(err) @@ -888,6 +920,7 @@ func TestUpdaterIgnoreCommitments(t *testing.T) { register, evtMgr, oracle, + &testBatcher{}, ) if err != nil { t.Fatal(err) From 43bfaa1e492988b6c3ed3f6f38bc27ab5cb42bdc Mon Sep 17 00:00:00 2001 From: Kartik Chopra Date: Tue, 2 Jul 2024 13:45:43 -0400 Subject: [PATCH 11/32] feat: rework bucketing to minimize requests --- oracle/pkg/updater/updater.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/oracle/pkg/updater/updater.go b/oracle/pkg/updater/updater.go index 0ebd63cff..d26fa2146 100644 --- a/oracle/pkg/updater/updater.go +++ b/oracle/pkg/updater/updater.go @@ -484,10 +484,11 @@ func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]Tx for i, tx := range block.Transactions() { txnsArray[i] = tx.Hash() } + const bucketSize = 50 // Arbitrary number for bucket size - bucketSize := (len(txnsArray) + 3) / 4 // Calculate the size of each bucket, rounding up - buckets := make([][]common.Hash, 4) - for i := 0; i < 4; i++ { + numBuckets := (len(txnsArray) + bucketSize - 1) / bucketSize // Calculate the number of buckets needed, rounding up + buckets := make([][]common.Hash, numBuckets) + for i := 0; i < numBuckets; i++ { start := i * bucketSize end := start + bucketSize if end > len(txnsArray) { From 8fed5fb0aee77ad05eb0dacce8b286a350340173 Mon Sep 17 00:00:00 2001 From: Kartik Chopra Date: Tue, 2 Jul 2024 13:49:06 -0400 Subject: [PATCH 12/32] chore: resolves nit PR requests --- oracle/pkg/updater/updater.go | 5 ++--- oracle/pkg/updater/updater_test.go | 8 -------- 2 files changed, 2 insertions(+), 11 deletions(-) diff --git a/oracle/pkg/updater/updater.go b/oracle/pkg/updater/updater.go index d26fa2146..2f8d8903d 100644 --- a/oracle/pkg/updater/updater.go +++ b/oracle/pkg/updater/updater.go @@ -91,7 +91,6 @@ type Oracle interface { type EVMClient interface { BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) - TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) } type Updater struct { @@ -509,7 +508,7 @@ func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]Tx return fmt.Errorf("failed to get receipt for txn: %s", result.Err) } - txnReceipts.Store(result.Receipt.TxHash.Hex(), *result.Receipt) + txnReceipts.Store(result.Receipt.TxHash.Hex(), result.Receipt) } return nil @@ -526,7 +525,7 @@ func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]Tx if !ok { return nil, fmt.Errorf("receipt not found for txn: %s", tx) } - txnsMap[strings.TrimPrefix(tx.Hex(), "0x")] = TxMetadata{PosInBlock: i, Succeeded: receipt.(types.Receipt).Status == types.ReceiptStatusSuccessful} + txnsMap[strings.TrimPrefix(tx.Hex(), "0x")] = TxMetadata{PosInBlock: i, Succeeded: receipt.(*types.Receipt).Status == types.ReceiptStatusSuccessful} } _ = u.l1BlockCache.Add(blockNum, txnsMap) diff --git a/oracle/pkg/updater/updater_test.go b/oracle/pkg/updater/updater_test.go index 2db12e209..2bf40311a 100644 --- a/oracle/pkg/updater/updater_test.go +++ b/oracle/pkg/updater/updater_test.go @@ -1127,14 +1127,6 @@ func (t *testEVMClient) BlockByNumber(ctx context.Context, blkNum *big.Int) (*ty return blk, nil } -func (t *testEVMClient) TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) { - receipt, found := t.receipts[txHash.Hex()] - if !found { - return nil, fmt.Errorf("receipt for transaction hash %s not found", txHash.Hex()) - } - return receipt, nil -} - type processedCommitment struct { commitmentIdx [32]byte blockNum *big.Int From dcd89300e7f39ef2d925680e8f9c207af5393ea2 Mon Sep 17 00:00:00 2001 From: Kartik Chopra Date: Tue, 2 Jul 2024 16:09:27 -0400 Subject: [PATCH 13/32] chore: removes closure var --- oracle/pkg/updater/updater.go | 1 - 1 file changed, 1 deletion(-) diff --git a/oracle/pkg/updater/updater.go b/oracle/pkg/updater/updater.go index 2f8d8903d..aedc0d864 100644 --- a/oracle/pkg/updater/updater.go +++ b/oracle/pkg/updater/updater.go @@ -497,7 +497,6 @@ func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]Tx } for _, bucket := range buckets { - bucket := bucket // closure for each errorgroup eg.Go(func() error { results, err := u.receiptBatcher.BatchReceipts(ctx, bucket) if err != nil { From a55e1f9c9380514c466cbe0b7ea487238d31eae1 Mon Sep 17 00:00:00 2001 From: Kartik Chopra Date: Tue, 2 Jul 2024 16:13:26 -0400 Subject: [PATCH 14/32] chore: use range over integer --- oracle/pkg/updater/updater_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/oracle/pkg/updater/updater_test.go b/oracle/pkg/updater/updater_test.go index 2bf40311a..12176098d 100644 --- a/oracle/pkg/updater/updater_test.go +++ b/oracle/pkg/updater/updater_test.go @@ -99,7 +99,7 @@ func TestUpdater(t *testing.T) { signer := types.NewLondonSigner(big.NewInt(5)) var txns []*types.Transaction - for i := 0; i < 10; i++ { + for i := range 10 { txns = append(txns, types.MustSignNewTx(key, signer, &types.DynamicFeeTx{ Nonce: uint64(i + 1), Gas: 1000000, @@ -147,7 +147,7 @@ func TestUpdater(t *testing.T) { } // constructing bundles - for i := 0; i < 10; i++ { + for i := range 10 { idxBytes := getIdxBytes(int64(i + 10)) bundle := strings.TrimPrefix(txns[i].Hash().Hex(), "0x") @@ -360,7 +360,7 @@ func TestUpdaterRevertedTxns(t *testing.T) { signer := types.NewLondonSigner(big.NewInt(5)) var txns []*types.Transaction - for i := 0; i < 10; i++ { + for i := range 10 { txns = append(txns, types.MustSignNewTx(key, signer, &types.DynamicFeeTx{ Nonce: uint64(i + 1), Gas: 1000000, @@ -408,7 +408,7 @@ func TestUpdaterRevertedTxns(t *testing.T) { } // constructing bundles - for i := 0; i < 10; i++ { + for i := range 10 { idxBytes := getIdxBytes(int64(i + 10)) bundle := strings.TrimPrefix(txns[i].Hash().Hex(), "0x") @@ -625,7 +625,7 @@ func TestUpdaterBundlesFailure(t *testing.T) { signer := types.NewLondonSigner(big.NewInt(5)) var txns []*types.Transaction - for i := 0; i < 10; i++ { + for i := range 10 { txns = append(txns, types.MustSignNewTx(key, signer, &types.DynamicFeeTx{ Nonce: uint64(i + 1), Gas: 1000000, @@ -811,7 +811,7 @@ func TestUpdaterIgnoreCommitments(t *testing.T) { signer := types.NewLondonSigner(big.NewInt(5)) var txns []*types.Transaction - for i := 0; i < 10; i++ { + for i := range 10 { txns = append(txns, types.MustSignNewTx(key, signer, &types.DynamicFeeTx{ Nonce: uint64(i + 1), Gas: 1000000, From d9fb69a6559c6dea45689e14edd1eca83780fd0a Mon Sep 17 00:00:00 2001 From: Kartik Chopra Date: Tue, 2 Jul 2024 16:24:06 -0400 Subject: [PATCH 15/32] feat: adds metrics --- oracle/pkg/updater/metrics.go | 9 +++++++++ oracle/pkg/updater/updater.go | 3 +++ 2 files changed, 12 insertions(+) diff --git a/oracle/pkg/updater/metrics.go b/oracle/pkg/updater/metrics.go index ab1b8d9b7..418e3986d 100644 --- a/oracle/pkg/updater/metrics.go +++ b/oracle/pkg/updater/metrics.go @@ -21,6 +21,7 @@ type metrics struct { BlockTimeCacheHits prometheus.Counter BlockTimeCacheMisses prometheus.Counter LastSentNonce prometheus.Gauge + TxnReceiptRequestDuration prometheus.Histogram } func newMetrics() *metrics { @@ -129,6 +130,14 @@ func newMetrics() *metrics { Help: "Last nonce sent to for settlement", }, ) + m.TxnReceiptRequestDuration = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: defaultNamespace, + Subsystem: subsystem, + Name: "txn_receipt_request_duration", + Help: "Duration of transaction receipt requests", + }, + ) return m } diff --git a/oracle/pkg/updater/updater.go b/oracle/pkg/updater/updater.go index aedc0d864..893e18e3a 100644 --- a/oracle/pkg/updater/updater.go +++ b/oracle/pkg/updater/updater.go @@ -11,6 +11,7 @@ import ( "strings" "sync" "sync/atomic" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -498,10 +499,12 @@ func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]Tx for _, bucket := range buckets { eg.Go(func() error { + start := time.Now() results, err := u.receiptBatcher.BatchReceipts(ctx, bucket) if err != nil { return fmt.Errorf("failed to get batch receipts: %w", err) } + u.metrics.TxnReceiptRequestDuration.Observe(time.Since(start).Seconds()) for _, result := range results { if result.Err != nil { return fmt.Errorf("failed to get receipt for txn: %s", result.Err) From bee4cb954181a327f496990793c3914c49b4a7c7 Mon Sep 17 00:00:00 2001 From: Kartik Chopra Date: Tue, 2 Jul 2024 16:54:41 -0400 Subject: [PATCH 16/32] chore: adds metric to collector --- oracle/pkg/updater/metrics.go | 1 + 1 file changed, 1 insertion(+) diff --git a/oracle/pkg/updater/metrics.go b/oracle/pkg/updater/metrics.go index 418e3986d..ff3557655 100644 --- a/oracle/pkg/updater/metrics.go +++ b/oracle/pkg/updater/metrics.go @@ -156,5 +156,6 @@ func (m *metrics) Collectors() []prometheus.Collector { m.BlockTimeCacheHits, m.BlockTimeCacheMisses, m.LastSentNonce, + m.TxnReceiptRequestDuration, } } From 9da2f044dd2d187c67741da9b3d70ca9e9d177a2 Mon Sep 17 00:00:00 2001 From: Kartik Chopra Date: Wed, 3 Jul 2024 13:05:56 -0400 Subject: [PATCH 17/32] feat: get total duration for block receipts --- oracle/pkg/updater/metrics.go | 38 ++++++++++++++++++++++------------- oracle/pkg/updater/updater.go | 4 ++++ 2 files changed, 28 insertions(+), 14 deletions(-) diff --git a/oracle/pkg/updater/metrics.go b/oracle/pkg/updater/metrics.go index ff3557655..46363c5b5 100644 --- a/oracle/pkg/updater/metrics.go +++ b/oracle/pkg/updater/metrics.go @@ -8,20 +8,21 @@ const ( ) type metrics struct { - CommitmentsReceivedCount prometheus.Counter - CommitmentsProcessedCount prometheus.Counter - CommitmentsTooOldCount prometheus.Counter - DuplicateCommitmentsCount prometheus.Counter - RewardsCount prometheus.Counter - SlashesCount prometheus.Counter - EncryptedCommitmentsCount prometheus.Counter - NoWinnerCount prometheus.Counter - BlockTxnCacheHits prometheus.Counter - BlockTxnCacheMisses prometheus.Counter - BlockTimeCacheHits prometheus.Counter - BlockTimeCacheMisses prometheus.Counter - LastSentNonce prometheus.Gauge - TxnReceiptRequestDuration prometheus.Histogram + CommitmentsReceivedCount prometheus.Counter + CommitmentsProcessedCount prometheus.Counter + CommitmentsTooOldCount prometheus.Counter + DuplicateCommitmentsCount prometheus.Counter + RewardsCount prometheus.Counter + SlashesCount prometheus.Counter + EncryptedCommitmentsCount prometheus.Counter + NoWinnerCount prometheus.Counter + BlockTxnCacheHits prometheus.Counter + BlockTxnCacheMisses prometheus.Counter + BlockTimeCacheHits prometheus.Counter + BlockTimeCacheMisses prometheus.Counter + LastSentNonce prometheus.Gauge + TxnReceiptRequestDuration prometheus.Histogram + TxnReceiptRequestBlockDuration prometheus.Histogram } func newMetrics() *metrics { @@ -138,6 +139,14 @@ func newMetrics() *metrics { Help: "Duration of transaction receipt requests", }, ) + m.TxnReceiptRequestBlockDuration = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: defaultNamespace, + Subsystem: subsystem, + Name: "txn_receipt_request_block_duration", + Help: "Duration of transaction receipt requests", + }, + ) return m } @@ -157,5 +166,6 @@ func (m *metrics) Collectors() []prometheus.Collector { m.BlockTimeCacheMisses, m.LastSentNonce, m.TxnReceiptRequestDuration, + m.TxnReceiptRequestBlockDuration, } } diff --git a/oracle/pkg/updater/updater.go b/oracle/pkg/updater/updater.go index 893e18e3a..3ab8cbd7c 100644 --- a/oracle/pkg/updater/updater.go +++ b/oracle/pkg/updater/updater.go @@ -497,6 +497,8 @@ func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]Tx buckets[i] = txnsArray[start:end] } + blockStart := time.Now() + for _, bucket := range buckets { eg.Go(func() error { start := time.Now() @@ -521,6 +523,8 @@ func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]Tx return nil, err } + u.metrics.TxnReceiptRequestBlockDuration.Observe(time.Since(blockStart).Seconds()) + txnsMap := make(map[string]TxMetadata) for i, tx := range txnsArray { receipt, ok := txnReceipts.Load(tx.Hex()) From ff3e6bb177129fd17a92403ebafd97fb33c8cafe Mon Sep 17 00:00:00 2001 From: Kartik Chopra Date: Wed, 3 Jul 2024 14:47:33 -0400 Subject: [PATCH 18/32] feat: adds retry to BatchReceipts call --- x/contracts/txmonitor/eth_helper.go | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/x/contracts/txmonitor/eth_helper.go b/x/contracts/txmonitor/eth_helper.go index ca15db555..77113b3c4 100644 --- a/x/contracts/txmonitor/eth_helper.go +++ b/x/contracts/txmonitor/eth_helper.go @@ -2,6 +2,7 @@ package txmonitor import ( "context" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -111,13 +112,29 @@ func (e *evmHelper) BatchReceipts(ctx context.Context, txHashes []common.Hash) ( } } - // Execute the batch request - err := e.client.BatchCallContext(ctx, batch) + var receipts []Result + var err error + for retries := 0; retries < 3; retries++ { + // Execute the batch request + err = e.client.BatchCallContext(ctx, batch) + if err == nil { + break + } + + // Check if the error is a 429 (Too Many Requests) + if rpcErr, ok := err.(rpc.Error); ok && rpcErr.ErrorCode() == 429 { + time.Sleep(1 * time.Second) + continue + } + + return nil, err + } + if err != nil { return nil, err } - receipts := make([]Result, len(batch)) + receipts = make([]Result, len(batch)) for i, elem := range batch { receipts[i].Receipt = elem.Result.(*types.Receipt) if elem.Error != nil { From c33d14f71fac884eaa4f2735db1172c76793b192 Mon Sep 17 00:00:00 2001 From: Kartik Chopra Date: Wed, 3 Jul 2024 15:11:46 -0400 Subject: [PATCH 19/32] feat: adds retries to all calls --- oracle/pkg/l1Listener/l1Listener.go | 1 + oracle/pkg/l1Listener/l1Listener_test.go | 4 ++ oracle/pkg/node/node.go | 51 +++++++++++++++++++++++- x/contracts/txmonitor/eth_helper.go | 2 +- 4 files changed, 56 insertions(+), 2 deletions(-) diff --git a/oracle/pkg/l1Listener/l1Listener.go b/oracle/pkg/l1Listener/l1Listener.go index b8f3d0e71..e590afb2c 100644 --- a/oracle/pkg/l1Listener/l1Listener.go +++ b/oracle/pkg/l1Listener/l1Listener.go @@ -27,6 +27,7 @@ type WinnerRegister interface { type EthClient interface { BlockNumber(ctx context.Context) (uint64, error) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) + BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) } type L1Listener struct { diff --git a/oracle/pkg/l1Listener/l1Listener_test.go b/oracle/pkg/l1Listener/l1Listener_test.go index db9e7b634..c85695965 100644 --- a/oracle/pkg/l1Listener/l1Listener_test.go +++ b/oracle/pkg/l1Listener/l1Listener_test.go @@ -205,6 +205,10 @@ func (t *testEthClient) HeaderByNumber(_ context.Context, number *big.Int) (*typ return hdr, nil } +func (t *testEthClient) BlockByNumber(_ context.Context, number *big.Int) (*types.Block, error) { + return nil, nil +} + func publishLog( eventManager events.EventManager, blockNum *big.Int, diff --git a/oracle/pkg/node/node.go b/oracle/pkg/node/node.go index 74b0b2a9f..11a84b34d 100644 --- a/oracle/pkg/node/node.go +++ b/oracle/pkg/node/node.go @@ -164,6 +164,8 @@ func NewNode(opts *Options) (*Node, error) { listenerL1Client = &laggerdL1Client{EthClient: listenerL1Client, amount: opts.LaggerdMode} } + listenerL1Client = &infiniteRetryL1Client{EthClient: listenerL1Client, logger: nd.logger} + blockTracker, err := blocktracker.NewBlocktrackerTransactor( opts.BlockTrackerContractAddr, settlementRPC, @@ -232,7 +234,7 @@ func NewNode(opts *Options) (*Node, error) { updtr, err := updater.NewUpdater( nd.logger.With("component", "updater"), - l1Client, + listenerL1Client, st, evtMgr, oracleTransactorSession, @@ -404,6 +406,53 @@ func (w *winnerOverrideL1Client) HeaderByNumber(ctx context.Context, number *big return hdr, nil } +type infiniteRetryL1Client struct { + l1Listener.EthClient + logger *slog.Logger +} + +func (i *infiniteRetryL1Client) BlockNumber(ctx context.Context) (uint64, error) { + var blkNum uint64 + var err error + for { + blkNum, err = i.EthClient.BlockNumber(ctx) + if err == nil { + break + } + i.logger.Error("failed to get block number, retrying...", "error", err) + time.Sleep(1 * time.Second) + } + return blkNum, nil +} + +func (i *infiniteRetryL1Client) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { + var hdr *types.Header + var err error + for { + hdr, err = i.EthClient.HeaderByNumber(ctx, number) + if err == nil { + break + } + i.logger.Error("failed to get header by number, retrying...", "error", err) + time.Sleep(1 * time.Second) + } + return hdr, nil +} + +func (i *infiniteRetryL1Client) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) { + var blk *types.Block + var err error + for { + blk, err = i.EthClient.BlockByNumber(ctx, number) + if err == nil { + break + } + i.logger.Error("failed to get block by number, retrying...", "error", err) + time.Sleep(1 * time.Second) + } + return blk, nil +} + func setBuilderMapping( ctx context.Context, bt *blocktracker.BlocktrackerTransactorSession, diff --git a/x/contracts/txmonitor/eth_helper.go b/x/contracts/txmonitor/eth_helper.go index 77113b3c4..0d4c84ed4 100644 --- a/x/contracts/txmonitor/eth_helper.go +++ b/x/contracts/txmonitor/eth_helper.go @@ -114,7 +114,7 @@ func (e *evmHelper) BatchReceipts(ctx context.Context, txHashes []common.Hash) ( var receipts []Result var err error - for retries := 0; retries < 3; retries++ { + for retries := 0; retries < 100; retries++ { // Execute the batch request err = e.client.BatchCallContext(ctx, batch) if err == nil { From 1fec832eb38ae7869a94fd12feac7b58ef90a63b Mon Sep 17 00:00:00 2001 From: Kartik Chopra Date: Thu, 4 Jul 2024 13:21:21 -0400 Subject: [PATCH 20/32] feat: check for 429 in infinite retry --- oracle/pkg/node/node.go | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/oracle/pkg/node/node.go b/oracle/pkg/node/node.go index 11a84b34d..5994a3c4f 100644 --- a/oracle/pkg/node/node.go +++ b/oracle/pkg/node/node.go @@ -419,7 +419,12 @@ func (i *infiniteRetryL1Client) BlockNumber(ctx context.Context) (uint64, error) if err == nil { break } - i.logger.Error("failed to get block number, retrying...", "error", err) + if httpErr, ok := err.(interface{ StatusCode() int }); ok && httpErr.StatusCode() == 429 { + i.logger.Error("received 429 Too Many Requests, retrying...", "error", err) + } else { + i.logger.Error("failed to get block number", "error", err) + return 0, err + } time.Sleep(1 * time.Second) } return blkNum, nil @@ -433,7 +438,12 @@ func (i *infiniteRetryL1Client) HeaderByNumber(ctx context.Context, number *big. if err == nil { break } - i.logger.Error("failed to get header by number, retrying...", "error", err) + if httpErr, ok := err.(interface{ StatusCode() int }); ok && httpErr.StatusCode() == 429 { + i.logger.Error("received 429 Too Many Requests, retrying...", "error", err) + } else { + i.logger.Error("failed to get header by number", "error", err) + return nil, err + } time.Sleep(1 * time.Second) } return hdr, nil @@ -447,7 +457,12 @@ func (i *infiniteRetryL1Client) BlockByNumber(ctx context.Context, number *big.I if err == nil { break } - i.logger.Error("failed to get block by number, retrying...", "error", err) + if httpErr, ok := err.(interface{ StatusCode() int }); ok && httpErr.StatusCode() == 429 { + i.logger.Error("received 429 Too Many Requests, retrying...", "error", err) + } else { + i.logger.Error("failed to get block by number", "error", err) + return nil, err + } time.Sleep(1 * time.Second) } return blk, nil From 02c1216889a8f7d7fbd7326f2a8ec5e59cc680fc Mon Sep 17 00:00:00 2001 From: Kartik Chopra Date: Thu, 4 Jul 2024 13:24:36 -0400 Subject: [PATCH 21/32] feat: do infinite retry on batch call and log --- x/contracts/txmonitor/eth_helper.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/x/contracts/txmonitor/eth_helper.go b/x/contracts/txmonitor/eth_helper.go index 0d4c84ed4..51b050250 100644 --- a/x/contracts/txmonitor/eth_helper.go +++ b/x/contracts/txmonitor/eth_helper.go @@ -2,6 +2,7 @@ package txmonitor import ( "context" + "log" "time" "github.com/ethereum/go-ethereum/common" @@ -114,7 +115,7 @@ func (e *evmHelper) BatchReceipts(ctx context.Context, txHashes []common.Hash) ( var receipts []Result var err error - for retries := 0; retries < 100; retries++ { + for { // Execute the batch request err = e.client.BatchCallContext(ctx, batch) if err == nil { @@ -123,6 +124,7 @@ func (e *evmHelper) BatchReceipts(ctx context.Context, txHashes []common.Hash) ( // Check if the error is a 429 (Too Many Requests) if rpcErr, ok := err.(rpc.Error); ok && rpcErr.ErrorCode() == 429 { + log.Println("received 429 Too Many Requests, retrying...", "error", err) time.Sleep(1 * time.Second) continue } @@ -130,10 +132,6 @@ func (e *evmHelper) BatchReceipts(ctx context.Context, txHashes []common.Hash) ( return nil, err } - if err != nil { - return nil, err - } - receipts = make([]Result, len(batch)) for i, elem := range batch { receipts[i].Receipt = elem.Result.(*types.Receipt) From 0c3e132336f0ba37d9f0a0f584d3ff676f2a3d35 Mon Sep 17 00:00:00 2001 From: Kartik Chopra Date: Thu, 4 Jul 2024 13:55:08 -0400 Subject: [PATCH 22/32] feat: checks for 429 errors --- oracle/pkg/node/node.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/oracle/pkg/node/node.go b/oracle/pkg/node/node.go index 5994a3c4f..933c19728 100644 --- a/oracle/pkg/node/node.go +++ b/oracle/pkg/node/node.go @@ -419,7 +419,7 @@ func (i *infiniteRetryL1Client) BlockNumber(ctx context.Context) (uint64, error) if err == nil { break } - if httpErr, ok := err.(interface{ StatusCode() int }); ok && httpErr.StatusCode() == 429 { + if strings.Contains(err.Error(), "429") { i.logger.Error("received 429 Too Many Requests, retrying...", "error", err) } else { i.logger.Error("failed to get block number", "error", err) @@ -438,7 +438,7 @@ func (i *infiniteRetryL1Client) HeaderByNumber(ctx context.Context, number *big. if err == nil { break } - if httpErr, ok := err.(interface{ StatusCode() int }); ok && httpErr.StatusCode() == 429 { + if strings.Contains(err.Error(), "429") { i.logger.Error("received 429 Too Many Requests, retrying...", "error", err) } else { i.logger.Error("failed to get header by number", "error", err) @@ -457,7 +457,7 @@ func (i *infiniteRetryL1Client) BlockByNumber(ctx context.Context, number *big.I if err == nil { break } - if httpErr, ok := err.(interface{ StatusCode() int }); ok && httpErr.StatusCode() == 429 { + if strings.Contains(err.Error(), "429") { i.logger.Error("received 429 Too Many Requests, retrying...", "error", err) } else { i.logger.Error("failed to get block by number", "error", err) @@ -467,7 +467,6 @@ func (i *infiniteRetryL1Client) BlockByNumber(ctx context.Context, number *big.I } return blk, nil } - func setBuilderMapping( ctx context.Context, bt *blocktracker.BlocktrackerTransactorSession, From e0d3ee013de642560c975ae9204897ec02ffeb71 Mon Sep 17 00:00:00 2001 From: Kartik Chopra Date: Thu, 4 Jul 2024 14:47:50 -0400 Subject: [PATCH 23/32] feat: attempt the request multiple times --- x/contracts/txmonitor/eth_helper.go | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/x/contracts/txmonitor/eth_helper.go b/x/contracts/txmonitor/eth_helper.go index 51b050250..5b1388ee8 100644 --- a/x/contracts/txmonitor/eth_helper.go +++ b/x/contracts/txmonitor/eth_helper.go @@ -115,23 +115,18 @@ func (e *evmHelper) BatchReceipts(ctx context.Context, txHashes []common.Hash) ( var receipts []Result var err error - for { + for attempts := 0; attempts < 10; attempts++ { // Execute the batch request err = e.client.BatchCallContext(ctx, batch) - if err == nil { - break - } - - // Check if the error is a 429 (Too Many Requests) - if rpcErr, ok := err.(rpc.Error); ok && rpcErr.ErrorCode() == 429 { - log.Println("received 429 Too Many Requests, retrying...", "error", err) + if err != nil { + log.Printf("Batch call attempt %d failed: %v", attempts+1, err) time.Sleep(1 * time.Second) - continue } + } + if err != nil { return nil, err } - receipts = make([]Result, len(batch)) for i, elem := range batch { receipts[i].Receipt = elem.Result.(*types.Receipt) From 93d96b23738bb8f03f018cecf1b2f9b301f26c17 Mon Sep 17 00:00:00 2001 From: Kartik Chopra Date: Thu, 4 Jul 2024 14:49:53 -0400 Subject: [PATCH 24/32] feat: attempt retries 50 times before failing --- oracle/pkg/node/node.go | 36 ++++++++++++----------------- x/contracts/txmonitor/eth_helper.go | 2 +- 2 files changed, 16 insertions(+), 22 deletions(-) diff --git a/oracle/pkg/node/node.go b/oracle/pkg/node/node.go index 933c19728..fe964aed7 100644 --- a/oracle/pkg/node/node.go +++ b/oracle/pkg/node/node.go @@ -414,57 +414,51 @@ type infiniteRetryL1Client struct { func (i *infiniteRetryL1Client) BlockNumber(ctx context.Context) (uint64, error) { var blkNum uint64 var err error - for { + for retries := 50; retries > 0; retries-- { blkNum, err = i.EthClient.BlockNumber(ctx) if err == nil { break } - if strings.Contains(err.Error(), "429") { - i.logger.Error("received 429 Too Many Requests, retrying...", "error", err) - } else { - i.logger.Error("failed to get block number", "error", err) - return 0, err - } + i.logger.Error("failed to get block number, retrying...", "error", err) time.Sleep(1 * time.Second) } + if err != nil { + return 0, err + } return blkNum, nil } func (i *infiniteRetryL1Client) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { var hdr *types.Header var err error - for { + for retries := 50; retries > 0; retries-- { hdr, err = i.EthClient.HeaderByNumber(ctx, number) if err == nil { break } - if strings.Contains(err.Error(), "429") { - i.logger.Error("received 429 Too Many Requests, retrying...", "error", err) - } else { - i.logger.Error("failed to get header by number", "error", err) - return nil, err - } + i.logger.Error("failed to get header by number, retrying...", "error", err) time.Sleep(1 * time.Second) } + if err != nil { + return nil, err + } return hdr, nil } func (i *infiniteRetryL1Client) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) { var blk *types.Block var err error - for { + for retries := 50; retries > 0; retries-- { blk, err = i.EthClient.BlockByNumber(ctx, number) if err == nil { break } - if strings.Contains(err.Error(), "429") { - i.logger.Error("received 429 Too Many Requests, retrying...", "error", err) - } else { - i.logger.Error("failed to get block by number", "error", err) - return nil, err - } + i.logger.Error("failed to get block by number, retrying...", "error", err) time.Sleep(1 * time.Second) } + if err != nil { + return nil, err + } return blk, nil } func setBuilderMapping( diff --git a/x/contracts/txmonitor/eth_helper.go b/x/contracts/txmonitor/eth_helper.go index 5b1388ee8..b7b3fdd41 100644 --- a/x/contracts/txmonitor/eth_helper.go +++ b/x/contracts/txmonitor/eth_helper.go @@ -115,7 +115,7 @@ func (e *evmHelper) BatchReceipts(ctx context.Context, txHashes []common.Hash) ( var receipts []Result var err error - for attempts := 0; attempts < 10; attempts++ { + for attempts := 0; attempts < 50; attempts++ { // Execute the batch request err = e.client.BatchCallContext(ctx, batch) if err != nil { From 9b61c0e701c1766c6c6cebdcfb135a3ce33e9f95 Mon Sep 17 00:00:00 2001 From: Kartik Chopra Date: Thu, 4 Jul 2024 15:24:27 -0400 Subject: [PATCH 25/32] feat: reduce the size of batch --- oracle/pkg/node/node.go | 6 +++--- oracle/pkg/updater/updater.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/oracle/pkg/node/node.go b/oracle/pkg/node/node.go index fe964aed7..a19aee727 100644 --- a/oracle/pkg/node/node.go +++ b/oracle/pkg/node/node.go @@ -420,7 +420,7 @@ func (i *infiniteRetryL1Client) BlockNumber(ctx context.Context) (uint64, error) break } i.logger.Error("failed to get block number, retrying...", "error", err) - time.Sleep(1 * time.Second) + time.Sleep(2 * time.Second) } if err != nil { return 0, err @@ -437,7 +437,7 @@ func (i *infiniteRetryL1Client) HeaderByNumber(ctx context.Context, number *big. break } i.logger.Error("failed to get header by number, retrying...", "error", err) - time.Sleep(1 * time.Second) + time.Sleep(2 * time.Second) } if err != nil { return nil, err @@ -454,7 +454,7 @@ func (i *infiniteRetryL1Client) BlockByNumber(ctx context.Context, number *big.I break } i.logger.Error("failed to get block by number, retrying...", "error", err) - time.Sleep(1 * time.Second) + time.Sleep(2 * time.Second) } if err != nil { return nil, err diff --git a/oracle/pkg/updater/updater.go b/oracle/pkg/updater/updater.go index 3ab8cbd7c..350ad0963 100644 --- a/oracle/pkg/updater/updater.go +++ b/oracle/pkg/updater/updater.go @@ -484,7 +484,7 @@ func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]Tx for i, tx := range block.Transactions() { txnsArray[i] = tx.Hash() } - const bucketSize = 50 // Arbitrary number for bucket size + const bucketSize = 25 // Arbitrary number for bucket size numBuckets := (len(txnsArray) + bucketSize - 1) / bucketSize // Calculate the number of buckets needed, rounding up buckets := make([][]common.Hash, numBuckets) From 4f4ff58a70ef887eb9913830d43504de169985df Mon Sep 17 00:00:00 2001 From: Kartik Chopra Date: Thu, 4 Jul 2024 16:08:35 -0400 Subject: [PATCH 26/32] feat: adds personal L1 RPC URL --- infrastructure/nomad/playbooks/variables/profiles.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/infrastructure/nomad/playbooks/variables/profiles.yml b/infrastructure/nomad/playbooks/variables/profiles.yml index d72307e58..ece0ee884 100644 --- a/infrastructure/nomad/playbooks/variables/profiles.yml +++ b/infrastructure/nomad/playbooks/variables/profiles.yml @@ -1,5 +1,5 @@ datacenter: "dc1" -l1_rpc_url: "https://eth-holesky.g.alchemy.com/v2/WqNEQeeexFLQwECjxCPpdep0uvCgn8Yj" +l1_rpc_url: "https://eth-holesky.g.alchemy.com/v2/H8JN1wImnEPrxkFRVOT7cJ_gzu9x3VmB" artifacts: bidder_emulator: &bidder_emulator_artifact From fcb78808c38e42813de468a56735fd0af617311c Mon Sep 17 00:00:00 2001 From: Kartik Chopra Date: Thu, 4 Jul 2024 18:47:15 -0400 Subject: [PATCH 27/32] chore: adds temp debug logs --- oracle/pkg/updater/updater.go | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/oracle/pkg/updater/updater.go b/oracle/pkg/updater/updater.go index 350ad0963..6b39f231b 100644 --- a/oracle/pkg/updater/updater.go +++ b/oracle/pkg/updater/updater.go @@ -462,21 +462,25 @@ func (u *Updater) addSettlement( return nil } - func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]TxMetadata, error) { txns, ok := u.l1BlockCache.Get(blockNum) if ok { u.metrics.BlockTxnCacheHits.Inc() + u.logger.Info("cache hit for block transactions", "blockNum", blockNum) return txns, nil } u.metrics.BlockTxnCacheMisses.Inc() + u.logger.Info("cache miss for block transactions", "blockNum", blockNum) block, err := u.l1Client.BlockByNumber(ctx, big.NewInt(0).SetUint64(blockNum)) if err != nil { + u.logger.Error("failed to get block by number", "blockNum", blockNum, "error", err) return nil, fmt.Errorf("failed to get block by number: %w", err) } + u.logger.Info("retrieved block", "blockNum", blockNum, "blockHash", block.Hash().Hex()) + var txnReceipts sync.Map eg, ctx := errgroup.WithContext(ctx) @@ -502,17 +506,22 @@ func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]Tx for _, bucket := range buckets { eg.Go(func() error { start := time.Now() + u.logger.Info("requesting batch receipts", "bucketSize", len(bucket)) results, err := u.receiptBatcher.BatchReceipts(ctx, bucket) if err != nil { + u.logger.Error("failed to get batch receipts", "error", err) return fmt.Errorf("failed to get batch receipts: %w", err) } u.metrics.TxnReceiptRequestDuration.Observe(time.Since(start).Seconds()) + u.logger.Info("received batch receipts", "duration", time.Since(start).Seconds()) for _, result := range results { if result.Err != nil { + u.logger.Error("failed to get receipt for txn", "txnHash", result.Receipt.TxHash.Hex(), "error", result.Err) return fmt.Errorf("failed to get receipt for txn: %s", result.Err) } txnReceipts.Store(result.Receipt.TxHash.Hex(), result.Receipt) + u.logger.Info("stored receipt", "txnHash", result.Receipt.TxHash.Hex()) } return nil @@ -520,21 +529,26 @@ func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]Tx } if err := eg.Wait(); err != nil { + u.logger.Error("error while waiting for batch receipts", "error", err) return nil, err } u.metrics.TxnReceiptRequestBlockDuration.Observe(time.Since(blockStart).Seconds()) + u.logger.Info("completed batch receipt requests for block", "blockNum", blockNum, "duration", time.Since(blockStart).Seconds()) txnsMap := make(map[string]TxMetadata) for i, tx := range txnsArray { receipt, ok := txnReceipts.Load(tx.Hex()) if !ok { + u.logger.Error("receipt not found for txn", "txnHash", tx.Hex()) return nil, fmt.Errorf("receipt not found for txn: %s", tx) } txnsMap[strings.TrimPrefix(tx.Hex(), "0x")] = TxMetadata{PosInBlock: i, Succeeded: receipt.(*types.Receipt).Status == types.ReceiptStatusSuccessful} + u.logger.Info("added txn to map", "txnHash", tx.Hex(), "posInBlock", i, "succeeded", receipt.(*types.Receipt).Status == types.ReceiptStatusSuccessful) } _ = u.l1BlockCache.Add(blockNum, txnsMap) + u.logger.Info("added block transactions to cache", "blockNum", blockNum) return txnsMap, nil } From 36c218f9371051b55ae1bd83c3ecbbf3578f8ebe Mon Sep 17 00:00:00 2001 From: Kartik Chopra Date: Thu, 4 Jul 2024 19:22:52 -0400 Subject: [PATCH 28/32] chore: avoid context from cancelation --- x/contracts/txmonitor/eth_helper.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x/contracts/txmonitor/eth_helper.go b/x/contracts/txmonitor/eth_helper.go index b7b3fdd41..d45016911 100644 --- a/x/contracts/txmonitor/eth_helper.go +++ b/x/contracts/txmonitor/eth_helper.go @@ -117,7 +117,7 @@ func (e *evmHelper) BatchReceipts(ctx context.Context, txHashes []common.Hash) ( var err error for attempts := 0; attempts < 50; attempts++ { // Execute the batch request - err = e.client.BatchCallContext(ctx, batch) + err = e.client.BatchCallContext(context.Background(), batch) if err != nil { log.Printf("Batch call attempt %d failed: %v", attempts+1, err) time.Sleep(1 * time.Second) From 745ce299524160987f27dfd24266f7fe47094824 Mon Sep 17 00:00:00 2001 From: Kartik Chopra Date: Thu, 4 Jul 2024 19:24:04 -0400 Subject: [PATCH 29/32] chore: panic when updater routine fails --- oracle/pkg/updater/updater.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/oracle/pkg/updater/updater.go b/oracle/pkg/updater/updater.go index 6b39f231b..1f1ae92e5 100644 --- a/oracle/pkg/updater/updater.go +++ b/oracle/pkg/updater/updater.go @@ -216,7 +216,8 @@ func (u *Updater) Start(ctx context.Context) <-chan struct{} { go func() { defer close(doneChan) if err := eg.Wait(); err != nil { - u.logger.Error("failed to start updater", "error", err) + u.logger.Error("updater failed, exiting", "error", err) + panic(err) } }() From fdcf9a61c4f7267bf07b7012385378dcae5482b7 Mon Sep 17 00:00:00 2001 From: Kartik Chopra Date: Thu, 4 Jul 2024 20:54:34 -0400 Subject: [PATCH 30/32] feat: adds logger for evmhelper --- oracle/pkg/node/node.go | 4 ++-- x/contracts/txmonitor/eth_helper.go | 7 ++++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/oracle/pkg/node/node.go b/oracle/pkg/node/node.go index a19aee727..af7dd8003 100644 --- a/oracle/pkg/node/node.go +++ b/oracle/pkg/node/node.go @@ -106,7 +106,7 @@ func NewNode(opts *Options) (*Node, error) { monitor := txmonitor.New( owner, settlementClient, - txmonitor.NewEVMHelper(settlementClient.Client()), + txmonitor.NewEVMHelperWithLogger(settlementClient.Client(), nd.logger), st, nd.logger.With("component", "tx_monitor"), 1024, @@ -238,7 +238,7 @@ func NewNode(opts *Options) (*Node, error) { st, evtMgr, oracleTransactorSession, - txmonitor.NewEVMHelper(l1Client.Client()), + txmonitor.NewEVMHelperWithLogger(l1Client.Client(), nd.logger), ) if err != nil { nd.logger.Error("failed to instantiate updater", "error", err) diff --git a/x/contracts/txmonitor/eth_helper.go b/x/contracts/txmonitor/eth_helper.go index d45016911..8fe53c3bc 100644 --- a/x/contracts/txmonitor/eth_helper.go +++ b/x/contracts/txmonitor/eth_helper.go @@ -3,6 +3,7 @@ package txmonitor import ( "context" "log" + "log/slog" "time" "github.com/ethereum/go-ethereum/common" @@ -78,11 +79,11 @@ type BatchReceiptGetter interface { type evmHelper struct { client *rpc.Client + logger *slog.Logger } -// NewEVMHelper creates a new EVMHelper instance. -func NewEVMHelper(client *rpc.Client) *evmHelper { - return &evmHelper{client} +func NewEVMHelperWithLogger(client *rpc.Client, logger *slog.Logger) *evmHelper { + return &evmHelper{client, logger} } // TraceTransaction implements Debugger.TraceTransaction interface. From a220daa4e702db289b88284b0d2b17f1cd65e427 Mon Sep 17 00:00:00 2001 From: Kartik Chopra Date: Thu, 4 Jul 2024 20:56:16 -0400 Subject: [PATCH 31/32] feat: add more logs --- x/contracts/txmonitor/eth_helper.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/x/contracts/txmonitor/eth_helper.go b/x/contracts/txmonitor/eth_helper.go index 8fe53c3bc..4826c08b8 100644 --- a/x/contracts/txmonitor/eth_helper.go +++ b/x/contracts/txmonitor/eth_helper.go @@ -2,7 +2,6 @@ package txmonitor import ( "context" - "log" "log/slog" "time" @@ -104,9 +103,11 @@ func (e *evmHelper) TraceTransaction(ctx context.Context, txHash common.Hash) (* // BatchReceipts retrieves multiple receipts for a list of transaction hashes. func (e *evmHelper) BatchReceipts(ctx context.Context, txHashes []common.Hash) ([]Result, error) { + e.logger.Info("Starting BatchReceipts", "txHashes", txHashes) batch := make([]rpc.BatchElem, len(txHashes)) for i, hash := range txHashes { + e.logger.Debug("Preparing batch element", "index", i, "hash", hash.Hex()) batch[i] = rpc.BatchElem{ Method: "eth_getTransactionReceipt", Args: []interface{}{hash}, @@ -117,24 +118,31 @@ func (e *evmHelper) BatchReceipts(ctx context.Context, txHashes []common.Hash) ( var receipts []Result var err error for attempts := 0; attempts < 50; attempts++ { + e.logger.Debug("Attempting batch call", "attempt", attempts+1) // Execute the batch request err = e.client.BatchCallContext(context.Background(), batch) if err != nil { - log.Printf("Batch call attempt %d failed: %v", attempts+1, err) + e.logger.Error("Batch call attempt failed", "attempt", attempts+1, "error", err) time.Sleep(1 * time.Second) + } else { + e.logger.Info("Batch call attempt succeeded", "attempt", attempts+1) + break } } if err != nil { + e.logger.Error("All batch call attempts failed", "error", err) return nil, err } receipts = make([]Result, len(batch)) for i, elem := range batch { + e.logger.Debug("Processing batch element", "index", i, "result", elem.Result, "error", elem.Error) receipts[i].Receipt = elem.Result.(*types.Receipt) if elem.Error != nil { receipts[i].Err = elem.Error } } + e.logger.Info("BatchReceipts completed successfully", "receipts", receipts) return receipts, nil } From 816dd33fcc7c542e6f588fe5454e89b1c6545684 Mon Sep 17 00:00:00 2001 From: Kartik Chopra Date: Fri, 5 Jul 2024 13:13:31 -0400 Subject: [PATCH 32/32] feat: fix linter issues --- p2p/pkg/node/node.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/p2p/pkg/node/node.go b/p2p/pkg/node/node.go index 7500c5727..51d0286c3 100644 --- a/p2p/pkg/node/node.go +++ b/p2p/pkg/node/node.go @@ -185,7 +185,7 @@ func NewNode(opts *Options) (*Node, error) { monitor := txmonitor.New( opts.KeySigner.GetAddress(), contractRPC, - txmonitor.NewEVMHelper(contractRPC.Client()), + txmonitor.NewEVMHelperWithLogger(contractRPC.Client(), opts.Logger.With("component", "txmonitor")), store, opts.Logger.With("component", "txmonitor"), 1024, @@ -337,7 +337,7 @@ func NewNode(opts *Options) (*Node, error) { evtMgr, store, commitmentDA, - txmonitor.NewEVMHelper(contractRPC.Client()), + txmonitor.NewEVMHelperWithLogger(contractRPC.Client(), opts.Logger.With("component", "evm_helper")), optsGetter, opts.Logger.With("component", "tracker"), )