Skip to content

Commit

Permalink
feat: process txns sequentially
Browse files Browse the repository at this point in the history
  • Loading branch information
ckartik committed Jul 3, 2024
1 parent 4bcf910 commit 99f6032
Showing 1 changed file with 18 additions and 42 deletions.
60 changes: 18 additions & 42 deletions oracle/pkg/updater/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"math"
"math/big"
"strings"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -477,63 +476,40 @@ func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]Tx
return nil, fmt.Errorf("failed to get block by number: %w", err)
}

var txnReceipts sync.Map
eg, ctx := errgroup.WithContext(ctx)

txnsMap := make(map[string]TxMetadata)
txnsArray := make([]common.Hash, len(block.Transactions()))
for i, tx := range block.Transactions() {
txnsArray[i] = tx.Hash()
}
const bucketSize = 50 // Arbitrary number for bucket size

numBuckets := (len(txnsArray) + bucketSize - 1) / bucketSize // Calculate the number of buckets needed, rounding up
buckets := make([][]common.Hash, numBuckets)
for i := 0; i < numBuckets; i++ {
start := i * bucketSize
end := start + bucketSize
blockStart := time.Now()

const batchSize = 20
for start := 0; start < len(txnsArray); start += batchSize {
end := start + batchSize
if end > len(txnsArray) {
end = len(txnsArray)
}
buckets[i] = txnsArray[start:end]
}

blockStart := time.Now()

for _, bucket := range buckets {
eg.Go(func() error {
start := time.Now()
results, err := u.receiptBatcher.BatchReceipts(ctx, bucket)
if err != nil {
return fmt.Errorf("failed to get batch receipts: %w", err)
results, err := u.receiptBatcher.BatchReceipts(ctx, txnsArray[start:end])
if err != nil {
return nil, fmt.Errorf("failed to get batch receipts: %w", err)
}
u.metrics.TxnReceiptRequestDuration.Observe(time.Since(blockStart).Seconds())
for i, result := range results {
if result.Err != nil {
return nil, fmt.Errorf("failed to get receipt for txn: %s", result.Err)
}
u.metrics.TxnReceiptRequestDuration.Observe(time.Since(start).Seconds())
for _, result := range results {
if result.Err != nil {
return fmt.Errorf("failed to get receipt for txn: %s", result.Err)
}

txnReceipts.Store(result.Receipt.TxHash.Hex(), result.Receipt)
txnsMap[strings.TrimPrefix(result.Receipt.TxHash.Hex(), "0x")] = TxMetadata{
PosInBlock: start + i,
Succeeded: result.Receipt.Status == types.ReceiptStatusSuccessful,
}

return nil
})
}

if err := eg.Wait(); err != nil {
return nil, err
}
}

u.metrics.TxnReceiptRequestBlockDuration.Observe(time.Since(blockStart).Seconds())

txnsMap := make(map[string]TxMetadata)
for i, tx := range txnsArray {
receipt, ok := txnReceipts.Load(tx.Hex())
if !ok {
return nil, fmt.Errorf("receipt not found for txn: %s", tx)
}
txnsMap[strings.TrimPrefix(tx.Hex(), "0x")] = TxMetadata{PosInBlock: i, Succeeded: receipt.(*types.Receipt).Status == types.ReceiptStatusSuccessful}
}

_ = u.l1BlockCache.Add(blockNum, txnsMap)

return txnsMap, nil
Expand Down

0 comments on commit 99f6032

Please sign in to comment.