From 99f603266ce59e22d5897281972a278aa27015e3 Mon Sep 17 00:00:00 2001 From: Kartik Chopra Date: Wed, 3 Jul 2024 17:33:27 -0400 Subject: [PATCH] feat: process txns sequentially --- oracle/pkg/updater/updater.go | 60 +++++++++++------------------------ 1 file changed, 18 insertions(+), 42 deletions(-) diff --git a/oracle/pkg/updater/updater.go b/oracle/pkg/updater/updater.go index 3ab8cbd7c..bb9bb5503 100644 --- a/oracle/pkg/updater/updater.go +++ b/oracle/pkg/updater/updater.go @@ -9,7 +9,6 @@ import ( "math" "math/big" "strings" - "sync" "sync/atomic" "time" @@ -477,63 +476,40 @@ func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]Tx return nil, fmt.Errorf("failed to get block by number: %w", err) } - var txnReceipts sync.Map - eg, ctx := errgroup.WithContext(ctx) - + txnsMap := make(map[string]TxMetadata) txnsArray := make([]common.Hash, len(block.Transactions())) for i, tx := range block.Transactions() { txnsArray[i] = tx.Hash() } - const bucketSize = 50 // Arbitrary number for bucket size - numBuckets := (len(txnsArray) + bucketSize - 1) / bucketSize // Calculate the number of buckets needed, rounding up - buckets := make([][]common.Hash, numBuckets) - for i := 0; i < numBuckets; i++ { - start := i * bucketSize - end := start + bucketSize + blockStart := time.Now() + + const batchSize = 20 + for start := 0; start < len(txnsArray); start += batchSize { + end := start + batchSize if end > len(txnsArray) { end = len(txnsArray) } - buckets[i] = txnsArray[start:end] - } - blockStart := time.Now() - - for _, bucket := range buckets { - eg.Go(func() error { - start := time.Now() - results, err := u.receiptBatcher.BatchReceipts(ctx, bucket) - if err != nil { - return fmt.Errorf("failed to get batch receipts: %w", err) + results, err := u.receiptBatcher.BatchReceipts(ctx, txnsArray[start:end]) + if err != nil { + return nil, fmt.Errorf("failed to get batch receipts: %w", err) + } + u.metrics.TxnReceiptRequestDuration.Observe(time.Since(blockStart).Seconds()) + for i, result := range results { + if result.Err != nil { + return nil, fmt.Errorf("failed to get receipt for txn: %s", result.Err) } - u.metrics.TxnReceiptRequestDuration.Observe(time.Since(start).Seconds()) - for _, result := range results { - if result.Err != nil { - return fmt.Errorf("failed to get receipt for txn: %s", result.Err) - } - txnReceipts.Store(result.Receipt.TxHash.Hex(), result.Receipt) + txnsMap[strings.TrimPrefix(result.Receipt.TxHash.Hex(), "0x")] = TxMetadata{ + PosInBlock: start + i, + Succeeded: result.Receipt.Status == types.ReceiptStatusSuccessful, } - - return nil - }) - } - - if err := eg.Wait(); err != nil { - return nil, err + } } u.metrics.TxnReceiptRequestBlockDuration.Observe(time.Since(blockStart).Seconds()) - txnsMap := make(map[string]TxMetadata) - for i, tx := range txnsArray { - receipt, ok := txnReceipts.Load(tx.Hex()) - if !ok { - return nil, fmt.Errorf("receipt not found for txn: %s", tx) - } - txnsMap[strings.TrimPrefix(tx.Hex(), "0x")] = TxMetadata{PosInBlock: i, Succeeded: receipt.(*types.Receipt).Status == types.ReceiptStatusSuccessful} - } - _ = u.l1BlockCache.Add(blockNum, txnsMap) return txnsMap, nil