Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adds execution enforcement to preconfirmations #161

Merged
merged 32 commits into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
921d4c8
feat: adds more txn metadata to block cache
ckartik Jun 20, 2024
eed7508
feat: slash on non-successful transactions
ckartik Jun 20, 2024
08eb0fa
chore: remove redundant log
ckartik Jun 20, 2024
52cd046
feat: request receipts concurrently
ckartik Jun 20, 2024
ccf1ff1
feat: cleanup code
ckartik Jun 20, 2024
4565a42
feat: adds txn receipts to tests
ckartik Jun 20, 2024
22e4f89
chore: updates test stub to include txn receipts
ckartik Jun 20, 2024
e80a568
feat: adds a revert check in tests
ckartik Jun 20, 2024
a8bee61
feat: use errgroup and syncmap
ckartik Jul 1, 2024
af546ea
feat: introduces batching
ckartik Jul 1, 2024
43bfaa1
feat: rework bucketing to minimize requests
ckartik Jul 2, 2024
8fed5fb
chore: resolves nit PR requests
ckartik Jul 2, 2024
dcd8930
chore: removes closure var
ckartik Jul 2, 2024
a55e1f9
chore: use range over integer
ckartik Jul 2, 2024
d9fb69a
feat: adds metrics
ckartik Jul 2, 2024
bee4cb9
chore: adds metric to collector
ckartik Jul 2, 2024
9da2f04
feat: get total duration for block receipts
ckartik Jul 3, 2024
ff3e6bb
feat: adds retry to BatchReceipts call
ckartik Jul 3, 2024
c33d14f
feat: adds retries to all calls
ckartik Jul 3, 2024
1fec832
feat: check for 429 in infinite retry
ckartik Jul 4, 2024
02c1216
feat: do infinite retry on batch call and log
ckartik Jul 4, 2024
0c3e132
feat: checks for 429 errors
ckartik Jul 4, 2024
e0d3ee0
feat: attempt the request multiple times
ckartik Jul 4, 2024
93d96b2
feat: attempt retries 50 times before failing
ckartik Jul 4, 2024
9b61c0e
feat: reduce the size of batch
ckartik Jul 4, 2024
4f4ff58
feat: adds personal L1 RPC URL
ckartik Jul 4, 2024
fcb7880
chore: adds temp debug logs
ckartik Jul 4, 2024
36c218f
chore: avoid context from cancelation
ckartik Jul 4, 2024
745ce29
chore: panic when updater routine fails
ckartik Jul 4, 2024
fdcf9a6
feat: adds logger for evmhelper
ckartik Jul 5, 2024
a220daa
feat: add more logs
ckartik Jul 5, 2024
816dd33
feat: fix linter issues
ckartik Jul 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion infrastructure/nomad/playbooks/variables/profiles.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
datacenter: "dc1"
l1_rpc_url: "https://eth-holesky.g.alchemy.com/v2/WqNEQeeexFLQwECjxCPpdep0uvCgn8Yj"
l1_rpc_url: "https://eth-holesky.g.alchemy.com/v2/H8JN1wImnEPrxkFRVOT7cJ_gzu9x3VmB"
ckartik marked this conversation as resolved.
Show resolved Hide resolved

