diff --git a/cmd/babylon-staking-indexer/main.go b/cmd/babylon-staking-indexer/main.go index be7b244..8956aac 100644 --- a/cmd/babylon-staking-indexer/main.go +++ b/cmd/babylon-staking-indexer/main.go @@ -54,7 +54,7 @@ func main() { log.Fatal().Err(err).Msg("error while creating btc client") } - bbnClient := bbnclient.NewBbnClient(&cfg.Bbn) + bbnClient := bbnclient.NewBBNClient(&cfg.BBN) btcNotifier, err := btcclient.NewBTCNotifier( &cfg.BTC, diff --git a/config/config-docker.yml b/config/config-docker.yml index 32581fb..d4530b2 100644 --- a/config/config-docker.yml +++ b/config/config-docker.yml @@ -18,6 +18,8 @@ btc: bbn: rpc-addr: https://rpc.devnet.babylonlabs.io:443 timeout: 30s + maxretrytimes: 5 + retryinterval: 500ms poller: param-polling-interval: 60s expiry-checker-polling-interval: 10s diff --git a/config/config-local.yml b/config/config-local.yml index a4e1004..0b568e6 100644 --- a/config/config-local.yml +++ b/config/config-local.yml @@ -18,6 +18,8 @@ btc: bbn: rpc-addr: https://rpc.devnet.babylonlabs.io:443 timeout: 30s + maxretrytimes: 5 + retryinterval: 500ms poller: param-polling-interval: 10s expiry-checker-polling-interval: 10s diff --git a/internal/clients/bbnclient/bbnclient.go b/internal/clients/bbnclient/bbnclient.go index d0e8aab..4416cc6 100644 --- a/internal/clients/bbnclient/bbnclient.go +++ b/internal/clients/bbnclient/bbnclient.go @@ -3,166 +3,157 @@ package bbnclient import ( "context" "fmt" - "net/http" "strings" - "time" - "github.com/babylonlabs-io/babylon-staking-indexer/internal/types" - "github.com/babylonlabs-io/babylon/client/config" + "github.com/avast/retry-go/v4" + "github.com/babylonlabs-io/babylon-staking-indexer/internal/config" + bbncfg "github.com/babylonlabs-io/babylon/client/config" "github.com/babylonlabs-io/babylon/client/query" - bbntypes "github.com/babylonlabs-io/babylon/x/btcstaking/types" + btcctypes "github.com/babylonlabs-io/babylon/x/btccheckpoint/types" + btcstakingtypes "github.com/babylonlabs-io/babylon/x/btcstaking/types" ctypes "github.com/cometbft/cometbft/rpc/core/types" "github.com/rs/zerolog/log" ) -const ( - // Backoff parameters for retries for getting BBN block result - initialBackoff = 500 * time.Millisecond // Start with 500ms - backoffFactor = 2 // Exponential backoff factor - maxRetries = 10 // 8 minutes in worst case -) - -type BbnClient struct { +type BBNClient struct { queryClient *query.QueryClient + cfg *config.BBNConfig } -func NewBbnClient(cfg *config.BabylonQueryConfig) BbnInterface { - queryClient, err := query.New(cfg) +func NewBBNClient(cfg *config.BBNConfig) BbnInterface { + bbnQueryCfg := &bbncfg.BabylonQueryConfig{ + RPCAddr: cfg.RPCAddr, + Timeout: cfg.Timeout, + } + + queryClient, err := query.New(bbnQueryCfg) if err != nil { log.Fatal().Err(err).Msg("error while creating BBN query client") } - return &BbnClient{queryClient} + return &BBNClient{queryClient, cfg} } -func (c *BbnClient) GetLatestBlockNumber(ctx context.Context) (int64, *types.Error) { - status, err := c.queryClient.RPCClient.Status(ctx) +func (c *BBNClient) GetLatestBlockNumber(ctx context.Context) (int64, error) { + callForStatus := func() (*ctypes.ResultStatus, error) { + status, err := c.queryClient.RPCClient.Status(ctx) + if err != nil { + return nil, err + } + return status, nil + } + + status, err := clientCallWithRetry(callForStatus, c.cfg) if err != nil { - return 0, types.NewErrorWithMsg( - http.StatusInternalServerError, - types.InternalServiceError, - fmt.Sprintf("failed to get latest block number by fetching status: %s", err.Error()), - ) + return 0, fmt.Errorf("failed to get latest block number by fetching status: %w", err) } return status.SyncInfo.LatestBlockHeight, nil } -func (c *BbnClient) GetCheckpointParams(ctx context.Context) (*CheckpointParams, *types.Error) { - params, err := c.queryClient.BTCCheckpointParams() +func (c *BBNClient) GetCheckpointParams(ctx context.Context) (*CheckpointParams, error) { + callForCheckpointParams := func() (*btcctypes.QueryParamsResponse, error) { + params, err := c.queryClient.BTCCheckpointParams() + if err != nil { + return nil, err + } + return params, nil + } + + params, err := clientCallWithRetry(callForCheckpointParams, c.cfg) if err != nil { - return nil, types.NewErrorWithMsg( - http.StatusInternalServerError, - types.ClientRequestError, - fmt.Sprintf("failed to get checkpoint params: %s", err.Error()), - ) + return nil, err } if err := params.Params.Validate(); err != nil { - return nil, types.NewErrorWithMsg( - http.StatusInternalServerError, - types.ValidationError, - fmt.Sprintf("failed to validate checkpoint params: %s", err.Error()), - ) + return nil, err } return FromBbnCheckpointParams(params.Params), nil } -func (c *BbnClient) GetAllStakingParams(ctx context.Context) (map[uint32]*StakingParams, *types.Error) { - allParams := make(map[uint32]*StakingParams) // Map to store versioned staking parameters +func (c *BBNClient) GetAllStakingParams(ctx context.Context) (map[uint32]*StakingParams, error) { + allParams := make(map[uint32]*StakingParams) version := uint32(0) for { + // First try without retry to check for ErrParamsNotFound params, err := c.queryClient.BTCStakingParamsByVersion(version) if err != nil { - if strings.Contains(err.Error(), bbntypes.ErrParamsNotFound.Error()) { - // Break the loop if an error occurs (indicating no more versions) - break + if strings.Contains(err.Error(), btcstakingtypes.ErrParamsNotFound.Error()) { + break // Exit loop if params not found + } + + // Only retry for other errors + callForStakingParams := func() (*btcstakingtypes.QueryParamsByVersionResponse, error) { + return c.queryClient.BTCStakingParamsByVersion(version) + } + + params, err = clientCallWithRetry(callForStakingParams, c.cfg) + if err != nil { + return nil, fmt.Errorf("failed to get staking params for version %d: %w", version, err) } - return nil, types.NewErrorWithMsg( - http.StatusInternalServerError, - types.ClientRequestError, - fmt.Sprintf("failed to get staking params for version %d: %s", version, err.Error()), - ) } + if err := params.Params.Validate(); err != nil { - return nil, types.NewErrorWithMsg( - http.StatusInternalServerError, - types.ValidationError, - fmt.Sprintf("failed to validate staking params for version %d: %s", version, err.Error()), - ) + return nil, fmt.Errorf("failed to validate staking params for version %d: %w", version, err) } + allParams[version] = FromBbnStakingParams(params.Params) version++ } + if len(allParams) == 0 { - return nil, types.NewErrorWithMsg( - http.StatusNotFound, - types.NotFound, - "no staking params found", - ) + return nil, fmt.Errorf("no staking params found") } return allParams, nil } -// GetBlockResultsWithRetry retries the `getBlockResults` method with exponential backoff -// when the block is not yet available. -func (c *BbnClient) GetBlockResultsWithRetry( +func (c *BBNClient) GetBlockResults( ctx context.Context, blockHeight *int64, -) (*ctypes.ResultBlockResults, *types.Error) { - backoff := initialBackoff - var resp *ctypes.ResultBlockResults - var err *types.Error - - for i := 0; i < maxRetries; i++ { - resp, err = c.getBlockResults(ctx, blockHeight) - if err == nil { - return resp, nil +) (*ctypes.ResultBlockResults, error) { + callForBlockResults := func() (*ctypes.ResultBlockResults, error) { + resp, err := c.queryClient.RPCClient.BlockResults(ctx, blockHeight) + if err != nil { + return nil, err } + return resp, nil + } - if strings.Contains( - err.Err.Error(), - "must be less than or equal to the current blockchain height", - ) { - log.Debug(). - Str("block_height", fmt.Sprintf("%d", *blockHeight)). - Str("backoff", backoff.String()). - Msg("Block not yet available, retrying...") - time.Sleep(backoff) - backoff *= backoffFactor - continue - } + blockResults, err := clientCallWithRetry(callForBlockResults, c.cfg) + if err != nil { return nil, err } - - // If we exhaust retries, return a not found error - return nil, types.NewErrorWithMsg( - http.StatusNotFound, - types.NotFound, - fmt.Sprintf("Block height %d not found after retries", *blockHeight), - ) + return blockResults, nil } -func (c *BbnClient) Subscribe(subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) { +func (c *BBNClient) Subscribe(subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) { return c.queryClient.RPCClient.Subscribe(context.Background(), subscriber, query, outCapacity...) } -func (c *BbnClient) UnsubscribeAll(subscriber string) error { +func (c *BBNClient) UnsubscribeAll(subscriber string) error { return c.queryClient.RPCClient.UnsubscribeAll(context.Background(), subscriber) } -func (c *BbnClient) IsRunning() bool { +func (c *BBNClient) IsRunning() bool { return c.queryClient.RPCClient.IsRunning() } -func (c *BbnClient) Start() error { +func (c *BBNClient) Start() error { return c.queryClient.RPCClient.Start() } -func (c *BbnClient) getBlockResults(ctx context.Context, blockHeight *int64) (*ctypes.ResultBlockResults, *types.Error) { - resp, err := c.queryClient.RPCClient.BlockResults(ctx, blockHeight) +func clientCallWithRetry[T any]( + call retry.RetryableFuncWithData[*T], cfg *config.BBNConfig, +) (*T, error) { + result, err := retry.DoWithData(call, retry.Attempts(cfg.MaxRetryTimes), retry.Delay(cfg.RetryInterval), retry.LastErrorOnly(true), + retry.OnRetry(func(n uint, err error) { + log.Debug(). + Uint("attempt", n+1). + Uint("max_attempts", cfg.MaxRetryTimes). + Err(err). + Msg("failed to call the RPC client") + })) + if err != nil { - return nil, types.NewErrorWithMsg( - http.StatusInternalServerError, types.InternalServiceError, err.Error(), - ) + return nil, err } - return resp, nil + return result, nil } diff --git a/internal/clients/bbnclient/interface.go b/internal/clients/bbnclient/interface.go index f53c176..f189621 100644 --- a/internal/clients/bbnclient/interface.go +++ b/internal/clients/bbnclient/interface.go @@ -3,17 +3,14 @@ package bbnclient import ( "context" - "github.com/babylonlabs-io/babylon-staking-indexer/internal/types" ctypes "github.com/cometbft/cometbft/rpc/core/types" ) type BbnInterface interface { - GetCheckpointParams(ctx context.Context) (*CheckpointParams, *types.Error) - GetAllStakingParams(ctx context.Context) (map[uint32]*StakingParams, *types.Error) - GetLatestBlockNumber(ctx context.Context) (int64, *types.Error) - GetBlockResultsWithRetry( - ctx context.Context, blockHeight *int64, - ) (*ctypes.ResultBlockResults, *types.Error) + GetCheckpointParams(ctx context.Context) (*CheckpointParams, error) + GetAllStakingParams(ctx context.Context) (map[uint32]*StakingParams, error) + GetLatestBlockNumber(ctx context.Context) (int64, error) + GetBlockResults(ctx context.Context, blockHeight *int64) (*ctypes.ResultBlockResults, error) Subscribe(subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) UnsubscribeAll(subscriber string) error IsRunning() bool diff --git a/internal/config/bbn.go b/internal/config/bbn.go new file mode 100644 index 0000000..eb901dc --- /dev/null +++ b/internal/config/bbn.go @@ -0,0 +1,34 @@ +package config + +import ( + "fmt" + "net/url" + "time" +) + +type BBNConfig struct { + RPCAddr string `mapstructure:"rpc-addr"` + Timeout time.Duration `mapstructure:"timeout"` + MaxRetryTimes uint `mapstructure:"maxretrytimes"` + RetryInterval time.Duration `mapstructure:"retryinterval"` +} + +func (cfg *BBNConfig) Validate() error { + if _, err := url.Parse(cfg.RPCAddr); err != nil { + return fmt.Errorf("cfg.RPCAddr is not correctly formatted: %w", err) + } + + if cfg.Timeout <= 0 { + return fmt.Errorf("cfg.Timeout must be positive") + } + + if cfg.MaxRetryTimes <= 0 { + return fmt.Errorf("cfg.MaxRetryTimes must be positive") + } + + if cfg.RetryInterval <= 0 { + return fmt.Errorf("cfg.RetryInterval must be positive") + } + + return nil +} diff --git a/internal/config/config.go b/internal/config/config.go index b3eee3c..399746e 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -5,24 +5,24 @@ import ( "os" "strings" - bbnconfig "github.com/babylonlabs-io/babylon/client/config" queue "github.com/babylonlabs-io/staking-queue-client/config" "github.com/spf13/viper" ) type Config struct { - Db DbConfig `mapstructure:"db"` - BTC BTCConfig `mapstructure:"btc"` - Bbn bbnconfig.BabylonQueryConfig `mapstructure:"bbn"` - Poller PollerConfig `mapstructure:"poller"` - Queue queue.QueueConfig `mapstructure:"queue"` - Metrics MetricsConfig `mapstructure:"metrics"` + Db DbConfig `mapstructure:"db"` + BTC BTCConfig `mapstructure:"btc"` + BBN BBNConfig `mapstructure:"bbn"` + Poller PollerConfig `mapstructure:"poller"` + Queue queue.QueueConfig `mapstructure:"queue"` + Metrics MetricsConfig `mapstructure:"metrics"` } func (cfg *Config) Validate() error { - if err := cfg.Bbn.Validate(); err != nil { + if err := cfg.BBN.Validate(); err != nil { return err } + if err := cfg.Db.Validate(); err != nil { return err } @@ -39,10 +39,6 @@ func (cfg *Config) Validate() error { return err } - if err := cfg.Bbn.Validate(); err != nil { - return err - } - if err := cfg.Poller.Validate(); err != nil { return err } diff --git a/internal/services/bootstrap.go b/internal/services/bootstrap.go index bd5b926..2d306ab 100644 --- a/internal/services/bootstrap.go +++ b/internal/services/bootstrap.go @@ -61,7 +61,7 @@ func (s *Service) processBlocksSequentially(ctx context.Context) *types.Error { } // Process blocks from lastProcessedHeight + 1 to latestHeight - for i := lastProcessedHeight; i <= uint64(latestHeight); i++ { + for i := lastProcessedHeight + 1; i <= uint64(latestHeight); i++ { select { case <-ctx.Done(): return types.NewError( @@ -105,9 +105,13 @@ func (s *Service) getEventsFromBlock( ctx context.Context, blockHeight int64, ) ([]BbnEvent, *types.Error) { events := make([]BbnEvent, 0) - blockResult, err := s.bbn.GetBlockResultsWithRetry(ctx, &blockHeight) + blockResult, err := s.bbn.GetBlockResults(ctx, &blockHeight) if err != nil { - return nil, err + return nil, types.NewError( + http.StatusInternalServerError, + types.ClientRequestError, + fmt.Errorf("failed to get block results: %w", err), + ) } // Append transaction-level events for _, txResult := range blockResult.TxsResults { diff --git a/tests/mocks/mock_bbn_client.go b/tests/mocks/mock_bbn_client.go index 2d67d0f..efbe075 100644 --- a/tests/mocks/mock_bbn_client.go +++ b/tests/mocks/mock_bbn_client.go @@ -10,8 +10,6 @@ import ( coretypes "github.com/cometbft/cometbft/rpc/core/types" mock "github.com/stretchr/testify/mock" - - types "github.com/babylonlabs-io/babylon-staking-indexer/internal/types" ) // BbnInterface is an autogenerated mock type for the BbnInterface type @@ -20,7 +18,7 @@ type BbnInterface struct { } // GetAllStakingParams provides a mock function with given fields: ctx -func (_m *BbnInterface) GetAllStakingParams(ctx context.Context) (map[uint32]*bbnclient.StakingParams, *types.Error) { +func (_m *BbnInterface) GetAllStakingParams(ctx context.Context) (map[uint32]*bbnclient.StakingParams, error) { ret := _m.Called(ctx) if len(ret) == 0 { @@ -28,8 +26,8 @@ func (_m *BbnInterface) GetAllStakingParams(ctx context.Context) (map[uint32]*bb } var r0 map[uint32]*bbnclient.StakingParams - var r1 *types.Error - if rf, ok := ret.Get(0).(func(context.Context) (map[uint32]*bbnclient.StakingParams, *types.Error)); ok { + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (map[uint32]*bbnclient.StakingParams, error)); ok { return rf(ctx) } if rf, ok := ret.Get(0).(func(context.Context) map[uint32]*bbnclient.StakingParams); ok { @@ -40,28 +38,26 @@ func (_m *BbnInterface) GetAllStakingParams(ctx context.Context) (map[uint32]*bb } } - if rf, ok := ret.Get(1).(func(context.Context) *types.Error); ok { + if rf, ok := ret.Get(1).(func(context.Context) error); ok { r1 = rf(ctx) } else { - if ret.Get(1) != nil { - r1 = ret.Get(1).(*types.Error) - } + r1 = ret.Error(1) } return r0, r1 } -// GetBlockResultsWithRetry provides a mock function with given fields: ctx, blockHeight -func (_m *BbnInterface) GetBlockResultsWithRetry(ctx context.Context, blockHeight *int64) (*coretypes.ResultBlockResults, *types.Error) { +// GetBlockResults provides a mock function with given fields: ctx, blockHeight +func (_m *BbnInterface) GetBlockResults(ctx context.Context, blockHeight *int64) (*coretypes.ResultBlockResults, error) { ret := _m.Called(ctx, blockHeight) if len(ret) == 0 { - panic("no return value specified for GetBlockResultsWithRetry") + panic("no return value specified for GetBlockResults") } var r0 *coretypes.ResultBlockResults - var r1 *types.Error - if rf, ok := ret.Get(0).(func(context.Context, *int64) (*coretypes.ResultBlockResults, *types.Error)); ok { + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *int64) (*coretypes.ResultBlockResults, error)); ok { return rf(ctx, blockHeight) } if rf, ok := ret.Get(0).(func(context.Context, *int64) *coretypes.ResultBlockResults); ok { @@ -72,19 +68,17 @@ func (_m *BbnInterface) GetBlockResultsWithRetry(ctx context.Context, blockHeigh } } - if rf, ok := ret.Get(1).(func(context.Context, *int64) *types.Error); ok { + if rf, ok := ret.Get(1).(func(context.Context, *int64) error); ok { r1 = rf(ctx, blockHeight) } else { - if ret.Get(1) != nil { - r1 = ret.Get(1).(*types.Error) - } + r1 = ret.Error(1) } return r0, r1 } // GetCheckpointParams provides a mock function with given fields: ctx -func (_m *BbnInterface) GetCheckpointParams(ctx context.Context) (*bbnclient.CheckpointParams, *types.Error) { +func (_m *BbnInterface) GetCheckpointParams(ctx context.Context) (*bbnclient.CheckpointParams, error) { ret := _m.Called(ctx) if len(ret) == 0 { @@ -92,8 +86,8 @@ func (_m *BbnInterface) GetCheckpointParams(ctx context.Context) (*bbnclient.Che } var r0 *bbnclient.CheckpointParams - var r1 *types.Error - if rf, ok := ret.Get(0).(func(context.Context) (*bbnclient.CheckpointParams, *types.Error)); ok { + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (*bbnclient.CheckpointParams, error)); ok { return rf(ctx) } if rf, ok := ret.Get(0).(func(context.Context) *bbnclient.CheckpointParams); ok { @@ -104,19 +98,17 @@ func (_m *BbnInterface) GetCheckpointParams(ctx context.Context) (*bbnclient.Che } } - if rf, ok := ret.Get(1).(func(context.Context) *types.Error); ok { + if rf, ok := ret.Get(1).(func(context.Context) error); ok { r1 = rf(ctx) } else { - if ret.Get(1) != nil { - r1 = ret.Get(1).(*types.Error) - } + r1 = ret.Error(1) } return r0, r1 } // GetLatestBlockNumber provides a mock function with given fields: ctx -func (_m *BbnInterface) GetLatestBlockNumber(ctx context.Context) (int64, *types.Error) { +func (_m *BbnInterface) GetLatestBlockNumber(ctx context.Context) (int64, error) { ret := _m.Called(ctx) if len(ret) == 0 { @@ -124,8 +116,8 @@ func (_m *BbnInterface) GetLatestBlockNumber(ctx context.Context) (int64, *types } var r0 int64 - var r1 *types.Error - if rf, ok := ret.Get(0).(func(context.Context) (int64, *types.Error)); ok { + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (int64, error)); ok { return rf(ctx) } if rf, ok := ret.Get(0).(func(context.Context) int64); ok { @@ -134,12 +126,10 @@ func (_m *BbnInterface) GetLatestBlockNumber(ctx context.Context) (int64, *types r0 = ret.Get(0).(int64) } - if rf, ok := ret.Get(1).(func(context.Context) *types.Error); ok { + if rf, ok := ret.Get(1).(func(context.Context) error); ok { r1 = rf(ctx) } else { - if ret.Get(1) != nil { - r1 = ret.Get(1).(*types.Error) - } + r1 = ret.Error(1) } return r0, r1