Skip to content

Commit

Permalink
Retry subscription if no new events (#131)
Browse files Browse the repository at this point in the history
* fix: resubscribe to ws if not responding
  • Loading branch information
jrwbabylonlab authored Jan 22, 2025
1 parent fab471c commit 4106651
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 8 deletions.
90 changes: 87 additions & 3 deletions internal/clients/bbnclient/bbnclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"fmt"
"strings"
"sync"
"time"

"github.com/avast/retry-go/v4"
"github.com/babylonlabs-io/babylon-staking-indexer/internal/config"
Expand All @@ -16,6 +18,7 @@ import (
)

type BBNClient struct {
wg sync.WaitGroup
queryClient *query.QueryClient
cfg *config.BBNConfig
}
Expand All @@ -30,7 +33,10 @@ func NewBBNClient(cfg *config.BBNConfig) BbnInterface {
if err != nil {
log.Fatal().Err(err).Msg("error while creating BBN query client")
}
return &BBNClient{queryClient, cfg}
return &BBNClient{
queryClient: queryClient,
cfg: cfg,
}
}

func (c *BBNClient) GetLatestBlockNumber(ctx context.Context) (int64, error) {
Expand Down Expand Up @@ -140,8 +146,86 @@ func (c *BBNClient) GetBlock(ctx context.Context, blockHeight *int64) (*ctypes.R
return block, nil
}

func (c *BBNClient) Subscribe(subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) {
return c.queryClient.RPCClient.Subscribe(context.Background(), subscriber, query, outCapacity...)
func (c *BBNClient) Subscribe(
subscriber, query string,
healthCheckInterval time.Duration,
maxEventWaitInterval time.Duration,
outCapacity ...int,
) (out <-chan ctypes.ResultEvent, err error) {
eventChan := make(chan ctypes.ResultEvent)

subscribe := func() (<-chan ctypes.ResultEvent, error) {
newChan, err := c.queryClient.RPCClient.Subscribe(
context.Background(),
subscriber,
query,
outCapacity...,
)
if err != nil {
return nil, fmt.Errorf(
"failed to subscribe babylon events for query %s: %w", query, err,
)
}
return newChan, nil
}

// Initial subscription
rawEventChan, err := subscribe()
if err != nil {
close(eventChan)
return nil, err
}
c.wg.Add(1)
go func() {
defer c.wg.Done()
defer close(eventChan)
timeoutTicker := time.NewTicker(healthCheckInterval)
defer timeoutTicker.Stop()
lastEventTime := time.Now()

for {
select {
case event, ok := <-rawEventChan:
if !ok {
log.Fatal().
Str("subscriber", subscriber).
Str("query", query).
Msg("Subscription channel closed, this shall not happen")
}
lastEventTime = time.Now()
eventChan <- event
case <-timeoutTicker.C:
if time.Since(lastEventTime) > maxEventWaitInterval {
log.Error().
Str("subscriber", subscriber).
Str("query", query).
Msg("No events received, attempting to resubscribe")

if err := c.queryClient.RPCClient.Unsubscribe(
context.Background(),
subscriber,
query,
); err != nil {
log.Error().Err(err).Msg("Failed to unsubscribe babylon events")
}

// Create new subscription
newEventChan, err := subscribe()
if err != nil {
log.Error().Err(err).Msg("Failed to resubscribe babylon events")
continue
}

// Replace the old channel with the new one
rawEventChan = newEventChan
// reset last event time
lastEventTime = time.Now()
}
}
}
}()

return eventChan, nil
}

func (c *BBNClient) UnsubscribeAll(subscriber string) error {
Expand Down
8 changes: 7 additions & 1 deletion internal/clients/bbnclient/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package bbnclient

import (
"context"
"time"

ctypes "github.com/cometbft/cometbft/rpc/core/types"
)
Expand All @@ -13,7 +14,12 @@ type BbnInterface interface {
GetLatestBlockNumber(ctx context.Context) (int64, error)
GetBlock(ctx context.Context, blockHeight *int64) (*ctypes.ResultBlock, error)
GetBlockResults(ctx context.Context, blockHeight *int64) (*ctypes.ResultBlockResults, error)
Subscribe(subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error)
Subscribe(
subscriber, query string,
healthCheckInterval time.Duration,
maxEventWaitInterval time.Duration,
outCapacity ...int,
) (out <-chan ctypes.ResultEvent, err error)
UnsubscribeAll(subscriber string) error
IsRunning() bool
Start() error
Expand Down
27 changes: 23 additions & 4 deletions internal/services/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,38 @@ package services

import (
"context"
"time"

"github.com/babylonlabs-io/babylon-staking-indexer/internal/types"
ctypes "github.com/cometbft/cometbft/types"
"github.com/rs/zerolog/log"
)

const (
subscriberName = "babylon-staking-indexer"
newBlockQuery = "tm.event='NewBlock'"
subscriberName = "babylon-staking-indexer"
newBlockQuery = "tm.event='NewBlock'"
outCapacity = 100
subscriptionHealthCheckInterval = 1 * time.Minute
maxEventWaitInterval = 1 * time.Minute
)

func (s *Service) SubscribeToBbnEvents(ctx context.Context) {
if !s.bbn.IsRunning() {
log.Fatal().Msg("BBN client is not running")
}

eventChan, err := s.bbn.Subscribe(subscriberName, newBlockQuery)
// Subscribe to new block events but only wait for 5 minutes for events
// if nothing come through within 5 minutes, the underlying subscription will
// be resubscribed.
// This is a workaround for the fact that cometbft ws_client does not have
// proper ping pong configuration setup to detect if the connection is dead.
// Refer to https://github.com/cometbft/cometbft/commit/2fd8496bc109d010c6c2e415604131b500550e37#r151452099
eventChan, err := s.bbn.Subscribe(
subscriberName,
newBlockQuery,
subscriptionHealthCheckInterval,
maxEventWaitInterval,
outCapacity,
)
if err != nil {
log.Fatal().Msgf("Failed to subscribe to events: %v", err)
}
Expand All @@ -36,11 +51,15 @@ func (s *Service) SubscribeToBbnEvents(ctx context.Context) {
if latestHeight == 0 {
log.Fatal().Msg("Event doesn't contain block height information")
}
log.Debug().
Int64("height", latestHeight).
Msg("received new block event from babylon subscription")

// Send the latest height to the BBN block processor
s.latestHeightChan <- latestHeight

case <-ctx.Done():
log.Info().Msg("context done, unsubscribing all babylon events")
err := s.bbn.UnsubscribeAll(subscriberName)
if err != nil {
log.Error().Msgf("Failed to unsubscribe from events: %v", err)
Expand Down

0 comments on commit 4106651

Please sign in to comment.