Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: retry logic #44

Merged
merged 5 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/babylon-staking-indexer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func main() {
log.Fatal().Err(err).Msg("error while creating btc client")
}

bbnClient := bbnclient.NewBbnClient(&cfg.Bbn)
bbnClient := bbnclient.NewBBNClient(&cfg.BBN)

btcNotifier, err := btcclient.NewBTCNotifier(
&cfg.BTC,
Expand Down
2 changes: 2 additions & 0 deletions config/config-docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ btc:
bbn:
rpc-addr: https://rpc.devnet.babylonlabs.io:443
timeout: 30s
maxretrytimes: 5
retryinterval: 500ms
poller:
param-polling-interval: 60s
expiry-checker-polling-interval: 10s
Expand Down
2 changes: 2 additions & 0 deletions config/config-local.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ btc:
bbn:
rpc-addr: https://rpc.devnet.babylonlabs.io:443
timeout: 30s
maxretrytimes: 5
retryinterval: 500ms
poller:
param-polling-interval: 10s
expiry-checker-polling-interval: 10s
Expand Down
183 changes: 87 additions & 96 deletions internal/clients/bbnclient/bbnclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,166 +3,157 @@ package bbnclient
import (
"context"
"fmt"
"net/http"
"strings"
"time"

"github.com/babylonlabs-io/babylon-staking-indexer/internal/types"
"github.com/babylonlabs-io/babylon/client/config"
"github.com/avast/retry-go/v4"
"github.com/babylonlabs-io/babylon-staking-indexer/internal/config"
bbncfg "github.com/babylonlabs-io/babylon/client/config"
"github.com/babylonlabs-io/babylon/client/query"
bbntypes "github.com/babylonlabs-io/babylon/x/btcstaking/types"
btcctypes "github.com/babylonlabs-io/babylon/x/btccheckpoint/types"
btcstakingtypes "github.com/babylonlabs-io/babylon/x/btcstaking/types"
ctypes "github.com/cometbft/cometbft/rpc/core/types"
"github.com/rs/zerolog/log"
)

const (
// Backoff parameters for retries for getting BBN block result
initialBackoff = 500 * time.Millisecond // Start with 500ms
backoffFactor = 2 // Exponential backoff factor
maxRetries = 10 // 8 minutes in worst case
)

type BbnClient struct {
type BBNClient struct {
queryClient *query.QueryClient
cfg *config.BBNConfig
}

func NewBbnClient(cfg *config.BabylonQueryConfig) BbnInterface {
queryClient, err := query.New(cfg)
func NewBBNClient(cfg *config.BBNConfig) BbnInterface {
bbnQueryCfg := &bbncfg.BabylonQueryConfig{
RPCAddr: cfg.RPCAddr,
Timeout: cfg.Timeout,
}

queryClient, err := query.New(bbnQueryCfg)
if err != nil {
log.Fatal().Err(err).Msg("error while creating BBN query client")
}
return &BbnClient{queryClient}
return &BBNClient{queryClient, cfg}
}

func (c *BbnClient) GetLatestBlockNumber(ctx context.Context) (int64, *types.Error) {
status, err := c.queryClient.RPCClient.Status(ctx)
func (c *BBNClient) GetLatestBlockNumber(ctx context.Context) (int64, error) {
callForStatus := func() (*ctypes.ResultStatus, error) {
status, err := c.queryClient.RPCClient.Status(ctx)
if err != nil {
return nil, err
}
return status, nil
}

status, err := clientCallWithRetry(callForStatus, c.cfg)
if err != nil {
return 0, types.NewErrorWithMsg(
http.StatusInternalServerError,
types.InternalServiceError,
fmt.Sprintf("failed to get latest block number by fetching status: %s", err.Error()),
)
return 0, fmt.Errorf("failed to get latest block number by fetching status: %w", err)
}
return status.SyncInfo.LatestBlockHeight, nil
}

func (c *BbnClient) GetCheckpointParams(ctx context.Context) (*CheckpointParams, *types.Error) {
params, err := c.queryClient.BTCCheckpointParams()
func (c *BBNClient) GetCheckpointParams(ctx context.Context) (*CheckpointParams, error) {
callForCheckpointParams := func() (*btcctypes.QueryParamsResponse, error) {
params, err := c.queryClient.BTCCheckpointParams()
if err != nil {
return nil, err
}
return params, nil
}

params, err := clientCallWithRetry(callForCheckpointParams, c.cfg)
if err != nil {
return nil, types.NewErrorWithMsg(
http.StatusInternalServerError,
types.ClientRequestError,
fmt.Sprintf("failed to get checkpoint params: %s", err.Error()),
)
return nil, err
}
if err := params.Params.Validate(); err != nil {
return nil, types.NewErrorWithMsg(
http.StatusInternalServerError,
types.ValidationError,
fmt.Sprintf("failed to validate checkpoint params: %s", err.Error()),
)
return nil, err
}
return FromBbnCheckpointParams(params.Params), nil
}

func (c *BbnClient) GetAllStakingParams(ctx context.Context) (map[uint32]*StakingParams, *types.Error) {
allParams := make(map[uint32]*StakingParams) // Map to store versioned staking parameters
func (c *BBNClient) GetAllStakingParams(ctx context.Context) (map[uint32]*StakingParams, error) {
allParams := make(map[uint32]*StakingParams)
version := uint32(0)

for {
// First try without retry to check for ErrParamsNotFound
params, err := c.queryClient.BTCStakingParamsByVersion(version)
if err != nil {
if strings.Contains(err.Error(), bbntypes.ErrParamsNotFound.Error()) {
// Break the loop if an error occurs (indicating no more versions)
break
if strings.Contains(err.Error(), btcstakingtypes.ErrParamsNotFound.Error()) {
break // Exit loop if params not found
}

// Only retry for other errors
callForStakingParams := func() (*btcstakingtypes.QueryParamsByVersionResponse, error) {
return c.queryClient.BTCStakingParamsByVersion(version)
}

params, err = clientCallWithRetry(callForStakingParams, c.cfg)
if err != nil {
return nil, fmt.Errorf("failed to get staking params for version %d: %w", version, err)
}
return nil, types.NewErrorWithMsg(
http.StatusInternalServerError,
types.ClientRequestError,
fmt.Sprintf("failed to get staking params for version %d: %s", version, err.Error()),
)
}

if err := params.Params.Validate(); err != nil {
return nil, types.NewErrorWithMsg(
http.StatusInternalServerError,
types.ValidationError,
fmt.Sprintf("failed to validate staking params for version %d: %s", version, err.Error()),
)
return nil, fmt.Errorf("failed to validate staking params for version %d: %w", version, err)
}

allParams[version] = FromBbnStakingParams(params.Params)
version++
}

if len(allParams) == 0 {
return nil, types.NewErrorWithMsg(
http.StatusNotFound,
types.NotFound,
"no staking params found",
)
return nil, fmt.Errorf("no staking params found")
}

return allParams, nil
}

// GetBlockResultsWithRetry retries the `getBlockResults` method with exponential backoff
// when the block is not yet available.
func (c *BbnClient) GetBlockResultsWithRetry(
func (c *BBNClient) GetBlockResults(
ctx context.Context, blockHeight *int64,
) (*ctypes.ResultBlockResults, *types.Error) {
backoff := initialBackoff
var resp *ctypes.ResultBlockResults
var err *types.Error

for i := 0; i < maxRetries; i++ {
resp, err = c.getBlockResults(ctx, blockHeight)
if err == nil {
return resp, nil
) (*ctypes.ResultBlockResults, error) {
callForBlockResults := func() (*ctypes.ResultBlockResults, error) {
resp, err := c.queryClient.RPCClient.BlockResults(ctx, blockHeight)
if err != nil {
return nil, err
}
return resp, nil
}

if strings.Contains(
err.Err.Error(),
"must be less than or equal to the current blockchain height",
) {
log.Debug().
Str("block_height", fmt.Sprintf("%d", *blockHeight)).
Str("backoff", backoff.String()).
Msg("Block not yet available, retrying...")
time.Sleep(backoff)
backoff *= backoffFactor
continue
}
blockResults, err := clientCallWithRetry(callForBlockResults, c.cfg)
if err != nil {
return nil, err
}

// If we exhaust retries, return a not found error
return nil, types.NewErrorWithMsg(
http.StatusNotFound,
types.NotFound,
fmt.Sprintf("Block height %d not found after retries", *blockHeight),
)
return blockResults, nil
}

func (c *BbnClient) Subscribe(subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) {
func (c *BBNClient) Subscribe(subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) {
return c.queryClient.RPCClient.Subscribe(context.Background(), subscriber, query, outCapacity...)
}

func (c *BbnClient) UnsubscribeAll(subscriber string) error {
func (c *BBNClient) UnsubscribeAll(subscriber string) error {
return c.queryClient.RPCClient.UnsubscribeAll(context.Background(), subscriber)
}

func (c *BbnClient) IsRunning() bool {
func (c *BBNClient) IsRunning() bool {
return c.queryClient.RPCClient.IsRunning()
}

func (c *BbnClient) Start() error {
func (c *BBNClient) Start() error {
return c.queryClient.RPCClient.Start()
}

func (c *BbnClient) getBlockResults(ctx context.Context, blockHeight *int64) (*ctypes.ResultBlockResults, *types.Error) {
resp, err := c.queryClient.RPCClient.BlockResults(ctx, blockHeight)
func clientCallWithRetry[T any](
call retry.RetryableFuncWithData[*T], cfg *config.BBNConfig,
) (*T, error) {
result, err := retry.DoWithData(call, retry.Attempts(cfg.MaxRetryTimes), retry.Delay(cfg.RetryInterval), retry.LastErrorOnly(true),
retry.OnRetry(func(n uint, err error) {
log.Debug().
Uint("attempt", n+1).
Uint("max_attempts", cfg.MaxRetryTimes).
Err(err).
Msg("failed to call the RPC client")
}))

if err != nil {
return nil, types.NewErrorWithMsg(
http.StatusInternalServerError, types.InternalServiceError, err.Error(),
)
return nil, err
}
return resp, nil
return result, nil
}
11 changes: 4 additions & 7 deletions internal/clients/bbnclient/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,14 @@ package bbnclient
import (
"context"

"github.com/babylonlabs-io/babylon-staking-indexer/internal/types"
ctypes "github.com/cometbft/cometbft/rpc/core/types"
)

type BbnInterface interface {
GetCheckpointParams(ctx context.Context) (*CheckpointParams, *types.Error)
GetAllStakingParams(ctx context.Context) (map[uint32]*StakingParams, *types.Error)
GetLatestBlockNumber(ctx context.Context) (int64, *types.Error)
GetBlockResultsWithRetry(
ctx context.Context, blockHeight *int64,
) (*ctypes.ResultBlockResults, *types.Error)
GetCheckpointParams(ctx context.Context) (*CheckpointParams, error)
GetAllStakingParams(ctx context.Context) (map[uint32]*StakingParams, error)
GetLatestBlockNumber(ctx context.Context) (int64, error)
GetBlockResults(ctx context.Context, blockHeight *int64) (*ctypes.ResultBlockResults, error)
Subscribe(subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error)
UnsubscribeAll(subscriber string) error
IsRunning() bool
Expand Down
34 changes: 34 additions & 0 deletions internal/config/bbn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package config

import (
"fmt"
"net/url"
"time"
)

type BBNConfig struct {
RPCAddr string `mapstructure:"rpc-addr"`
Timeout time.Duration `mapstructure:"timeout"`
MaxRetryTimes uint `mapstructure:"maxretrytimes"`
RetryInterval time.Duration `mapstructure:"retryinterval"`
}

func (cfg *BBNConfig) Validate() error {
if _, err := url.Parse(cfg.RPCAddr); err != nil {
return fmt.Errorf("cfg.RPCAddr is not correctly formatted: %w", err)
}

if cfg.Timeout <= 0 {
return fmt.Errorf("cfg.Timeout must be positive")
}

if cfg.MaxRetryTimes <= 0 {
return fmt.Errorf("cfg.MaxRetryTimes must be positive")
}

if cfg.RetryInterval <= 0 {
return fmt.Errorf("cfg.RetryInterval must be positive")
}

return nil
}
20 changes: 8 additions & 12 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,24 @@ import (
"os"
"strings"

bbnconfig "github.com/babylonlabs-io/babylon/client/config"
queue "github.com/babylonlabs-io/staking-queue-client/config"
"github.com/spf13/viper"
)

type Config struct {
Db DbConfig `mapstructure:"db"`
BTC BTCConfig `mapstructure:"btc"`
Bbn bbnconfig.BabylonQueryConfig `mapstructure:"bbn"`
Poller PollerConfig `mapstructure:"poller"`
Queue queue.QueueConfig `mapstructure:"queue"`
Metrics MetricsConfig `mapstructure:"metrics"`
Db DbConfig `mapstructure:"db"`
BTC BTCConfig `mapstructure:"btc"`
BBN BBNConfig `mapstructure:"bbn"`
Poller PollerConfig `mapstructure:"poller"`
Queue queue.QueueConfig `mapstructure:"queue"`
Metrics MetricsConfig `mapstructure:"metrics"`
}

func (cfg *Config) Validate() error {
if err := cfg.Bbn.Validate(); err != nil {
if err := cfg.BBN.Validate(); err != nil {
return err
}

if err := cfg.Db.Validate(); err != nil {
return err
}
Expand All @@ -39,10 +39,6 @@ func (cfg *Config) Validate() error {
return err
}

if err := cfg.Bbn.Validate(); err != nil {
return err
}

if err := cfg.Poller.Validate(); err != nil {
return err
}
Expand Down
8 changes: 6 additions & 2 deletions internal/services/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,13 @@ func (s *Service) getEventsFromBlock(
ctx context.Context, blockHeight int64,
) ([]BbnEvent, *types.Error) {
events := make([]BbnEvent, 0)
blockResult, err := s.bbn.GetBlockResultsWithRetry(ctx, &blockHeight)
blockResult, err := s.bbn.GetBlockResults(ctx, &blockHeight)
if err != nil {
return nil, err
return nil, types.NewError(
http.StatusInternalServerError,
types.ClientRequestError,
fmt.Errorf("failed to get block results: %w", err),
)
}
// Append transaction-level events
for _, txResult := range blockResult.TxsResults {
Expand Down
Loading
Loading