Skip to content

Commit

Permalink
fix: wait for events for update
Browse files Browse the repository at this point in the history
  • Loading branch information
Alok committed Jul 9, 2024
1 parent 04e14a3 commit a233c09
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 90 deletions.
92 changes: 43 additions & 49 deletions p2p/pkg/autodepositor/autodepositor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/core/types"
bidderregistry "github.com/primev/mev-commit/contracts-abi/clients/BidderRegistry"
blocktracker "github.com/primev/mev-commit/contracts-abi/clients/BlockTracker"
"github.com/primev/mev-commit/x/contracts/events"
"golang.org/x/sync/errgroup"
Expand All @@ -22,15 +23,10 @@ type BidderRegistryContract interface {
WithdrawFromWindows(opts *bind.TransactOpts, windows []*big.Int) (*types.Transaction, error)
}

type TxWatcher interface {
WaitForReceipt(ctx context.Context, tx *types.Transaction) (*types.Receipt, error)
}

type AutoDepositTracker struct {
startMu sync.Mutex
isWorking bool
eventMgr events.EventManager
watcher TxWatcher
deposits sync.Map
windowChan chan *blocktracker.BlocktrackerNewWindow
brContract BidderRegistryContract
Expand All @@ -43,14 +39,12 @@ func New(
evtMgr events.EventManager,
brContract BidderRegistryContract,
optsGetter OptsGetter,
watcher TxWatcher,
logger *slog.Logger,
) *AutoDepositTracker {
return &AutoDepositTracker{
eventMgr: evtMgr,
brContract: brContract,
optsGetter: optsGetter,
watcher: watcher,
windowChan: make(chan *blocktracker.BlocktrackerNewWindow, 1),
logger: logger,
}
Expand All @@ -67,38 +61,16 @@ func (adt *AutoDepositTracker) Start(
return fmt.Errorf("auto deposit tracker is already running")
}

nextWindow := new(big.Int).Add(startWindow, big.NewInt(1))

opts, err := adt.optsGetter(ctx)
if err != nil {
return err
}

opts.Value = big.NewInt(0).Mul(amount, big.NewInt(2))

// Make initial deposit for the first two windows
txn, err := adt.brContract.DepositForWindows(opts, []*big.Int{startWindow, nextWindow})
if err != nil {
return err
}

receipt, err := adt.watcher.WaitForReceipt(ctx, txn)
if err != nil {
return err
}

if receipt.Status != types.ReceiptStatusSuccessful {
return fmt.Errorf("initial deposit transaction failed: %d", receipt.Status)
}

adt.deposits.Store(startWindow.Uint64(), true)
adt.deposits.Store(nextWindow.Uint64(), true)

eg, egCtx := errgroup.WithContext(context.Background())
egCtx, cancel := context.WithCancel(egCtx)
adt.cancelFunc = cancel

evt := events.NewEventHandler(
evt1 := events.NewEventHandler(
"NewWindow",
func(update *blocktracker.BlocktrackerNewWindow) {
adt.logger.Info(
Expand All @@ -112,11 +84,51 @@ func (adt *AutoDepositTracker) Start(
},
)

sub, err := adt.eventMgr.Subscribe(evt)
evt2 := events.NewEventHandler(
"BidderRegistered",
func(bidderReg *bidderregistry.BidderregistryBidderRegistered) {
if bidderReg.Bidder.Cmp(opts.From) != 0 {
return
}
adt.logger.Info(
"bidder registered event",
"bidder", bidderReg.Bidder.String(),
"window", bidderReg.WindowNumber,
)
adt.deposits.Store(bidderReg.WindowNumber.Uint64(), true)
},
)

evt3 := events.NewEventHandler(
"BidderWithdrawal",
func(bidderReg *bidderregistry.BidderregistryBidderWithdrawal) {
if bidderReg.Bidder.Cmp(opts.From) != 0 {
return
}
adt.logger.Info(
"bidder withdrawal event",
"bidder", bidderReg.Bidder.String(),
"window", bidderReg.Window,
)
adt.deposits.Delete(bidderReg.Window.Uint64())
},
)

sub, err := adt.eventMgr.Subscribe(evt1, evt2, evt3)
if err != nil {
return fmt.Errorf("error subscribing to event: %w", err)
}

nextWindow := new(big.Int).Add(startWindow, big.NewInt(1))

opts.Value = big.NewInt(0).Mul(amount, big.NewInt(2))

// Make initial deposit for the first two windows
_, err = adt.brContract.DepositForWindows(opts, []*big.Int{startWindow, nextWindow})
if err != nil {
return err
}

eg.Go(func() error {
for {
select {
Expand All @@ -143,17 +155,7 @@ func (adt *AutoDepositTracker) Start(
if err != nil {
return err
}
r, err := adt.watcher.WaitForReceipt(egCtx, txn)
if err != nil {
return err
}
if r.Status != types.ReceiptStatusSuccessful {
return fmt.Errorf("withdraw transaction failed: %d", r.Status)
}
adt.logger.Info("withdraw from windows", "hash", txn.Hash(), "windows", withdrawWindows)
for _, window := range withdrawWindows {
adt.deposits.Delete(window.Uint64())
}
}

// Make deposit for the next window. The window event is 2 windows
Expand All @@ -174,20 +176,12 @@ func (adt *AutoDepositTracker) Start(
if err != nil {
return err
}
r, err := adt.watcher.WaitForReceipt(egCtx, txn)
if err != nil {
return err
}
if r.Status != types.ReceiptStatusSuccessful {
return fmt.Errorf("deposit transaction failed: %d", r.Status)
}
adt.logger.Info(
"deposited to next window",
"hash", txn.Hash(),
"window", nextWindow,
"amount", amount,
)
adt.deposits.Store(nextWindow.Uint64(), true)
}
}
})
Expand Down
111 changes: 75 additions & 36 deletions p2p/pkg/autodepositor/autodepositor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,6 @@ func (m *MockBidderRegistryContract) WithdrawFromWindows(opts *bind.TransactOpts
return m.WithdrawFromWindowsFunc(opts, windows)
}

type MockTxWatcher struct {
WaitForReceiptFunc func(ctx context.Context, tx *types.Transaction) (*types.Receipt, error)
}

func (m *MockTxWatcher) WaitForReceipt(ctx context.Context, tx *types.Transaction) (*types.Receipt, error) {
return m.WaitForReceiptFunc(ctx, tx)
}

func TestAutoDepositTracker_Start(t *testing.T) {
t.Parallel()

Expand All @@ -55,34 +47,44 @@ func TestAutoDepositTracker_Start(t *testing.T) {
t.Fatal(err)
}

addr := common.HexToAddress("0x1234")
amount := big.NewInt(100)
logger := util.NewTestLogger(os.Stdout)
evtMgr := events.NewListener(logger, &btABI, &brABI)
brContract := &MockBidderRegistryContract{
DepositForWindowsFunc: func(opts *bind.TransactOpts, windows []*big.Int) (*types.Transaction, error) {
for _, window := range windows {
publishBidderRegistered(evtMgr, &brABI, &bidderregistry.BidderregistryBidderRegistered{

Check failure on line 57 in p2p/pkg/autodepositor/autodepositor_test.go

View workflow job for this annotation

GitHub Actions / Test and Build Go Modules

Error return value is not checked (errcheck)
Bidder: addr,
DepositedAmount: amount,
WindowNumber: window,
})
}
return types.NewTransaction(1, common.Address{}, nil, 0, nil, nil), nil
},
WithdrawFromWindowsFunc: func(opts *bind.TransactOpts, windows []*big.Int) (*types.Transaction, error) {
for _, window := range windows {
publishBidderWithdrawal(evtMgr, &brABI, &bidderregistry.BidderregistryBidderWithdrawal{

Check failure on line 67 in p2p/pkg/autodepositor/autodepositor_test.go

View workflow job for this annotation

GitHub Actions / Test and Build Go Modules

Error return value is not checked (errcheck)
Bidder: addr,
Amount: amount,
Window: window,
})
}
return types.NewTransaction(1, common.Address{}, nil, 0, nil, nil), nil
},
}
optsGetter := func(ctx context.Context) (*bind.TransactOpts, error) {
return &bind.TransactOpts{}, nil
}
watcher := &MockTxWatcher{
WaitForReceiptFunc: func(ctx context.Context, tx *types.Transaction) (*types.Receipt, error) {
return &types.Receipt{
Status: 1,
}, nil
},
return &bind.TransactOpts{
From: addr,
}, nil
}

// Create AutoDepositTracker instance
adt := autodepositor.New(evtMgr, brContract, optsGetter, watcher, logger)
adt := autodepositor.New(evtMgr, brContract, optsGetter, logger)

// Start AutoDepositTracker
ctx := context.Background()
startWindow := big.NewInt(2)
amount := big.NewInt(100)
err = adt.Start(ctx, startWindow, amount)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -168,16 +170,9 @@ func TestAutoDepositTracker_Start_CancelContext(t *testing.T) {
optsGetter := func(ctx context.Context) (*bind.TransactOpts, error) {
return &bind.TransactOpts{}, nil
}
watcher := &MockTxWatcher{
WaitForReceiptFunc: func(ctx context.Context, tx *types.Transaction) (*types.Receipt, error) {
return &types.Receipt{
Status: 1,
}, nil
},
}

// Create AutoDepositTracker instance
adt := autodepositor.New(evtMgr, brContract, optsGetter, watcher, logger)
adt := autodepositor.New(evtMgr, brContract, optsGetter, logger)

// Start AutoDepositTracker with a cancelable context
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -210,10 +205,9 @@ func TestAutoDepositTracker_Stop_NotRunning(t *testing.T) {
optsGetter := func(ctx context.Context) (*bind.TransactOpts, error) {
return &bind.TransactOpts{}, nil
}
watcher := &MockTxWatcher{}

// Create AutoDepositTracker instance
adt := autodepositor.New(evtMgr, brContract, optsGetter, watcher, logger)
adt := autodepositor.New(evtMgr, brContract, optsGetter, logger)

// Stop AutoDepositTracker when not running
_, err = adt.Stop()
Expand Down Expand Up @@ -246,16 +240,9 @@ func TestAutoDepositTracker_IsWorking(t *testing.T) {
optsGetter := func(ctx context.Context) (*bind.TransactOpts, error) {
return &bind.TransactOpts{}, nil
}
watcher := &MockTxWatcher{
WaitForReceiptFunc: func(ctx context.Context, tx *types.Transaction) (*types.Receipt, error) {
return &types.Receipt{
Status: 1,
}, nil
},
}

// Create AutoDepositTracker instance
adt := autodepositor.New(evtMgr, brContract, optsGetter, watcher, logger)
adt := autodepositor.New(evtMgr, brContract, optsGetter, logger)

// Assert initial IsWorking status
if adt.IsWorking() {
Expand Down Expand Up @@ -302,3 +289,55 @@ func publishNewWindow(
}
evtMgr.PublishLogEvent(context.Background(), testLog)
}

func publishBidderRegistered(
evtMgr events.EventManager,
brABI *abi.ABI,
br *bidderregistry.BidderregistryBidderRegistered,
) error {
event := brABI.Events["BidderRegistered"]
buf, err := event.Inputs.NonIndexed().Pack(
br.DepositedAmount,
br.WindowNumber,
)
if err != nil {
return err
}

testLog := types.Log{
Topics: []common.Hash{
event.ID,
common.HexToHash(br.Bidder.Hex()),
},
Data: buf,
}
evtMgr.PublishLogEvent(context.Background(), testLog)

return nil
}

func publishBidderWithdrawal(
evtMgr events.EventManager,
brABI *abi.ABI,
br *bidderregistry.BidderregistryBidderWithdrawal,
) error {
event := brABI.Events["BidderWithdrawal"]
buf, err := event.Inputs.NonIndexed().Pack(
br.Window,
br.Amount,
)
if err != nil {
return err
}

testLog := types.Log{
Topics: []common.Hash{
event.ID,
common.HexToHash(br.Bidder.Hex()),
},
Data: buf,
}
evtMgr.PublishLogEvent(context.Background(), testLog)

return nil
}
1 change: 0 additions & 1 deletion p2p/pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,6 @@ func NewNode(opts *Options) (*Node, error) {
evtMgr,
bidderRegistry,
optsGetter,
monitor,
opts.Logger.With("component", "auto_deposit_tracker"),
)

Expand Down
8 changes: 4 additions & 4 deletions x/contracts/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,15 +220,15 @@ func (l *Listener) PublishLogEvent(ctx context.Context, log types.Log) {

l.metrics.totalLogs.Inc()

// wg := sync.WaitGroup{}
wg := sync.WaitGroup{}
events := l.subscribers[log.Topics[0]]
for _, event := range events {
ev := event
l.metrics.totalEvents.Inc()
l.metrics.eventCounts.WithLabelValues(ev.evt.eventName()).Inc()
// wg.Add(1)
wg.Add(1)
go func() {
// defer wg.Done()
defer wg.Done()

defer func(start time.Time) {
l.metrics.eventHandlerDurations.
Expand All @@ -247,5 +247,5 @@ func (l *Listener) PublishLogEvent(ctx context.Context, log types.Log) {
}()
}

// wg.Wait()
wg.Wait()
}

0 comments on commit a233c09

Please sign in to comment.