From 85704176773093354cf4564b98c9b60bb235cc59 Mon Sep 17 00:00:00 2001 From: mrekucci Date: Thu, 8 Aug 2024 11:58:21 +0200 Subject: [PATCH] refactor: oracle l1 retry client --- oracle/pkg/l1Listener/l1Listener.go | 2 +- oracle/pkg/node/node.go | 116 ++++++++------ oracle/pkg/node/node_test.go | 226 ++++++++++++++++++++++++++++ 3 files changed, 294 insertions(+), 50 deletions(-) create mode 100644 oracle/pkg/node/node_test.go diff --git a/oracle/pkg/l1Listener/l1Listener.go b/oracle/pkg/l1Listener/l1Listener.go index c15b6b9c3..22800e638 100644 --- a/oracle/pkg/l1Listener/l1Listener.go +++ b/oracle/pkg/l1Listener/l1Listener.go @@ -29,8 +29,8 @@ type WinnerRegister interface { type EthClient interface { BlockNumber(ctx context.Context) (uint64, error) - HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) + HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) } type L1Listener struct { diff --git a/oracle/pkg/node/node.go b/oracle/pkg/node/node.go index 2787d32d1..79720a65e 100644 --- a/oracle/pkg/node/node.go +++ b/oracle/pkg/node/node.go @@ -8,6 +8,7 @@ import ( "io" "log/slog" "math/big" + "math/rand/v2" "net/url" "strings" "time" @@ -103,11 +104,22 @@ func NewNode(opts *Options) (*Node, error) { l1Client, err := ethclient.Dial(opts.L1RPCUrl) if err != nil { - nd.logger.Error("Failed to connect to the L1 Ethereum client", "error", err) + nd.logger.Error("failed to connect to the L1 Ethereum client", "error", err) return nil, err } - ctx, cancel := context.WithCancel(context.Background()) + var listenerL1Client l1Listener.EthClient = l1Client + if opts.LaggerdMode > 0 { + listenerL1Client = &laggerdL1Client{ + EthClient: listenerL1Client, + amount: opts.LaggerdMode, + } + } + listenerL1Client = &retryL1Client{ + EthClient: listenerL1Client, + logger: nd.logger, + maxRetries: 30, + } monitor := txmonitor.New( owner, @@ -118,6 +130,7 @@ func NewNode(opts *Options) (*Node, error) { 1024, ) + ctx, cancel := context.WithCancel(context.Background()) monitorClosed := monitor.Start(ctx) healthChecker.Register(health.CloseChannelHealthCheck("txmonitor", monitorClosed)) @@ -164,15 +177,6 @@ func NewNode(opts *Options) (*Node, error) { ) } - var listenerL1Client l1Listener.EthClient - - listenerL1Client = l1Client - if opts.LaggerdMode > 0 { - listenerL1Client = &laggerdL1Client{EthClient: listenerL1Client, amount: opts.LaggerdMode} - } - - listenerL1Client = &infiniteRetryL1Client{EthClient: listenerL1Client, logger: nd.logger} - blockTracker, err := blocktracker.NewBlocktrackerTransactor( opts.BlockTrackerContractAddr, settlementRPC, @@ -444,60 +448,74 @@ func (w *winnerOverrideL1Client) HeaderByNumber(ctx context.Context, number *big return hdr, nil } -type infiniteRetryL1Client struct { +// errRetry is returned when retry maxRetries is exhausted. +var errRetry = errors.New("retry attempts exhausted") + +// retryL1Client retries the underlying L1Client operations up to maxRetries times. +// When maxRetries is exhausted, errRetry is returned together with all the errors from the retries. +// A random delay is introduced between each retry. +type retryL1Client struct { l1Listener.EthClient - logger *slog.Logger + + logger *slog.Logger + maxRetries int } -func (i *infiniteRetryL1Client) BlockNumber(ctx context.Context) (uint64, error) { - var blkNum uint64 - var err error - for retries := 50; retries > 0; retries-- { - blkNum, err = i.EthClient.BlockNumber(ctx) +func (r *retryL1Client) BlockNumber(ctx context.Context) (uint64, error) { + var errs error + for i := range r.maxRetries { + bn, err := r.EthClient.BlockNumber(ctx) if err == nil { - break + r.logger.Debug("get block number succeeded", "attempt", i, "block", bn) + return bn, err + } + errs = errors.Join(errs, err) + r.logger.Warn("get block number failed", "attempt", i, "error", err) + if i < r.maxRetries-1 { + d := time.Duration(1+rand.Int64N(6)) * time.Second + r.logger.Info("get block number retry", "in", d) + time.Sleep(d) } - i.logger.Error("failed to get block number, retrying...", "error", err) - time.Sleep(2 * time.Second) - } - if err != nil { - return 0, err } - return blkNum, nil + return 0, errors.Join(errs, fmt.Errorf("get block number: %w", errRetry)) } -func (i *infiniteRetryL1Client) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { - var hdr *types.Header - var err error - for retries := 50; retries > 0; retries-- { - hdr, err = i.EthClient.HeaderByNumber(ctx, number) +func (r *retryL1Client) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) { + var errs error + for i := range r.maxRetries { + b, err := r.EthClient.BlockByNumber(ctx, number) if err == nil { - break + r.logger.Debug("get block by number succeeded", "attempt", i, "block", b) + return b, err + } + errs = errors.Join(errs, err) + r.logger.Warn("get block by number failed", "number", number, "attempt", i, "error", err) + if i < r.maxRetries-1 { + d := time.Duration(1+rand.Int64N(6)) * time.Second + r.logger.Info("get block by number retry", "in", d) + time.Sleep(d) } - i.logger.Error("failed to get header by number, retrying...", "error", err) - time.Sleep(2 * time.Second) - } - if err != nil { - return nil, err } - return hdr, nil + return nil, errors.Join(errs, fmt.Errorf("get block by number: %w", errRetry)) } -func (i *infiniteRetryL1Client) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) { - var blk *types.Block - var err error - for retries := 50; retries > 0; retries-- { - blk, err = i.EthClient.BlockByNumber(ctx, number) +func (r *retryL1Client) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { + var errs error + for i := range r.maxRetries { + hdr, err := r.EthClient.HeaderByNumber(ctx, number) if err == nil { - break + r.logger.Debug("get header by number succeeded", "attempt", i, "header", hdr) + return hdr, err + } + errs = errors.Join(errs, err) + r.logger.Warn("get header by number failed", "number", number, "attempt", i, "error", err) + if i < r.maxRetries-1 { + d := time.Duration(1+rand.Int64N(6)) * time.Second + r.logger.Info("get header by number retry", "in", d) + time.Sleep(d) } - i.logger.Error("failed to get block by number, retrying...", "error", err) - time.Sleep(2 * time.Second) - } - if err != nil { - return nil, err } - return blk, nil + return nil, errors.Join(errs, fmt.Errorf("get header by number: %w", errRetry)) } func setBuilderMapping( diff --git a/oracle/pkg/node/node_test.go b/oracle/pkg/node/node_test.go new file mode 100644 index 000000000..731cc79ab --- /dev/null +++ b/oracle/pkg/node/node_test.go @@ -0,0 +1,226 @@ +package node + +import ( + "context" + "errors" + "io" + "log/slog" + "math/big" + "testing" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/primev/mev-commit/oracle/pkg/l1Listener" +) + +var ( + _ l1Listener.EthClient = (*retryL1Client)(nil) + _ l1Listener.EthClient = (*MockEthClient)(nil) +) + +type MockEthClient struct { + blockNumberFn func(context.Context) (uint64, error) + blockByNumberFn func(context.Context, *big.Int) (*types.Block, error) + headerByNumberFn func(context.Context, *big.Int) (*types.Header, error) +} + +func (m *MockEthClient) BlockNumber(ctx context.Context) (uint64, error) { + return m.blockNumberFn(ctx) +} + +func (m *MockEthClient) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) { + return m.blockByNumberFn(ctx, number) +} + +func (m *MockEthClient) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { + return m.headerByNumberFn(ctx, number) +} + +func TestRetryL1Client(t *testing.T) { + discard := slog.New(slog.NewTextHandler(io.Discard, nil)) + + t.Run("BlockNumber Success", func(t *testing.T) { + t.Parallel() + + var ( + wantNumber = uint64(42) + wantCalls = 1 + haveCalls = 0 + ) + client := &retryL1Client{ + EthClient: &MockEthClient{ + blockNumberFn: func(context.Context) (uint64, error) { + haveCalls++ + return wantNumber, nil + }, + }, + logger: discard, + maxRetries: 1, + } + + haveNumber, err := client.BlockNumber(context.Background()) + if err != nil { + t.Errorf("BlockNumber(...): unexpected error: %v", err) + } + if haveNumber != wantNumber { + t.Errorf("BlockNumber(...):\nhave number: %d\nwant number: %d", haveNumber, wantNumber) + } + if haveCalls != wantCalls { + t.Errorf("BlockNumber(...):\nhave calls: %d\nwant calls: %d", haveCalls, wantCalls) + } + }) + + t.Run("BlockNumber Error", func(t *testing.T) { + t.Parallel() + + var ( + mockErr = errors.New("block number error") + wantCalls = 3 + haveCalls = 0 + ) + client := &retryL1Client{ + EthClient: &MockEthClient{ + blockNumberFn: func(context.Context) (uint64, error) { + haveCalls++ + return 0, mockErr + }, + }, + logger: discard, + maxRetries: wantCalls, + } + + _, err := client.BlockNumber(context.Background()) + if haveErr, wantErr := err, errRetry; !errors.Is(haveErr, wantErr) { + t.Errorf("BlockNumber(...):\nhave error: %v\nwant error: %v", haveErr, wantErr) + } + if haveErr, wantErr := err, mockErr; !errors.Is(haveErr, wantErr) { + t.Errorf("BlockNumber(...):\nhave error: %v\nwant error: %v", haveErr, wantErr) + } + if haveCalls != wantCalls { + t.Errorf("BlockNumber(...):\nhave calls: %d\nwant calls: %d", haveCalls, wantCalls) + } + }) + + t.Run("BlockByNumber Success", func(t *testing.T) { + t.Parallel() + + var ( + wantNumber = big.NewInt(42) + wantCalls = 1 + haveCalls = 0 + ) + client := &retryL1Client{ + EthClient: &MockEthClient{ + blockByNumberFn: func(context.Context, *big.Int) (*types.Block, error) { + haveCalls++ + return types.NewBlock(&types.Header{Number: wantNumber}, nil, nil, nil, nil), nil + }, + }, + logger: discard, + maxRetries: 1, + } + + haveBlock, err := client.BlockByNumber(context.Background(), wantNumber) + if err != nil { + t.Errorf("BlockByNumber(...): unexpected error: %v", err) + } + if haveBlock.Number().Cmp(wantNumber) != 0 { + t.Errorf("BlockByNumber(...):\nhave number: %v\nwant number: %v", haveBlock.Number(), wantNumber) + } + if haveCalls != wantCalls { + t.Errorf("BlockByNumber(...):\nhave calls: %d\nwant calls: %d", haveCalls, wantCalls) + } + }) + + t.Run("BlockByNumber Error", func(t *testing.T) { + t.Parallel() + + var ( + mockErr = errors.New("block by number error") + wantCalls = 3 + haveCalls = 0 + ) + client := &retryL1Client{ + EthClient: &MockEthClient{ + blockByNumberFn: func(context.Context, *big.Int) (*types.Block, error) { + haveCalls++ + return nil, mockErr + }, + }, + logger: discard, + maxRetries: wantCalls, + } + + _, err := client.BlockByNumber(context.Background(), big.NewInt(42)) + if haveErr, wantErr := err, errRetry; !errors.Is(haveErr, wantErr) { + t.Errorf("BlockByNumber(...):\nhave error: %v\nwant error: %v", haveErr, wantErr) + } + if haveErr, wantErr := err, mockErr; !errors.Is(haveErr, wantErr) { + t.Errorf("BlockByNumber(...):\nhave error: %v\nwant error: %v", haveErr, wantErr) + } + if haveCalls != wantCalls { + t.Errorf("BlockByNumber(...):\nhave calls: %d\nwant calls: %d", haveCalls, wantCalls) + } + }) + + t.Run("HeaderByNumber Success", func(t *testing.T) { + t.Parallel() + + var ( + wantNumber = big.NewInt(42) + wantCalls = 1 + haveCalls = 0 + ) + client := &retryL1Client{ + EthClient: &MockEthClient{ + headerByNumberFn: func(context.Context, *big.Int) (*types.Header, error) { + haveCalls++ + return &types.Header{Number: wantNumber}, nil + }, + }, + logger: discard, + maxRetries: 1, + } + + haveHeader, err := client.HeaderByNumber(context.Background(), wantNumber) + if err != nil { + t.Errorf("HeaderByNumber(...): unexpected error: %v", err) + } + if haveHeader.Number.Cmp(wantNumber) != 0 { + t.Errorf("HeaderByNumber(...):\nhave number: %v\nwant number: %v", haveHeader.Number, wantNumber) + } + if haveCalls != wantCalls { + t.Errorf("HeaderByNumber(...):\nhave calls: %d\nwant calls: %d", haveCalls, wantCalls) + } + }) + + t.Run("HeaderByNumber Error", func(t *testing.T) { + t.Parallel() + + var ( + mockErr = errors.New("header by number error") + wantCalls = 3 + haveCalls = 0 + ) + client := &retryL1Client{ + EthClient: &MockEthClient{ + headerByNumberFn: func(context.Context, *big.Int) (*types.Header, error) { + haveCalls++ + return nil, mockErr + }, + }, + logger: discard, + maxRetries: wantCalls, + } + + _, err := client.HeaderByNumber(context.Background(), big.NewInt(42)) + if haveErr, wantErr := err, errRetry; !errors.Is(haveErr, wantErr) { + t.Errorf("HeaderByNumber(...):\nhave error: %v\nwant error: %v", haveErr, wantErr) + } + if haveErr, wantErr := err, mockErr; !errors.Is(haveErr, wantErr) { + t.Errorf("HeaderByNumber(...):\nhave error: %v\nwant error: %v", haveErr, wantErr) + } + if haveCalls != wantCalls { + t.Errorf("HeaderByNumber(...):\nhave calls: %d\nwant calls: %d", haveCalls, wantCalls) + } + }) +}