Skip to content

Commit

Permalink
feat: adds execution enforcement to preconfirmations (#161)
Browse files Browse the repository at this point in the history
* feat: adds more txn metadata to block cache

* feat: slash on non-successful transactions

* chore: remove redundant log

* feat: request receipts concurrently

* feat: cleanup code

* feat: adds txn receipts to tests

* chore: updates test stub to include txn receipts

* feat: adds a revert check in tests

* feat: use errgroup and syncmap

* feat: introduces batching

* feat: rework bucketing to minimize requests

* chore: resolves nit PR requests

* chore: removes closure var

* chore: use range over integer

* feat: adds metrics

* chore: adds metric to collector

* feat: get total duration for block receipts

* feat: adds retry to BatchReceipts call

* feat: adds retries to all calls

* feat: check for 429 in infinite retry

* feat: do infinite retry on batch call and log

* feat: checks for 429 errors

* feat: attempt the request multiple times

* feat: attempt retries 50 times before failing

* feat: reduce the size of batch

* feat: adds personal L1 RPC URL

* chore: adds temp debug logs

* chore: avoid context from cancelation

* chore: panic when updater routine fails

* feat: adds logger for evmhelper

* feat: add more logs

* feat: fix linter issues
  • Loading branch information
ckartik authored Jul 5, 2024
1 parent 10b6452 commit a6094d7
Show file tree
Hide file tree
Showing 9 changed files with 547 additions and 47 deletions.
2 changes: 1 addition & 1 deletion infrastructure/nomad/playbooks/variables/profiles.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
datacenter: "dc1"
l1_rpc_url: "https://eth-holesky.g.alchemy.com/v2/WqNEQeeexFLQwECjxCPpdep0uvCgn8Yj"
l1_rpc_url: "https://eth-holesky.g.alchemy.com/v2/H8JN1wImnEPrxkFRVOT7cJ_gzu9x3VmB"

artifacts:
bidder_emulator: &bidder_emulator_artifact
Expand Down
1 change: 1 addition & 0 deletions oracle/pkg/l1Listener/l1Listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type WinnerRegister interface {
type EthClient interface {
BlockNumber(ctx context.Context) (uint64, error)
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error)
}

type L1Listener struct {
Expand Down
4 changes: 4 additions & 0 deletions oracle/pkg/l1Listener/l1Listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,10 @@ func (t *testEthClient) HeaderByNumber(_ context.Context, number *big.Int) (*typ
return hdr, nil
}

func (t *testEthClient) BlockByNumber(_ context.Context, number *big.Int) (*types.Block, error) {
return nil, nil
}

func publishLog(
eventManager events.EventManager,
blockNum *big.Int,
Expand Down
62 changes: 60 additions & 2 deletions oracle/pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func NewNode(opts *Options) (*Node, error) {
monitor := txmonitor.New(
owner,
settlementClient,
txmonitor.NewEVMHelper(settlementClient.Client()),
txmonitor.NewEVMHelperWithLogger(settlementClient.Client(), nd.logger),
st,
nd.logger.With("component", "tx_monitor"),
1024,
Expand Down Expand Up @@ -164,6 +164,8 @@ func NewNode(opts *Options) (*Node, error) {
listenerL1Client = &laggerdL1Client{EthClient: listenerL1Client, amount: opts.LaggerdMode}
}

listenerL1Client = &infiniteRetryL1Client{EthClient: listenerL1Client, logger: nd.logger}

blockTracker, err := blocktracker.NewBlocktrackerTransactor(
opts.BlockTrackerContractAddr,
settlementRPC,
Expand Down Expand Up @@ -232,10 +234,11 @@ func NewNode(opts *Options) (*Node, error) {

updtr, err := updater.NewUpdater(
nd.logger.With("component", "updater"),
l1Client,
listenerL1Client,
st,
evtMgr,
oracleTransactorSession,
txmonitor.NewEVMHelperWithLogger(l1Client.Client(), nd.logger),
)
if err != nil {
nd.logger.Error("failed to instantiate updater", "error", err)
Expand Down Expand Up @@ -403,6 +406,61 @@ func (w *winnerOverrideL1Client) HeaderByNumber(ctx context.Context, number *big
return hdr, nil
}

type infiniteRetryL1Client struct {
l1Listener.EthClient
logger *slog.Logger
}

func (i *infiniteRetryL1Client) BlockNumber(ctx context.Context) (uint64, error) {
var blkNum uint64
var err error
for retries := 50; retries > 0; retries-- {
blkNum, err = i.EthClient.BlockNumber(ctx)
if err == nil {
break
}
i.logger.Error("failed to get block number, retrying...", "error", err)
time.Sleep(2 * time.Second)
}
if err != nil {
return 0, err
}
return blkNum, nil
}

func (i *infiniteRetryL1Client) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) {
var hdr *types.Header
var err error
for retries := 50; retries > 0; retries-- {
hdr, err = i.EthClient.HeaderByNumber(ctx, number)
if err == nil {
break
}
i.logger.Error("failed to get header by number, retrying...", "error", err)
time.Sleep(2 * time.Second)
}
if err != nil {
return nil, err
}
return hdr, nil
}

func (i *infiniteRetryL1Client) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) {
var blk *types.Block
var err error
for retries := 50; retries > 0; retries-- {
blk, err = i.EthClient.BlockByNumber(ctx, number)
if err == nil {
break
}
i.logger.Error("failed to get block by number, retrying...", "error", err)
time.Sleep(2 * time.Second)
}
if err != nil {
return nil, err
}
return blk, nil
}
func setBuilderMapping(
ctx context.Context,
bt *blocktracker.BlocktrackerTransactorSession,
Expand Down
46 changes: 33 additions & 13 deletions oracle/pkg/updater/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,21 @@ const (
)

type metrics struct {
CommitmentsReceivedCount prometheus.Counter
CommitmentsProcessedCount prometheus.Counter
CommitmentsTooOldCount prometheus.Counter
DuplicateCommitmentsCount prometheus.Counter
RewardsCount prometheus.Counter
SlashesCount prometheus.Counter
EncryptedCommitmentsCount prometheus.Counter
NoWinnerCount prometheus.Counter
BlockTxnCacheHits prometheus.Counter
BlockTxnCacheMisses prometheus.Counter
BlockTimeCacheHits prometheus.Counter
BlockTimeCacheMisses prometheus.Counter
LastSentNonce prometheus.Gauge
CommitmentsReceivedCount prometheus.Counter
CommitmentsProcessedCount prometheus.Counter
CommitmentsTooOldCount prometheus.Counter
DuplicateCommitmentsCount prometheus.Counter
RewardsCount prometheus.Counter
SlashesCount prometheus.Counter
EncryptedCommitmentsCount prometheus.Counter
NoWinnerCount prometheus.Counter
BlockTxnCacheHits prometheus.Counter
BlockTxnCacheMisses prometheus.Counter
BlockTimeCacheHits prometheus.Counter
BlockTimeCacheMisses prometheus.Counter
LastSentNonce prometheus.Gauge
TxnReceiptRequestDuration prometheus.Histogram
TxnReceiptRequestBlockDuration prometheus.Histogram
}

func newMetrics() *metrics {
Expand Down Expand Up @@ -129,6 +131,22 @@ func newMetrics() *metrics {
Help: "Last nonce sent to for settlement",
},
)
m.TxnReceiptRequestDuration = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: defaultNamespace,
Subsystem: subsystem,
Name: "txn_receipt_request_duration",
Help: "Duration of transaction receipt requests",
},
)
m.TxnReceiptRequestBlockDuration = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: defaultNamespace,
Subsystem: subsystem,
Name: "txn_receipt_request_block_duration",
Help: "Duration of transaction receipt requests",
},
)
return m
}

Expand All @@ -147,5 +165,7 @@ func (m *metrics) Collectors() []prometheus.Collector {
m.BlockTimeCacheHits,
m.BlockTimeCacheMisses,
m.LastSentNonce,
m.TxnReceiptRequestDuration,
m.TxnReceiptRequestBlockDuration,
}
}
114 changes: 97 additions & 17 deletions oracle/pkg/updater/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
"math"
"math/big"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
Expand All @@ -18,12 +20,18 @@ import (
blocktracker "github.com/primev/mev-commit/contracts-abi/clients/BlockTracker"
preconf "github.com/primev/mev-commit/contracts-abi/clients/PreConfCommitmentStore"
"github.com/primev/mev-commit/x/contracts/events"
"github.com/primev/mev-commit/x/contracts/txmonitor"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/sync/errgroup"
)

type SettlementType string

type TxMetadata struct {
PosInBlock int
Succeeded bool
}

const (
SettlementTypeReward SettlementType = "reward"
SettlementTypeSlash SettlementType = "slash"
Expand Down Expand Up @@ -92,11 +100,12 @@ type Updater struct {
winnerRegister WinnerRegister
oracle Oracle
evtMgr events.EventManager
l1BlockCache *lru.Cache[uint64, map[string]int]
l1BlockCache *lru.Cache[uint64, map[string]TxMetadata]
encryptedCmts chan *preconf.PreconfcommitmentstoreEncryptedCommitmentStored
openedCmts chan *preconf.PreconfcommitmentstoreCommitmentStored
currentWindow atomic.Int64
metrics *metrics
receiptBatcher txmonitor.BatchReceiptGetter
}

func NewUpdater(
Expand All @@ -105,8 +114,9 @@ func NewUpdater(
winnerRegister WinnerRegister,
evtMgr events.EventManager,
oracle Oracle,
receiptBatcher txmonitor.BatchReceiptGetter,
) (*Updater, error) {
l1BlockCache, err := lru.New[uint64, map[string]int](1024)
l1BlockCache, err := lru.New[uint64, map[string]TxMetadata](1024)
if err != nil {
return nil, fmt.Errorf("failed to create L1 block cache: %w", err)
}
Expand All @@ -117,6 +127,7 @@ func NewUpdater(
winnerRegister: winnerRegister,
evtMgr: evtMgr,
oracle: oracle,
receiptBatcher: receiptBatcher,
metrics: newMetrics(),
openedCmts: make(chan *preconf.PreconfcommitmentstoreCommitmentStored),
encryptedCmts: make(chan *preconf.PreconfcommitmentstoreEncryptedCommitmentStored),
Expand Down Expand Up @@ -205,7 +216,8 @@ func (u *Updater) Start(ctx context.Context) <-chan struct{} {
go func() {
defer close(doneChan)
if err := eg.Wait(); err != nil {
u.logger.Error("failed to start updater", "error", err)
u.logger.Error("updater failed, exiting", "error", err)
panic(err)
}
}()

Expand Down Expand Up @@ -316,7 +328,6 @@ func (u *Updater) handleOpenedCommitment(
)
return err
}

// Compute the decay percentage
decayPercentage := u.computeDecayPercentage(
update.DecayStartTimeStamp,
Expand All @@ -327,17 +338,19 @@ func (u *Updater) handleOpenedCommitment(
commitmentTxnHashes := strings.Split(update.TxnHash, ",")
// Ensure Bundle is atomic and present in the block
for i := 0; i < len(commitmentTxnHashes); i++ {
posInBlock, found := txns[commitmentTxnHashes[i]]
if !found || posInBlock != txns[commitmentTxnHashes[0]]+i {
txnDetails, found := txns[commitmentTxnHashes[i]]
if !found || txnDetails.PosInBlock != (txns[commitmentTxnHashes[0]].PosInBlock)+i || !txnDetails.Succeeded {
u.logger.Info(
"bundle is not atomic",
"bundle does not satsify commited requirements",
"commitmentIdx", common.Bytes2Hex(update.CommitmentIndex[:]),
"txnHash", update.TxnHash,
"blockNumber", update.BlockNumber,
"found", found,
"posInBlock", posInBlock,
"expectedPosInBlock", txns[commitmentTxnHashes[0]]+i,
"posInBlock", txnDetails.PosInBlock,
"succeeded", txnDetails.Succeeded,
"expectedPosInBlock", txns[commitmentTxnHashes[0]].PosInBlock+i,
)

// The committer did not include the transactions in the block
// correctly, so this is a slash to be processed
return u.settle(
Expand Down Expand Up @@ -450,28 +463,95 @@ func (u *Updater) addSettlement(

return nil
}

func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]int, error) {
func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]TxMetadata, error) {
txns, ok := u.l1BlockCache.Get(blockNum)
if ok {
u.metrics.BlockTxnCacheHits.Inc()
u.logger.Info("cache hit for block transactions", "blockNum", blockNum)
return txns, nil
}

u.metrics.BlockTxnCacheMisses.Inc()
u.logger.Info("cache miss for block transactions", "blockNum", blockNum)

blk, err := u.l1Client.BlockByNumber(ctx, big.NewInt(0).SetUint64(blockNum))
block, err := u.l1Client.BlockByNumber(ctx, big.NewInt(0).SetUint64(blockNum))
if err != nil {
u.logger.Error("failed to get block by number", "blockNum", blockNum, "error", err)
return nil, fmt.Errorf("failed to get block by number: %w", err)
}

txnsInBlock := make(map[string]int)
for posInBlock, tx := range blk.Transactions() {
txnsInBlock[strings.TrimPrefix(tx.Hash().Hex(), "0x")] = posInBlock
u.logger.Info("retrieved block", "blockNum", blockNum, "blockHash", block.Hash().Hex())

var txnReceipts sync.Map
eg, ctx := errgroup.WithContext(ctx)

txnsArray := make([]common.Hash, len(block.Transactions()))
for i, tx := range block.Transactions() {
txnsArray[i] = tx.Hash()
}
_ = u.l1BlockCache.Add(blockNum, txnsInBlock)
const bucketSize = 25 // Arbitrary number for bucket size

numBuckets := (len(txnsArray) + bucketSize - 1) / bucketSize // Calculate the number of buckets needed, rounding up
buckets := make([][]common.Hash, numBuckets)
for i := 0; i < numBuckets; i++ {
start := i * bucketSize
end := start + bucketSize
if end > len(txnsArray) {
end = len(txnsArray)
}
buckets[i] = txnsArray[start:end]
}

blockStart := time.Now()

for _, bucket := range buckets {
eg.Go(func() error {
start := time.Now()
u.logger.Info("requesting batch receipts", "bucketSize", len(bucket))
results, err := u.receiptBatcher.BatchReceipts(ctx, bucket)
if err != nil {
u.logger.Error("failed to get batch receipts", "error", err)
return fmt.Errorf("failed to get batch receipts: %w", err)
}
u.metrics.TxnReceiptRequestDuration.Observe(time.Since(start).Seconds())
u.logger.Info("received batch receipts", "duration", time.Since(start).Seconds())
for _, result := range results {
if result.Err != nil {
u.logger.Error("failed to get receipt for txn", "txnHash", result.Receipt.TxHash.Hex(), "error", result.Err)
return fmt.Errorf("failed to get receipt for txn: %s", result.Err)
}

txnReceipts.Store(result.Receipt.TxHash.Hex(), result.Receipt)
u.logger.Info("stored receipt", "txnHash", result.Receipt.TxHash.Hex())
}

return nil
})
}

if err := eg.Wait(); err != nil {
u.logger.Error("error while waiting for batch receipts", "error", err)
return nil, err
}

u.metrics.TxnReceiptRequestBlockDuration.Observe(time.Since(blockStart).Seconds())
u.logger.Info("completed batch receipt requests for block", "blockNum", blockNum, "duration", time.Since(blockStart).Seconds())

txnsMap := make(map[string]TxMetadata)
for i, tx := range txnsArray {
receipt, ok := txnReceipts.Load(tx.Hex())
if !ok {
u.logger.Error("receipt not found for txn", "txnHash", tx.Hex())
return nil, fmt.Errorf("receipt not found for txn: %s", tx)
}
txnsMap[strings.TrimPrefix(tx.Hex(), "0x")] = TxMetadata{PosInBlock: i, Succeeded: receipt.(*types.Receipt).Status == types.ReceiptStatusSuccessful}
u.logger.Info("added txn to map", "txnHash", tx.Hex(), "posInBlock", i, "succeeded", receipt.(*types.Receipt).Status == types.ReceiptStatusSuccessful)
}

_ = u.l1BlockCache.Add(blockNum, txnsMap)
u.logger.Info("added block transactions to cache", "blockNum", blockNum)

return txnsInBlock, nil
return txnsMap, nil
}

// computeDecayPercentage takes startTimestamp, endTimestamp, commitTimestamp and computes a linear decay percentage
Expand Down
Loading

0 comments on commit a6094d7

Please sign in to comment.