diff --git a/internal/clients/bbnclient/bbnclient.go b/internal/clients/bbnclient/bbnclient.go
index d089485..1d03870 100644
--- a/internal/clients/bbnclient/bbnclient.go
+++ b/internal/clients/bbnclient/bbnclient.go
@@ -4,6 +4,8 @@ import (
 	"context"
 	"fmt"
 	"strings"
+	"sync"
+	"time"
 
 	"github.com/avast/retry-go/v4"
 	"github.com/babylonlabs-io/babylon-staking-indexer/internal/config"
@@ -16,6 +18,7 @@ import (
 )
 
 type BBNClient struct {
+	wg          sync.WaitGroup
 	queryClient *query.QueryClient
 	cfg         *config.BBNConfig
 }
@@ -30,7 +33,10 @@ func NewBBNClient(cfg *config.BBNConfig) BbnInterface {
 	if err != nil {
 		log.Fatal().Err(err).Msg("error while creating BBN query client")
 	}
-	return &BBNClient{queryClient, cfg}
+	return &BBNClient{
+		queryClient: queryClient,
+		cfg:         cfg,
+	}
 }
 
 func (c *BBNClient) GetLatestBlockNumber(ctx context.Context) (int64, error) {
@@ -140,8 +146,109 @@ func (c *BBNClient) GetBlock(ctx context.Context, blockHeight *int64) (*ctypes.R
 	return block, nil
 }
 
-func (c *BBNClient) Subscribe(subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) {
-	return c.queryClient.RPCClient.Subscribe(context.Background(), subscriber, query, outCapacity...)
+// Subscribe subscribes to events matching query on the underlying RPC client
+// and returns a channel onto which every received event is forwarded.
+//
+// A background goroutine checks every healthCheckInterval whether an event has
+// arrived within the last maxEventWaitInterval; if not, it unsubscribes and
+// subscribes again, then keeps forwarding from the new subscription. Both
+// intervals must be positive, otherwise an error is returned before anything is
+// subscribed. outCapacity is passed through to the RPC client unchanged.
+//
+// NOTE(review): the forwarding loop has no return path or stop signal, so the
+// returned channel is not closed by normal control flow - confirm intended.
+func (c *BBNClient) Subscribe(
+	subscriber, query string,
+	healthCheckInterval time.Duration,
+	maxEventWaitInterval time.Duration,
+	outCapacity ...int,
+) (out <-chan ctypes.ResultEvent, err error) {
+	// time.NewTicker panics on a non-positive duration; reject bad intervals
+	// here so the caller gets an error instead of a panic in the goroutine.
+	if healthCheckInterval <= 0 || maxEventWaitInterval <= 0 {
+		return nil, fmt.Errorf(
+			"subscription intervals must be positive: health check %s, max event wait %s",
+			healthCheckInterval, maxEventWaitInterval,
+		)
+	}
+
+	subscribe := func() (<-chan ctypes.ResultEvent, error) {
+		newChan, err := c.queryClient.RPCClient.Subscribe(
+			context.Background(),
+			subscriber,
+			query,
+			outCapacity...,
+		)
+		if err != nil {
+			return nil, fmt.Errorf(
+				"failed to subscribe babylon events for query %s: %w", query, err,
+			)
+		}
+		return newChan, nil
+	}
+
+	// Initial subscription
+	rawEventChan, err := subscribe()
+	if err != nil {
+		return nil, err
+	}
+
+	// Allocated only after the initial subscription succeeded, so the error
+	// path above has nothing to clean up.
+	eventChan := make(chan ctypes.ResultEvent)
+
+	c.wg.Add(1)
+	go func() {
+		defer c.wg.Done()
+		defer close(eventChan)
+		timeoutTicker := time.NewTicker(healthCheckInterval)
+		defer timeoutTicker.Stop()
+		lastEventTime := time.Now()
+
+		for {
+			select {
+			case event, ok := <-rawEventChan:
+				if !ok {
+					log.Fatal().
+						Str("subscriber", subscriber).
+						Str("query", query).
+						Msg("Subscription channel closed, this shall not happen")
+				}
+				lastEventTime = time.Now()
+				eventChan <- event
+			case <-timeoutTicker.C:
+				if time.Since(lastEventTime) > maxEventWaitInterval {
+					log.Error().
+						Str("subscriber", subscriber).
+						Str("query", query).
+						Msg("No events received, attempting to resubscribe")
+
+					if err := c.queryClient.RPCClient.Unsubscribe(
+						context.Background(),
+						subscriber,
+						query,
+					); err != nil {
+						log.Error().Err(err).Msg("Failed to unsubscribe babylon events")
+					}
+
+					// Create new subscription
+					newEventChan, err := subscribe()
+					if err != nil {
+						log.Error().Err(err).Msg("Failed to resubscribe babylon events")
+						// lastEventTime is not reset, so the next tick retries.
+						continue
+					}
+
+					// Replace the old channel with the new one
+					rawEventChan = newEventChan
+					// reset last event time
+					lastEventTime = time.Now()
+				}
+			}
+		}
+	}()
+
+	return eventChan, nil
 }
 
 func (c *BBNClient) UnsubscribeAll(subscriber string) error {
diff --git a/internal/clients/bbnclient/interface.go b/internal/clients/bbnclient/interface.go
index c035dc1..249e077 100644
--- a/internal/clients/bbnclient/interface.go
+++ b/internal/clients/bbnclient/interface.go
@@ -2,6 +2,7 @@ package bbnclient
 
 import (
 	"context"
+	"time"
 
 	ctypes "github.com/cometbft/cometbft/rpc/core/types"
 )
@@ -13,7 +14,12 @@ type BbnInterface interface {
 	GetLatestBlockNumber(ctx context.Context) (int64, error)
 	GetBlock(ctx context.Context, blockHeight *int64) (*ctypes.ResultBlock, error)
 	GetBlockResults(ctx context.Context, blockHeight *int64) (*ctypes.ResultBlockResults, error)
-	Subscribe(subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error)
+	Subscribe(
+		subscriber, query string,
+		healthCheckInterval time.Duration,
+		maxEventWaitInterval time.Duration,
+		outCapacity ...int,
+	) (out <-chan ctypes.ResultEvent, err error)
 	UnsubscribeAll(subscriber string) error
 	IsRunning() bool
 	Start() error
diff --git a/internal/services/subscription.go b/internal/services/subscription.go
index 902b324..3263a8a 100644
--- a/internal/services/subscription.go
+++ b/internal/services/subscription.go
@@ -2,6 +2,7 @@ package services
 
 import (
 	"context"
+	"time"
 
 	"github.com/babylonlabs-io/babylon-staking-indexer/internal/types"
 	ctypes "github.com/cometbft/cometbft/types"
@@ -9,16 +10,30 
@@ import ( ) const ( - subscriberName = "babylon-staking-indexer" - newBlockQuery = "tm.event='NewBlock'" + subscriberName = "babylon-staking-indexer" + newBlockQuery = "tm.event='NewBlock'" + outCapacity = 100 + subscriptionHealthCheckInterval = 1 * time.Minute + maxEventWaitInterval = 1 * time.Minute ) func (s *Service) SubscribeToBbnEvents(ctx context.Context) { if !s.bbn.IsRunning() { log.Fatal().Msg("BBN client is not running") } - - eventChan, err := s.bbn.Subscribe(subscriberName, newBlockQuery) + // Subscribe to new block events, waiting at most maxEventWaitInterval for one. + // If nothing comes through within that window, the underlying subscription will + // be resubscribed. + // This is a workaround for the fact that cometbft ws_client does not have + // proper ping pong configuration setup to detect if the connection is dead. + // Refer to https://github.com/cometbft/cometbft/commit/2fd8496bc109d010c6c2e415604131b500550e37#r151452099 + eventChan, err := s.bbn.Subscribe( + subscriberName, + newBlockQuery, + subscriptionHealthCheckInterval, + maxEventWaitInterval, + outCapacity, + ) if err != nil { log.Fatal().Msgf("Failed to subscribe to events: %v", err) } @@ -36,11 +51,15 @@ func (s *Service) SubscribeToBbnEvents(ctx context.Context) { if latestHeight == 0 { log.Fatal().Msg("Event doesn't contain block height information") } + log.Debug(). + Int64("height", latestHeight). + Msg("received new block event from babylon subscription") // Send the latest height to the BBN block processor s.latestHeightChan <- latestHeight case <-ctx.Done(): + log.Info().Msg("context done, unsubscribing all babylon events") err := s.bbn.UnsubscribeAll(subscriberName) if err != nil { log.Error().Msgf("Failed to unsubscribe from events: %v", err)