artifacts:
bidder_emulator: &bidder_emulator_artifact
Expand Down
1 change: 1 addition & 0 deletions oracle/pkg/l1Listener/l1Listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type WinnerRegister interface {
type EthClient interface {
BlockNumber(ctx context.Context) (uint64, error)
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this required? This interface is a subset of the methods of the EthClient. We dont use the BlockByNumber in this pkg. You can still pass your retry client to this pkg even after removing this.

}

type L1Listener struct {
Expand Down
4 changes: 4 additions & 0 deletions oracle/pkg/l1Listener/l1Listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,10 @@ func (t *testEthClient) HeaderByNumber(_ context.Context, number *big.Int) (*typ
return hdr, nil
}

func (t *testEthClient) BlockByNumber(_ context.Context, number *big.Int) (*types.Block, error) {
return nil, nil
}

func publishLog(
eventManager events.EventManager,
blockNum *big.Int,
Expand Down
62 changes: 60 additions & 2 deletions oracle/pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func NewNode(opts *Options) (*Node, error) {
monitor := txmonitor.New(
owner,
settlementClient,
txmonitor.NewEVMHelper(settlementClient.Client()),
txmonitor.NewEVMHelperWithLogger(settlementClient.Client(), nd.logger),
st,
nd.logger.With("component", "tx_monitor"),
1024,
Expand Down Expand Up @@ -164,6 +164,8 @@ func NewNode(opts *Options) (*Node, error) {
listenerL1Client = &laggerdL1Client{EthClient: listenerL1Client, amount: opts.LaggerdMode}
}

listenerL1Client = &infiniteRetryL1Client{EthClient: listenerL1Client, logger: nd.logger}

blockTracker, err := blocktracker.NewBlocktrackerTransactor(
opts.BlockTrackerContractAddr,
settlementRPC,
Expand Down Expand Up @@ -232,10 +234,11 @@ func NewNode(opts *Options) (*Node, error) {

updtr, err := updater.NewUpdater(
nd.logger.With("component", "updater"),
l1Client,
listenerL1Client,
st,
evtMgr,
oracleTransactorSession,
txmonitor.NewEVMHelperWithLogger(l1Client.Client(), nd.logger),
)
if err != nil {
nd.logger.Error("failed to instantiate updater", "error", err)
Expand Down Expand Up @@ -403,6 +406,61 @@ func (w *winnerOverrideL1Client) HeaderByNumber(ctx context.Context, number *big
return hdr, nil
}

type infiniteRetryL1Client struct {
l1Listener.EthClient
logger *slog.Logger
}

func (i *infiniteRetryL1Client) BlockNumber(ctx context.Context) (uint64, error) {
var blkNum uint64
var err error
for retries := 50; retries > 0; retries-- {
blkNum, err = i.EthClient.BlockNumber(ctx)
if err == nil {
break
}
i.logger.Error("failed to get block number, retrying...", "error", err)
time.Sleep(2 * time.Second)
}
if err != nil {
return 0, err
}
return blkNum, nil
}

func (i *infiniteRetryL1Client) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) {
var hdr *types.Header
var err error
for retries := 50; retries > 0; retries-- {
hdr, err = i.EthClient.HeaderByNumber(ctx, number)
if err == nil {
break
}
i.logger.Error("failed to get header by number, retrying...", "error", err)
time.Sleep(2 * time.Second)
}
if err != nil {
return nil, err
}
return hdr, nil
}

func (i *infiniteRetryL1Client) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) {
var blk *types.Block
var err error
for retries := 50; retries > 0; retries-- {
blk, err = i.EthClient.BlockByNumber(ctx, number)
if err == nil {
break
}
i.logger.Error("failed to get block by number, retrying...", "error", err)
time.Sleep(2 * time.Second)
}
if err != nil {
return nil, err
}
return blk, nil
}
func setBuilderMapping(
ctx context.Context,
bt *blocktracker.BlocktrackerTransactorSession,
Expand Down
46 changes: 33 additions & 13 deletions oracle/pkg/updater/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,21 @@ const (
)

type metrics struct {
CommitmentsReceivedCount prometheus.Counter
CommitmentsProcessedCount prometheus.Counter
CommitmentsTooOldCount prometheus.Counter
DuplicateCommitmentsCount prometheus.Counter
RewardsCount prometheus.Counter
SlashesCount prometheus.Counter
EncryptedCommitmentsCount prometheus.Counter
NoWinnerCount prometheus.Counter
BlockTxnCacheHits prometheus.Counter
BlockTxnCacheMisses prometheus.Counter
BlockTimeCacheHits prometheus.Counter
BlockTimeCacheMisses prometheus.Counter
LastSentNonce prometheus.Gauge
CommitmentsReceivedCount prometheus.Counter
CommitmentsProcessedCount prometheus.Counter
CommitmentsTooOldCount prometheus.Counter
DuplicateCommitmentsCount prometheus.Counter
RewardsCount prometheus.Counter
SlashesCount prometheus.Counter
EncryptedCommitmentsCount prometheus.Counter
NoWinnerCount prometheus.Counter
BlockTxnCacheHits prometheus.Counter
BlockTxnCacheMisses prometheus.Counter
BlockTimeCacheHits prometheus.Counter
BlockTimeCacheMisses prometheus.Counter
LastSentNonce prometheus.Gauge
TxnReceiptRequestDuration prometheus.Histogram
TxnReceiptRequestBlockDuration prometheus.Histogram
}

func newMetrics() *metrics {
Expand Down Expand Up @@ -129,6 +131,22 @@ func newMetrics() *metrics {
Help: "Last nonce sent to for settlement",
},
)
m.TxnReceiptRequestDuration = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: defaultNamespace,
Subsystem: subsystem,
Name: "txn_receipt_request_duration",
Help: "Duration of transaction receipt requests",
},
)
m.TxnReceiptRequestBlockDuration = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: defaultNamespace,
Subsystem: subsystem,
Name: "txn_receipt_request_block_duration",
Help: "Duration of transaction receipt requests",
},
)
return m
}

Expand All @@ -147,5 +165,7 @@ func (m *metrics) Collectors() []prometheus.Collector {
m.BlockTimeCacheHits,
m.BlockTimeCacheMisses,
m.LastSentNonce,
m.TxnReceiptRequestDuration,
m.TxnReceiptRequestBlockDuration,
}
}
114 changes: 97 additions & 17 deletions oracle/pkg/updater/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
"math"
"math/big"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
Expand All @@ -18,12 +20,18 @@ import (
blocktracker "github.com/primev/mev-commit/contracts-abi/clients/BlockTracker"
preconf "github.com/primev/mev-commit/contracts-abi/clients/PreConfCommitmentStore"
"github.com/primev/mev-commit/x/contracts/events"
"github.com/primev/mev-commit/x/contracts/txmonitor"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/sync/errgroup"
)

type SettlementType string

type TxMetadata struct {
PosInBlock int
ckartik marked this conversation as resolved.
Show resolved Hide resolved
Succeeded bool
}

const (
SettlementTypeReward SettlementType = "reward"
SettlementTypeSlash SettlementType = "slash"
Expand Down Expand Up @@ -92,11 +100,12 @@ type Updater struct {
winnerRegister WinnerRegister
oracle Oracle
evtMgr events.EventManager
l1BlockCache *lru.Cache[uint64, map[string]int]
l1BlockCache *lru.Cache[uint64, map[string]TxMetadata]
encryptedCmts chan *preconf.PreconfcommitmentstoreEncryptedCommitmentStored
openedCmts chan *preconf.PreconfcommitmentstoreCommitmentStored
currentWindow atomic.Int64
metrics *metrics
receiptBatcher txmonitor.BatchReceiptGetter
}

func NewUpdater(
Expand All @@ -105,8 +114,9 @@ func NewUpdater(
winnerRegister WinnerRegister,
evtMgr events.EventManager,
oracle Oracle,
receiptBatcher txmonitor.BatchReceiptGetter,
) (*Updater, error) {
l1BlockCache, err := lru.New[uint64, map[string]int](1024)
l1BlockCache, err := lru.New[uint64, map[string]TxMetadata](1024)
if err != nil {
return nil, fmt.Errorf("failed to create L1 block cache: %w", err)
}
Expand All @@ -117,6 +127,7 @@ func NewUpdater(
winnerRegister: winnerRegister,
evtMgr: evtMgr,
oracle: oracle,
receiptBatcher: receiptBatcher,
metrics: newMetrics(),
openedCmts: make(chan *preconf.PreconfcommitmentstoreCommitmentStored),
encryptedCmts: make(chan *preconf.PreconfcommitmentstoreEncryptedCommitmentStored),
Expand Down Expand Up @@ -205,7 +216,8 @@ func (u *Updater) Start(ctx context.Context) <-chan struct{} {
go func() {
defer close(doneChan)
if err := eg.Wait(); err != nil {
u.logger.Error("failed to start updater", "error", err)
u.logger.Error("updater failed, exiting", "error", err)
panic(err)
}
}()

Expand Down Expand Up @@ -316,7 +328,6 @@ func (u *Updater) handleOpenedCommitment(
)
return err
}

// Compute the decay percentage
decayPercentage := u.computeDecayPercentage(
update.DecayStartTimeStamp,
Expand All @@ -327,17 +338,19 @@ func (u *Updater) handleOpenedCommitment(
commitmentTxnHashes := strings.Split(update.TxnHash, ",")
// Ensure Bundle is atomic and present in the block
for i := 0; i < len(commitmentTxnHashes); i++ {
posInBlock, found := txns[commitmentTxnHashes[i]]
if !found || posInBlock != txns[commitmentTxnHashes[0]]+i {
txnDetails, found := txns[commitmentTxnHashes[i]]
ckartik marked this conversation as resolved.
Show resolved Hide resolved
if !found || txnDetails.PosInBlock != (txns[commitmentTxnHashes[0]].PosInBlock)+i || !txnDetails.Succeeded {
ckartik marked this conversation as resolved.
Show resolved Hide resolved
u.logger.Info(
"bundle is not atomic",
"bundle does not satsify commited requirements",
"commitmentIdx", common.Bytes2Hex(update.CommitmentIndex[:]),
"txnHash", update.TxnHash,
"blockNumber", update.BlockNumber,
"found", found,
"posInBlock", posInBlock,
"expectedPosInBlock", txns[commitmentTxnHashes[0]]+i,
"posInBlock", txnDetails.PosInBlock,
"succeeded", txnDetails.Succeeded,
"expectedPosInBlock", txns[commitmentTxnHashes[0]].PosInBlock+i,
)

// The committer did not include the transactions in the block
// correctly, so this is a slash to be processed
return u.settle(
Expand Down Expand Up @@ -450,28 +463,95 @@ func (u *Updater) addSettlement(

return nil
}

func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]int, error) {
func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]TxMetadata, error) {
txns, ok := u.l1BlockCache.Get(blockNum)
if ok {
u.metrics.BlockTxnCacheHits.Inc()
u.logger.Info("cache hit for block transactions", "blockNum", blockNum)
return txns, nil
}

u.metrics.BlockTxnCacheMisses.Inc()
u.logger.Info("cache miss for block transactions", "blockNum", blockNum)

blk, err := u.l1Client.BlockByNumber(ctx, big.NewInt(0).SetUint64(blockNum))
block, err := u.l1Client.BlockByNumber(ctx, big.NewInt(0).SetUint64(blockNum))
if err != nil {
u.logger.Error("failed to get block by number", "blockNum", blockNum, "error", err)
return nil, fmt.Errorf("failed to get block by number: %w", err)
}

txnsInBlock := make(map[string]int)
for posInBlock, tx := range blk.Transactions() {
txnsInBlock[strings.TrimPrefix(tx.Hash().Hex(), "0x")] = posInBlock
u.logger.Info("retrieved block", "blockNum", blockNum, "blockHash", block.Hash().Hex())

var txnReceipts sync.Map
eg, ctx := errgroup.WithContext(ctx)

txnsArray := make([]common.Hash, len(block.Transactions()))
for i, tx := range block.Transactions() {
txnsArray[i] = tx.Hash()
}
_ = u.l1BlockCache.Add(blockNum, txnsInBlock)
const bucketSize = 25 // Arbitrary number for bucket size

numBuckets := (len(txnsArray) + bucketSize - 1) / bucketSize // Calculate the number of buckets needed, rounding up
buckets := make([][]common.Hash, numBuckets)
for i := 0; i < numBuckets; i++ {
start := i * bucketSize
end := start + bucketSize
if end > len(txnsArray) {
end = len(txnsArray)
}
buckets[i] = txnsArray[start:end]
}

blockStart := time.Now()

for _, bucket := range buckets {
eg.Go(func() error {
start := time.Now()
u.logger.Info("requesting batch receipts", "bucketSize", len(bucket))
results, err := u.receiptBatcher.BatchReceipts(ctx, bucket)
if err != nil {
u.logger.Error("failed to get batch receipts", "error", err)
return fmt.Errorf("failed to get batch receipts: %w", err)
}
u.metrics.TxnReceiptRequestDuration.Observe(time.Since(start).Seconds())
ckartik marked this conversation as resolved.
Show resolved Hide resolved
u.logger.Info("received batch receipts", "duration", time.Since(start).Seconds())
for _, result := range results {
if result.Err != nil {
u.logger.Error("failed to get receipt for txn", "txnHash", result.Receipt.TxHash.Hex(), "error", result.Err)
return fmt.Errorf("failed to get receipt for txn: %s", result.Err)
}

txnReceipts.Store(result.Receipt.TxHash.Hex(), result.Receipt)
u.logger.Info("stored receipt", "txnHash", result.Receipt.TxHash.Hex())
}

return nil
})
}

if err := eg.Wait(); err != nil {
u.logger.Error("error while waiting for batch receipts", "error", err)
return nil, err
}

u.metrics.TxnReceiptRequestBlockDuration.Observe(time.Since(blockStart).Seconds())
u.logger.Info("completed batch receipt requests for block", "blockNum", blockNum, "duration", time.Since(blockStart).Seconds())

txnsMap := make(map[string]TxMetadata)
for i, tx := range txnsArray {
receipt, ok := txnReceipts.Load(tx.Hex())
if !ok {
u.logger.Error("receipt not found for txn", "txnHash", tx.Hex())
return nil, fmt.Errorf("receipt not found for txn: %s", tx)
}
txnsMap[strings.TrimPrefix(tx.Hex(), "0x")] = TxMetadata{PosInBlock: i, Succeeded: receipt.(*types.Receipt).Status == types.ReceiptStatusSuccessful}
u.logger.Info("added txn to map", "txnHash", tx.Hex(), "posInBlock", i, "succeeded", receipt.(*types.Receipt).Status == types.ReceiptStatusSuccessful)
}

_ = u.l1BlockCache.Add(blockNum, txnsMap)
u.logger.Info("added block transactions to cache", "blockNum", blockNum)

return txnsInBlock, nil
return txnsMap, nil
}

// computeDecayPercentage takes startTimestamp, endTimestamp, commitTimestamp and computes a linear decay percentage
Expand Down
Loading
Loading