From a233c098a179c5a1b2ddb49b45754e9ce27ef90c Mon Sep 17 00:00:00 2001 From: Alok Date: Tue, 9 Jul 2024 20:29:02 +0530 Subject: [PATCH] fix: wait for events for update --- p2p/pkg/autodepositor/autodepositor.go | 92 ++++++++-------- p2p/pkg/autodepositor/autodepositor_test.go | 111 +++++++++++++------- p2p/pkg/node/node.go | 1 - x/contracts/events/events.go | 8 +- 4 files changed, 122 insertions(+), 90 deletions(-) diff --git a/p2p/pkg/autodepositor/autodepositor.go b/p2p/pkg/autodepositor/autodepositor.go index e431be380..30f44b81d 100644 --- a/p2p/pkg/autodepositor/autodepositor.go +++ b/p2p/pkg/autodepositor/autodepositor.go @@ -10,6 +10,7 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/core/types" + bidderregistry "github.com/primev/mev-commit/contracts-abi/clients/BidderRegistry" blocktracker "github.com/primev/mev-commit/contracts-abi/clients/BlockTracker" "github.com/primev/mev-commit/x/contracts/events" "golang.org/x/sync/errgroup" @@ -22,15 +23,10 @@ type BidderRegistryContract interface { WithdrawFromWindows(opts *bind.TransactOpts, windows []*big.Int) (*types.Transaction, error) } -type TxWatcher interface { - WaitForReceipt(ctx context.Context, tx *types.Transaction) (*types.Receipt, error) -} - type AutoDepositTracker struct { startMu sync.Mutex isWorking bool eventMgr events.EventManager - watcher TxWatcher deposits sync.Map windowChan chan *blocktracker.BlocktrackerNewWindow brContract BidderRegistryContract @@ -43,14 +39,12 @@ func New( evtMgr events.EventManager, brContract BidderRegistryContract, optsGetter OptsGetter, - watcher TxWatcher, logger *slog.Logger, ) *AutoDepositTracker { return &AutoDepositTracker{ eventMgr: evtMgr, brContract: brContract, optsGetter: optsGetter, - watcher: watcher, windowChan: make(chan *blocktracker.BlocktrackerNewWindow, 1), logger: logger, } @@ -67,38 +61,16 @@ func (adt *AutoDepositTracker) Start( return fmt.Errorf("auto deposit tracker is already running") } - nextWindow := new(big.Int).Add(startWindow, big.NewInt(1)) - opts, err := adt.optsGetter(ctx) if err != nil { return err } - opts.Value = big.NewInt(0).Mul(amount, big.NewInt(2)) - - // Make initial deposit for the first two windows - txn, err := adt.brContract.DepositForWindows(opts, []*big.Int{startWindow, nextWindow}) - if err != nil { - return err - } - - receipt, err := adt.watcher.WaitForReceipt(ctx, txn) - if err != nil { - return err - } - - if receipt.Status != types.ReceiptStatusSuccessful { - return fmt.Errorf("initial deposit transaction failed: %d", receipt.Status) - } - - adt.deposits.Store(startWindow.Uint64(), true) - adt.deposits.Store(nextWindow.Uint64(), true) - eg, egCtx := errgroup.WithContext(context.Background()) egCtx, cancel := context.WithCancel(egCtx) adt.cancelFunc = cancel - evt := events.NewEventHandler( + evt1 := events.NewEventHandler( "NewWindow", func(update *blocktracker.BlocktrackerNewWindow) { adt.logger.Info( @@ -112,11 +84,51 @@ func (adt *AutoDepositTracker) Start( }, ) - sub, err := adt.eventMgr.Subscribe(evt) + evt2 := events.NewEventHandler( + "BidderRegistered", + func(bidderReg *bidderregistry.BidderregistryBidderRegistered) { + if bidderReg.Bidder.Cmp(opts.From) != 0 { + return + } + adt.logger.Info( + "bidder registered event", + "bidder", bidderReg.Bidder.String(), + "window", bidderReg.WindowNumber, + ) + adt.deposits.Store(bidderReg.WindowNumber.Uint64(), true) + }, + ) + + evt3 := events.NewEventHandler( + "BidderWithdrawal", + func(bidderReg *bidderregistry.BidderregistryBidderWithdrawal) { + if bidderReg.Bidder.Cmp(opts.From) != 0 { + return + } + adt.logger.Info( + "bidder withdrawal event", + "bidder", bidderReg.Bidder.String(), + "window", bidderReg.Window, + ) + adt.deposits.Delete(bidderReg.Window.Uint64()) + }, + ) + + sub, err := adt.eventMgr.Subscribe(evt1, evt2, evt3) if err != nil { return fmt.Errorf("error subscribing to event: %w", err) } + nextWindow := new(big.Int).Add(startWindow, big.NewInt(1)) + + opts.Value = big.NewInt(0).Mul(amount, big.NewInt(2)) + + // Make initial deposit for the first two windows + _, err = adt.brContract.DepositForWindows(opts, []*big.Int{startWindow, nextWindow}) + if err != nil { + return err + } + eg.Go(func() error { for { select { @@ -143,17 +155,7 @@ func (adt *AutoDepositTracker) Start( if err != nil { return err } - r, err := adt.watcher.WaitForReceipt(egCtx, txn) - if err != nil { - return err - } - if r.Status != types.ReceiptStatusSuccessful { - return fmt.Errorf("withdraw transaction failed: %d", r.Status) - } adt.logger.Info("withdraw from windows", "hash", txn.Hash(), "windows", withdrawWindows) - for _, window := range withdrawWindows { - adt.deposits.Delete(window.Uint64()) - } } // Make deposit for the next window. The window event is 2 windows @@ -174,20 +176,12 @@ func (adt *AutoDepositTracker) Start( if err != nil { return err } - r, err := adt.watcher.WaitForReceipt(egCtx, txn) - if err != nil { - return err - } - if r.Status != types.ReceiptStatusSuccessful { - return fmt.Errorf("deposit transaction failed: %d", r.Status) - } adt.logger.Info( "deposited to next window", "hash", txn.Hash(), "window", nextWindow, "amount", amount, ) - adt.deposits.Store(nextWindow.Uint64(), true) } } }) diff --git a/p2p/pkg/autodepositor/autodepositor_test.go b/p2p/pkg/autodepositor/autodepositor_test.go index 830349f79..04c558cea 100644 --- a/p2p/pkg/autodepositor/autodepositor_test.go +++ b/p2p/pkg/autodepositor/autodepositor_test.go @@ -33,14 +33,6 @@ func (m *MockBidderRegistryContract) WithdrawFromWindows(opts *bind.TransactOpts return m.WithdrawFromWindowsFunc(opts, windows) } -type MockTxWatcher struct { - WaitForReceiptFunc func(ctx context.Context, tx *types.Transaction) (*types.Receipt, error) -} - -func (m *MockTxWatcher) WaitForReceipt(ctx context.Context, tx *types.Transaction) (*types.Receipt, error) { - return m.WaitForReceiptFunc(ctx, tx) -} - func TestAutoDepositTracker_Start(t *testing.T) { t.Parallel() @@ -55,34 +47,44 @@ func TestAutoDepositTracker_Start(t *testing.T) { t.Fatal(err) } + addr := common.HexToAddress("0x1234") + amount := big.NewInt(100) logger := util.NewTestLogger(os.Stdout) evtMgr := events.NewListener(logger, &btABI, &brABI) brContract := &MockBidderRegistryContract{ DepositForWindowsFunc: func(opts *bind.TransactOpts, windows []*big.Int) (*types.Transaction, error) { + for _, window := range windows { + publishBidderRegistered(evtMgr, &brABI, &bidderregistry.BidderregistryBidderRegistered{ + Bidder: addr, + DepositedAmount: amount, + WindowNumber: window, + }) + } return types.NewTransaction(1, common.Address{}, nil, 0, nil, nil), nil }, WithdrawFromWindowsFunc: func(opts *bind.TransactOpts, windows []*big.Int) (*types.Transaction, error) { + for _, window := range windows { + publishBidderWithdrawal(evtMgr, &brABI, &bidderregistry.BidderregistryBidderWithdrawal{ + Bidder: addr, + Amount: amount, + Window: window, + }) + } return types.NewTransaction(1, common.Address{}, nil, 0, nil, nil), nil }, } optsGetter := func(ctx context.Context) (*bind.TransactOpts, error) { - return &bind.TransactOpts{}, nil - } - watcher := &MockTxWatcher{ - WaitForReceiptFunc: func(ctx context.Context, tx *types.Transaction) (*types.Receipt, error) { - return &types.Receipt{ - Status: 1, - }, nil - }, + return &bind.TransactOpts{ + From: addr, + }, nil } // Create AutoDepositTracker instance - adt := autodepositor.New(evtMgr, brContract, optsGetter, watcher, logger) + adt := autodepositor.New(evtMgr, brContract, optsGetter, logger) // Start AutoDepositTracker ctx := context.Background() startWindow := big.NewInt(2) - amount := big.NewInt(100) err = adt.Start(ctx, startWindow, amount) if err != nil { t.Fatal(err) @@ -168,16 +170,9 @@ func TestAutoDepositTracker_Start_CancelContext(t *testing.T) { optsGetter := func(ctx context.Context) (*bind.TransactOpts, error) { return &bind.TransactOpts{}, nil } - watcher := &MockTxWatcher{ - WaitForReceiptFunc: func(ctx context.Context, tx *types.Transaction) (*types.Receipt, error) { - return &types.Receipt{ - Status: 1, - }, nil - }, - } // Create AutoDepositTracker instance - adt := autodepositor.New(evtMgr, brContract, optsGetter, watcher, logger) + adt := autodepositor.New(evtMgr, brContract, optsGetter, logger) // Start AutoDepositTracker with a cancelable context ctx, cancel := context.WithCancel(context.Background()) @@ -210,10 +205,9 @@ func TestAutoDepositTracker_Stop_NotRunning(t *testing.T) { optsGetter := func(ctx context.Context) (*bind.TransactOpts, error) { return &bind.TransactOpts{}, nil } - watcher := &MockTxWatcher{} // Create AutoDepositTracker instance - adt := autodepositor.New(evtMgr, brContract, optsGetter, watcher, logger) + adt := autodepositor.New(evtMgr, brContract, optsGetter, logger) // Stop AutoDepositTracker when not running _, err = adt.Stop() @@ -246,16 +240,9 @@ func TestAutoDepositTracker_IsWorking(t *testing.T) { optsGetter := func(ctx context.Context) (*bind.TransactOpts, error) { return &bind.TransactOpts{}, nil } - watcher := &MockTxWatcher{ - WaitForReceiptFunc: func(ctx context.Context, tx *types.Transaction) (*types.Receipt, error) { - return &types.Receipt{ - Status: 1, - }, nil - }, - } // Create AutoDepositTracker instance - adt := autodepositor.New(evtMgr, brContract, optsGetter, watcher, logger) + adt := autodepositor.New(evtMgr, brContract, optsGetter, logger) // Assert initial IsWorking status if adt.IsWorking() { @@ -302,3 +289,55 @@ func publishNewWindow( } evtMgr.PublishLogEvent(context.Background(), testLog) } + +func publishBidderRegistered( + evtMgr events.EventManager, + brABI *abi.ABI, + br *bidderregistry.BidderregistryBidderRegistered, +) error { + event := brABI.Events["BidderRegistered"] + buf, err := event.Inputs.NonIndexed().Pack( + br.DepositedAmount, + br.WindowNumber, + ) + if err != nil { + return err + } + + testLog := types.Log{ + Topics: []common.Hash{ + event.ID, + common.HexToHash(br.Bidder.Hex()), + }, + Data: buf, + } + evtMgr.PublishLogEvent(context.Background(), testLog) + + return nil +} + +func publishBidderWithdrawal( + evtMgr events.EventManager, + brABI *abi.ABI, + br *bidderregistry.BidderregistryBidderWithdrawal, +) error { + event := brABI.Events["BidderWithdrawal"] + buf, err := event.Inputs.NonIndexed().Pack( + br.Window, + br.Amount, + ) + if err != nil { + return err + } + + testLog := types.Log{ + Topics: []common.Hash{ + event.ID, + common.HexToHash(br.Bidder.Hex()), + }, + Data: buf, + } + evtMgr.PublishLogEvent(context.Background(), testLog) + + return nil +} diff --git a/p2p/pkg/node/node.go b/p2p/pkg/node/node.go index bedf67a0b..bac3c5cea 100644 --- a/p2p/pkg/node/node.go +++ b/p2p/pkg/node/node.go @@ -440,7 +440,6 @@ func NewNode(opts *Options) (*Node, error) { evtMgr, bidderRegistry, optsGetter, - monitor, opts.Logger.With("component", "auto_deposit_tracker"), ) diff --git a/x/contracts/events/events.go b/x/contracts/events/events.go index 9e2b21a4c..370d7efd2 100644 --- a/x/contracts/events/events.go +++ b/x/contracts/events/events.go @@ -220,15 +220,15 @@ func (l *Listener) PublishLogEvent(ctx context.Context, log types.Log) { l.metrics.totalLogs.Inc() - // wg := sync.WaitGroup{} + wg := sync.WaitGroup{} events := l.subscribers[log.Topics[0]] for _, event := range events { ev := event l.metrics.totalEvents.Inc() l.metrics.eventCounts.WithLabelValues(ev.evt.eventName()).Inc() - // wg.Add(1) + wg.Add(1) go func() { - // defer wg.Done() + defer wg.Done() defer func(start time.Time) { l.metrics.eventHandlerDurations. @@ -247,5 +247,5 @@ func (l *Listener) PublishLogEvent(ctx context.Context, log types.Log) { }() } - // wg.Wait() + wg.Wait() }