diff --git a/cmd/staking-expiry-checker/main.go b/cmd/staking-expiry-checker/main.go index 62a6772..4ea4b74 100644 --- a/cmd/staking-expiry-checker/main.go +++ b/cmd/staking-expiry-checker/main.go @@ -60,14 +60,32 @@ func main() { log.Fatal().Err(err).Msg("error while creating btc notifier") } - delegationService := services.NewService(cfg, dbClient, btcNotifier, btcClient) + service := services.NewService(cfg, dbClient, btcNotifier, btcClient) if err != nil { - log.Fatal().Err(err).Msg("error while creating delegation service") + log.Fatal().Err(err).Msg("error while creating service") } - p, err := poller.NewPoller(cfg.Poller, delegationService) + // Create expiry poller + expiryPoller, err := poller.NewPoller( + poller.ExpiryPoller, + cfg.Pollers.ExpiryChecker, + service.ProcessExpiredDelegations, + ) + if err != nil { + log.Fatal().Err(err).Msg("error while creating expiry poller") + } + + // Create BTC subscriber poller + btcSubscriberPoller, err := poller.NewPoller( + poller.BTCSubscriberPoller, + cfg.Pollers.BtcSubscriber, + service.ProcessBTCSubscriber, + ) if err != nil { - log.Fatal().Err(err).Msg("error while creating poller") + log.Fatal().Err(err).Msg("error while creating BTC subscriber poller") } - p.Start(ctx) + + // Start pollers in separate goroutines + go expiryPoller.Start(ctx) + go btcSubscriberPoller.Start(ctx) } diff --git a/internal/config/config.go b/internal/config/config.go index f48bb9a..c5fb72a 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -9,14 +9,14 @@ import ( ) type Config struct { - Poller PollerConfig `mapstructure:"poller"` + Pollers PollersConfig `mapstructure:"pollers"` Db DbConfig `mapstructure:"db"` Btc BtcConfig `mapstructure:"btc"` Metrics MetricsConfig `mapstructure:"metrics"` } func (cfg *Config) Validate() error { - if err := cfg.Poller.Validate(); err != nil { + if err := cfg.Pollers.Validate(); err != nil { return err } diff --git a/internal/config/poller.go b/internal/config/poller.go index eaad77b..3ec3b66 100644 --- a/internal/config/poller.go +++ b/internal/config/poller.go @@ -10,19 +10,36 @@ import ( type PollerConfig struct { Interval time.Duration `mapstructure:"interval"` - LogLevel string `mapstructure:"log-level"` Timeout time.Duration `mapstructure:"timeout"` } -func (cfg *PollerConfig) Validate() error { - if cfg.Interval < 0 { - return errors.New("poll interval cannot be negative") - } +type PollersConfig struct { + LogLevel string `mapstructure:"log-level"` + ExpiryChecker PollerConfig `mapstructure:"expiry-checker"` + BtcSubscriber PollerConfig `mapstructure:"btc-subscriber"` +} +func (cfg *PollersConfig) Validate() error { if err := cfg.ValidateServiceLogLevel(); err != nil { return err } + if err := cfg.ExpiryChecker.Validate(); err != nil { + return err + } + + if err := cfg.BtcSubscriber.Validate(); err != nil { + return err + } + + return nil +} + +func (cfg *PollerConfig) Validate() error { + if cfg.Interval <= 0 { + return errors.New("poll interval cannot be negative") + } + if cfg.Timeout <= 0 { return errors.New("poll timeout must be greater than 0") } @@ -30,7 +47,7 @@ func (cfg *PollerConfig) Validate() error { return nil } -func (cfg *PollerConfig) ValidateServiceLogLevel() error { +func (cfg *PollersConfig) ValidateServiceLogLevel() error { // If log level is not set, we don't need to validate it, a default value will be used in service if cfg.LogLevel == "" { return nil diff --git a/internal/poller/poller.go b/internal/poller/poller.go index fa5a38e..c54f0be 100644 --- a/internal/poller/poller.go +++ b/internal/poller/poller.go @@ -7,22 +7,33 @@ import ( "github.com/rs/zerolog/log" "github.com/babylonlabs-io/staking-expiry-checker/internal/config" - "github.com/babylonlabs-io/staking-expiry-checker/internal/services" + "github.com/babylonlabs-io/staking-expiry-checker/internal/types" ) +type PollerType string + +const ( + ExpiryPoller PollerType = "expiry" + BTCSubscriberPoller PollerType = "btc-subscriber" +) + +type PollerOperation func(ctx context.Context) *types.Error + type Poller struct { - service *services.Service - interval time.Duration - timeout time.Duration - quit chan struct{} + pollerType PollerType + operation PollerOperation + interval time.Duration + timeout time.Duration + quit chan struct{} } -func NewPoller(cfg config.PollerConfig, service *services.Service) (*Poller, error) { +func NewPoller(pollerType PollerType, cfg config.PollerConfig, operation PollerOperation) (*Poller, error) { return &Poller{ - service: service, - interval: cfg.Interval, - timeout: cfg.Timeout, - quit: make(chan struct{}), + pollerType: pollerType, + operation: operation, + interval: cfg.Interval, + timeout: cfg.Timeout, + quit: make(chan struct{}), }, nil } @@ -36,8 +47,11 @@ func (p *Poller) Start(ctx context.Context) { pollingCtx, cancel := context.WithTimeout(ctx, p.timeout) defer cancel() - if err := p.poll(pollingCtx); err != nil { - log.Error().Err(err).Msg("Error polling") + if err := p.operation(pollingCtx); err != nil { + log.Error(). + Err(err). + Str("poller", string(p.pollerType)). + Msg("Error in polling operation") } case <-ctx.Done(): // Handle context cancellation. @@ -54,12 +68,12 @@ func (p *Poller) Stop() { close(p.quit) } -func (p *Poller) poll(ctx context.Context) error { - log.Debug().Msg("Polling started") - if err := p.service.ProcessExpiredDelegations(ctx); err != nil { - log.Error().Err(err).Msg("Error processing expired delegations") - return err - } - log.Debug().Msg("Polling completed") - return nil -} +// func (p *Poller) poll(ctx context.Context) error { +// log.Debug().Msg("Polling started") +// if err := p.service.ProcessExpiredDelegations(ctx); err != nil { +// log.Error().Err(err).Msg("Error processing expired delegations") +// return err +// } +// log.Debug().Msg("Polling completed") +// return nil +// } diff --git a/internal/services/btc_subscriber.go b/internal/services/btc_subscriber.go new file mode 100644 index 0000000..beca25f --- /dev/null +++ b/internal/services/btc_subscriber.go @@ -0,0 +1,11 @@ +package services + +import ( + "context" + + "github.com/babylonlabs-io/staking-expiry-checker/internal/types" +) + +func (s *Service) ProcessBTCSubscriber(ctx context.Context) *types.Error { + return nil +} diff --git a/internal/services/expiry.go b/internal/services/expiry_checker.go similarity index 100% rename from internal/services/expiry.go rename to internal/services/expiry_checker.go diff --git a/internal/services/watch_btc_events.go b/internal/services/watch_btc_events.go index 7ca04fe..a5f93a8 100644 --- a/internal/services/watch_btc_events.go +++ b/internal/services/watch_btc_events.go @@ -568,7 +568,7 @@ func (s *Service) quitContext() (context.Context, func()) { return ctx, cancel } -func (s *Service) RegisterStakingSpendNotification( +func (s *Service) registerStakingSpendNotification( ctx context.Context, stakingTxHashHex string, stakingTxHex string,