From a34ba7712d1dbf1bbe475a64331d3f2f2d9aef19 Mon Sep 17 00:00:00 2001 From: Mikhail Wall Date: Wed, 10 Jul 2024 01:55:42 +0200 Subject: [PATCH] chore: get rid of startFromApi --- p2p/pkg/autodepositor/autodepositor.go | 69 ++++----------------- p2p/pkg/autodepositor/autodepositor_test.go | 6 +- p2p/pkg/node/node.go | 11 +++- p2p/pkg/rpc/bidder/service.go | 4 +- p2p/pkg/rpc/bidder/service_test.go | 2 +- 5 files changed, 27 insertions(+), 65 deletions(-) diff --git a/p2p/pkg/autodepositor/autodepositor.go b/p2p/pkg/autodepositor/autodepositor.go index 008b52b0e..e47a0b180 100644 --- a/p2p/pkg/autodepositor/autodepositor.go +++ b/p2p/pkg/autodepositor/autodepositor.go @@ -59,62 +59,7 @@ func New( } } -func (adt *AutoDepositTracker) Start(ctx context.Context) <-chan struct{} { - adt.startMu.Lock() - defer adt.startMu.Unlock() - - doneChan := make(chan struct{}) - - if adt.isWorking { - close(doneChan) - adt.logger.Error("failed to subscribe to events", "error", "auto deposit tracker is already running") - return doneChan - } - - currentWindow, err := adt.btContract.GetCurrentWindow() - if err != nil { - close(doneChan) - adt.logger.Error("failed to get current window", "error", err) - return doneChan - } - // adding +2 as oracle runs two windows behind - currentWindow = new(big.Int).Add(currentWindow, big.NewInt(2)) - - eg, egCtx := errgroup.WithContext(context.Background()) - egCtx, cancel := context.WithCancel(egCtx) - adt.cancelFunc = cancel - - sub, err := adt.initSub(egCtx) - - if err != nil { - close(doneChan) - adt.logger.Error("error subscribing to event", "error", err) - return doneChan - } - - err = adt.doInitialDeposit(ctx, currentWindow, adt.initialAmount) - if err != nil { - close(doneChan) - adt.logger.Error("failed to do initial deposit", "error", err) - return doneChan - } - - adt.startAutodeposit(egCtx, eg, adt.initialAmount, sub) - - go func() { - defer close(doneChan) - if err := eg.Wait(); err != nil { - adt.logger.Error("error in errgroup", "error", err) - } - adt.startMu.Lock() - adt.isWorking = false - adt.startMu.Unlock() - }() - - return doneChan -} - -func (adt *AutoDepositTracker) StartFromApi( +func (adt *AutoDepositTracker) Start( ctx context.Context, startWindow, amount *big.Int, ) error { @@ -125,6 +70,18 @@ func (adt *AutoDepositTracker) StartFromApi( return fmt.Errorf("auto deposit tracker is already running") } + if startWindow == nil { + var err error + startWindow, err = adt.btContract.GetCurrentWindow() + if err != nil { + adt.logger.Error("failed to get current window", "error", err) + return err + } + // adding +2 as oracle runs two windows behind + startWindow = new(big.Int).Add(startWindow, big.NewInt(2)) + + } + eg, egCtx := errgroup.WithContext(context.Background()) egCtx, cancel := context.WithCancel(egCtx) adt.cancelFunc = cancel diff --git a/p2p/pkg/autodepositor/autodepositor_test.go b/p2p/pkg/autodepositor/autodepositor_test.go index 56e3a4fcd..1e02a08bb 100644 --- a/p2p/pkg/autodepositor/autodepositor_test.go +++ b/p2p/pkg/autodepositor/autodepositor_test.go @@ -102,7 +102,7 @@ func TestAutoDepositTracker_Start(t *testing.T) { // Start AutoDepositTracker ctx := context.Background() startWindow := big.NewInt(2) - err = adt.StartFromApi(ctx, startWindow, amount) + err = adt.Start(ctx, startWindow, amount) if err != nil { t.Fatal(err) } @@ -201,7 +201,7 @@ func TestAutoDepositTracker_Start_CancelContext(t *testing.T) { startWindow := big.NewInt(1) amount := big.NewInt(100) cancel() - err = adt.StartFromApi(ctx, startWindow, amount) + err = adt.Start(ctx, startWindow, amount) if err != context.Canceled { t.Fatalf("expected context canceled error, got %v", err) } @@ -282,7 +282,7 @@ func TestAutoDepositTracker_IsWorking(t *testing.T) { ctx := context.Background() startWindow := big.NewInt(1) amount := big.NewInt(100) - err = adt.StartFromApi(ctx, startWindow, amount) + err = adt.Start(ctx, startWindow, amount) if err != nil { t.Fatal(err) } diff --git a/p2p/pkg/node/node.go b/p2p/pkg/node/node.go index 3208c2349..933e530af 100644 --- a/p2p/pkg/node/node.go +++ b/p2p/pkg/node/node.go @@ -84,8 +84,9 @@ type Options struct { } type Node struct { - cancelFunc context.CancelFunc - closers []io.Closer + cancelFunc context.CancelFunc + closers []io.Closer + autoDeposit *autodepositor.AutoDepositTracker } func NewNode(opts *Options) (*Node, error) { @@ -448,7 +449,8 @@ func NewNode(opts *Options) (*Node, error) { ) if opts.AutodepositAmount != nil { - startables = append(startables, autoDeposit) + autoDeposit.Start(context.Background(), nil, opts.AutodepositAmount) + nd.autoDeposit = autoDeposit } bidderAPI := bidderapi.NewService( @@ -651,6 +653,9 @@ func (n *Node) Close() error { for _, c := range n.closers { err = errors.Join(err, c.Close()) } + + _, adErr := n.autoDeposit.Stop() + errors.Join(err, adErr) return err } diff --git a/p2p/pkg/rpc/bidder/service.go b/p2p/pkg/rpc/bidder/service.go index 25d078ec0..db91fa3ee 100644 --- a/p2p/pkg/rpc/bidder/service.go +++ b/p2p/pkg/rpc/bidder/service.go @@ -63,7 +63,7 @@ func NewService( } type AutoDepositTracker interface { - StartFromApi(context.Context, *big.Int, *big.Int) error + Start(context.Context, *big.Int, *big.Int) error Stop() ([]*big.Int, error) IsWorking() bool GetStatus() (map[uint64]bool, bool) @@ -348,7 +348,7 @@ func (s *Service) AutoDeposit( return nil, status.Errorf(codes.InvalidArgument, "parsing amount: %v", r.Amount) } - err = s.autoDepositTracker.StartFromApi(ctx, windowToDeposit, amount) + err = s.autoDepositTracker.Start(ctx, windowToDeposit, amount) if err != nil { return nil, status.Errorf(codes.Internal, "starting auto deposit: %v", err) } diff --git a/p2p/pkg/rpc/bidder/service_test.go b/p2p/pkg/rpc/bidder/service_test.go index 19eade82f..cfd46e80c 100644 --- a/p2p/pkg/rpc/bidder/service_test.go +++ b/p2p/pkg/rpc/bidder/service_test.go @@ -129,7 +129,7 @@ type testAutoDepositTracker struct { isWorking bool } -func (t *testAutoDepositTracker) StartFromApi(ctx context.Context, startWindow, amount *big.Int) error { +func (t *testAutoDepositTracker) Start(ctx context.Context, startWindow, amount *big.Int) error { t.mtx.Lock() defer t.mtx.Unlock()