From 91719c6af55931441c7aa9922ded7244bbbf4676 Mon Sep 17 00:00:00 2001 From: Gurjot Date: Thu, 24 Oct 2024 13:16:53 -0400 Subject: [PATCH 1/7] use mngr from queue client --- consumer/event_consumer.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 consumer/event_consumer.go diff --git a/consumer/event_consumer.go b/consumer/event_consumer.go new file mode 100644 index 0000000..00d6239 --- /dev/null +++ b/consumer/event_consumer.go @@ -0,0 +1,16 @@ +package consumer + +import ( + "github.com/babylonlabs-io/staking-queue-client/client" +) + +type EventConsumer interface { + Start() error + PushStakingEvent(ev *client.ActiveStakingEvent) error + PushUnbondingEvent(ev *client.UnbondingStakingEvent) error + PushWithdrawEvent(ev *client.WithdrawStakingEvent) error + PushExpiryEvent(ev *client.ExpiredStakingEvent) error + PushBtcInfoEvent(ev *client.BtcInfoEvent) error + PushConfirmedInfoEvent(ev *client.ConfirmedInfoEvent) error + Stop() error +} From 2ad3bc690205090c6ed443d03a86da4f6a65e2e8 Mon Sep 17 00:00:00 2001 From: Gurjot Date: Thu, 24 Oct 2024 13:17:42 -0400 Subject: [PATCH 2/7] push unbonding/expiry events --- cmd/babylon-staking-indexer/main.go | 9 +++--- internal/queue/queue.go | 40 +++++++++++++++++++++++++-- internal/services/delegation.go | 43 +++++++++++++++++++++++++++++ internal/services/expiry-checker.go | 9 ++++++ internal/services/service.go | 8 +++--- internal/utils/utils.go | 11 ++++++++ 6 files changed, 110 insertions(+), 10 deletions(-) diff --git a/cmd/babylon-staking-indexer/main.go b/cmd/babylon-staking-indexer/main.go index 253aa17..51f0bb0 100644 --- a/cmd/babylon-staking-indexer/main.go +++ b/cmd/babylon-staking-indexer/main.go @@ -4,8 +4,10 @@ import ( "context" "fmt" + "github.com/babylonlabs-io/staking-queue-client/queuemngr" "github.com/joho/godotenv" "github.com/rs/zerolog/log" + "go.uber.org/zap" "github.com/babylonlabs-io/babylon-staking-indexer/cmd/babylon-staking-indexer/cli" "github.com/babylonlabs-io/babylon-staking-indexer/internal/clients/bbnclient" @@ -13,7 +15,6 @@ import ( "github.com/babylonlabs-io/babylon-staking-indexer/internal/config" "github.com/babylonlabs-io/babylon-staking-indexer/internal/db" "github.com/babylonlabs-io/babylon-staking-indexer/internal/observability/metrics" - "github.com/babylonlabs-io/babylon-staking-indexer/internal/queue" "github.com/babylonlabs-io/babylon-staking-indexer/internal/services" ) @@ -50,12 +51,12 @@ func main() { } bbnClient := bbnclient.NewBbnClient(&cfg.Bbn) - qm, err := queue.NewQueueManager(&cfg.Queue) + queueConsumer, err := queuemngr.NewQueueManager(&cfg.Queue, &zap.Logger{}) if err != nil { - log.Fatal().Err(err).Msg("error while creating queue manager") + log.Fatal().Err(err).Msg("error while creating queue consumer") } - service := services.NewService(cfg, dbClient, btcClient, bbnClient, qm) + service := services.NewService(cfg, dbClient, btcClient, bbnClient, queueConsumer) if err != nil { log.Fatal().Err(err).Msg("error while creating delegation service") } diff --git a/internal/queue/queue.go b/internal/queue/queue.go index 9c117d0..4789e68 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -1,19 +1,55 @@ package queue import ( + "context" + "encoding/json" + "fmt" + + "github.com/babylonlabs-io/staking-queue-client/client" "github.com/rs/zerolog/log" + "github.com/babylonlabs-io/babylon-staking-indexer/internal/observability/metrics" queueConfig "github.com/babylonlabs-io/staking-queue-client/config" ) type QueueManager struct { + stakingExpiredEventQueue client.QueueClient } func NewQueueManager(cfg *queueConfig.QueueConfig) (*QueueManager, error) { - return &QueueManager{}, nil + stakingEventQueue, err := client.NewQueueClient(cfg, client.ExpiredStakingQueueName) + if err != nil { + return nil, fmt.Errorf("failed to initialize staking event queue: %w", err) + } + + return &QueueManager{ + stakingExpiredEventQueue: stakingEventQueue, + }, nil +} + +func (qm *QueueManager) SendExpiredStakingEvent(ctx context.Context, ev client.ExpiredStakingEvent) error { + jsonBytes, err := json.Marshal(ev) + if err != nil { + return err + } + messageBody := string(jsonBytes) + + log.Debug().Str("tx_hash", ev.StakingTxHashHex).Msg("publishing expired staking event") + err = qm.stakingExpiredEventQueue.SendMessage(ctx, messageBody) + if err != nil { + metrics.RecordQueueSendError() + log.Fatal().Err(err).Str("tx_hash", ev.StakingTxHashHex).Msg("failed to publish staking event") + } + log.Debug().Str("tx_hash", ev.StakingTxHashHex).Msg("successfully published expired staking event") + + return nil } // Shutdown gracefully stops the interaction with the queue, ensuring all resources are properly released. func (qm *QueueManager) Shutdown() { - log.Info().Msg("Shutting down queue manager") + err := qm.stakingExpiredEventQueue.Stop() + if err != nil { + log.Error().Err(err).Msg("failed to stop staking expired event queue") + } + } diff --git a/internal/services/delegation.go b/internal/services/delegation.go index 6559fed..1c2cb21 100644 --- a/internal/services/delegation.go +++ b/internal/services/delegation.go @@ -2,14 +2,18 @@ package services import ( "context" + "encoding/hex" "fmt" "net/http" + "strconv" + "time" "github.com/babylonlabs-io/babylon-staking-indexer/internal/db" "github.com/babylonlabs-io/babylon-staking-indexer/internal/db/model" "github.com/babylonlabs-io/babylon-staking-indexer/internal/types" "github.com/babylonlabs-io/babylon-staking-indexer/internal/utils" bbntypes "github.com/babylonlabs-io/babylon/x/btcstaking/types" + queueclient "github.com/babylonlabs-io/staking-queue-client/client" abcitypes "github.com/cometbft/cometbft/abci/types" "github.com/rs/zerolog/log" ) @@ -136,9 +140,48 @@ func (s *Service) processBTCDelegationUnbondedEarlyEvent( return nil } + // Fetch the current delegation state from the database + delegation, dbErr := s.db.GetBTCDelegationByStakingTxHash(ctx, unbondedEarlyEvent.StakingTxHash) + if dbErr != nil { + return types.NewError( + http.StatusInternalServerError, + types.InternalServiceError, + fmt.Errorf("failed to get BTC delegation by staking tx hash: %w", dbErr), + ) + } + // TODO: save timelock expire, need to figure out what will be the expire height in this case. // https://github.com/babylonlabs-io/babylon-staking-indexer/issues/28 + unbondingTime, _ := strconv.ParseUint(delegation.UnbondingTime, 10, 64) + unbondingTxBytes, err2 := hex.DecodeString(delegation.UnbondingTx) + if err2 != nil { + return types.NewInternalServiceError( + fmt.Errorf("failed to decode unbonding tx: %w", err2), + ) + } + unbondingTxHash, err3 := utils.GetTxHash(unbondingTxBytes) + if err3 != nil { + return types.NewInternalServiceError( + fmt.Errorf("failed to get unbonding tx hash: %w", err3), + ) + } + unbondingEvent := queueclient.NewUnbondingStakingEvent( + delegation.StakingTxHashHex, + uint64(delegation.StartHeight), + time.Now().Unix(), + unbondingTime, + // valid unbonding tx always has one output + 0, + delegation.UnbondingTx, + unbondingTxHash.String(), + ) + if err4 := s.consumer.PushUnbondingEvent(&unbondingEvent); err4 != nil { + return types.NewInternalServiceError( + fmt.Errorf("failed to send unbonding staking event: %w", err4), + ) + } + if dbErr := s.db.UpdateBTCDelegationState( ctx, unbondedEarlyEvent.StakingTxHash, types.StateUnbonding, ); dbErr != nil { diff --git a/internal/services/expiry-checker.go b/internal/services/expiry-checker.go index 46ccd1e..d284b9e 100644 --- a/internal/services/expiry-checker.go +++ b/internal/services/expiry-checker.go @@ -8,6 +8,7 @@ import ( "github.com/babylonlabs-io/babylon-staking-indexer/internal/types" "github.com/babylonlabs-io/babylon-staking-indexer/internal/utils" "github.com/babylonlabs-io/babylon-staking-indexer/internal/utils/poller" + queueclient "github.com/babylonlabs-io/staking-queue-client/client" "github.com/rs/zerolog/log" ) @@ -53,6 +54,14 @@ func (s *Service) checkExpiry(ctx context.Context) *types.Error { continue } + ev := queueclient.NewExpiredStakingEvent(delegation.StakingTxHashHex, tlDoc.TxType) + if err := s.consumer.PushExpiryEvent(&ev); err != nil { + log.Error().Err(err).Msg("Error sending expired staking event") + return types.NewInternalServiceError( + fmt.Errorf("failed to send expired staking event: %w", err), + ) + } + if err := s.db.UpdateBTCDelegationState(ctx, delegation.StakingTxHashHex, types.StateWithdrawable); err != nil { log.Error().Err(err).Msg("Error updating BTC delegation state to withdrawable") return types.NewInternalServiceError( diff --git a/internal/services/service.go b/internal/services/service.go index 55866f8..59acff0 100644 --- a/internal/services/service.go +++ b/internal/services/service.go @@ -3,11 +3,11 @@ package services import ( "context" + "github.com/babylonlabs-io/babylon-staking-indexer/consumer" "github.com/babylonlabs-io/babylon-staking-indexer/internal/clients/bbnclient" "github.com/babylonlabs-io/babylon-staking-indexer/internal/clients/btcclient" "github.com/babylonlabs-io/babylon-staking-indexer/internal/config" "github.com/babylonlabs-io/babylon-staking-indexer/internal/db" - "github.com/babylonlabs-io/babylon-staking-indexer/internal/queue" ) type Service struct { @@ -15,7 +15,7 @@ type Service struct { db db.DbInterface btc btcclient.BtcInterface bbn bbnclient.BbnInterface - queueManager *queue.QueueManager + consumer consumer.EventConsumer bbnEventProcessor chan BbnEvent } @@ -24,7 +24,7 @@ func NewService( db db.DbInterface, btc btcclient.BtcInterface, bbn bbnclient.BbnInterface, - qm *queue.QueueManager, + consumer consumer.EventConsumer, ) *Service { eventProcessor := make(chan BbnEvent, eventProcessorSize) return &Service{ @@ -32,7 +32,7 @@ func NewService( db: db, btc: btc, bbn: bbn, - queueManager: qm, + consumer: consumer, bbnEventProcessor: eventProcessor, } } diff --git a/internal/utils/utils.go b/internal/utils/utils.go index 253bcdf..5e0a451 100644 --- a/internal/utils/utils.go +++ b/internal/utils/utils.go @@ -1,11 +1,14 @@ package utils import ( + "bytes" "runtime" "strconv" "strings" "github.com/btcsuite/btcd/chaincfg" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" ) type SupportedBtcNetwork string @@ -99,3 +102,11 @@ func Contains[T comparable](slice []T, item T) bool { } return false } + +func GetTxHash(unbondingTxBytes []byte) (chainhash.Hash, error) { + var msgTx wire.MsgTx + if err := msgTx.Deserialize(bytes.NewReader(unbondingTxBytes)); err != nil { + return chainhash.Hash{}, err + } + return msgTx.TxHash(), nil +} From ac61f58dc9ce406ff9ff0c7f3b45da85b5ad9bfb Mon Sep 17 00:00:00 2001 From: Gurjot Date: Thu, 24 Oct 2024 14:58:12 -0400 Subject: [PATCH 3/7] emit events for api --- cmd/babylon-staking-indexer/main.go | 9 +- consumer/event_consumer.go | 16 -- go.mod | 3 +- go.sum | 3 + internal/queue/client/client.go | 37 +++ internal/queue/client/rabbitmq_client.go | 279 +++++++++++++++++++ internal/queue/client/schema.go | 330 +++++++++++++++++++++++ internal/queue/queue.go | 102 ++++++- internal/services/delegation.go | 78 +++++- internal/services/expiry-checker.go | 4 +- internal/services/service.go | 8 +- 11 files changed, 835 insertions(+), 34 deletions(-) delete mode 100644 consumer/event_consumer.go create mode 100644 internal/queue/client/client.go create mode 100644 internal/queue/client/rabbitmq_client.go create mode 100644 internal/queue/client/schema.go diff --git a/cmd/babylon-staking-indexer/main.go b/cmd/babylon-staking-indexer/main.go index 51f0bb0..253aa17 100644 --- a/cmd/babylon-staking-indexer/main.go +++ b/cmd/babylon-staking-indexer/main.go @@ -4,10 +4,8 @@ import ( "context" "fmt" - "github.com/babylonlabs-io/staking-queue-client/queuemngr" "github.com/joho/godotenv" "github.com/rs/zerolog/log" - "go.uber.org/zap" "github.com/babylonlabs-io/babylon-staking-indexer/cmd/babylon-staking-indexer/cli" "github.com/babylonlabs-io/babylon-staking-indexer/internal/clients/bbnclient" @@ -15,6 +13,7 @@ import ( "github.com/babylonlabs-io/babylon-staking-indexer/internal/config" "github.com/babylonlabs-io/babylon-staking-indexer/internal/db" "github.com/babylonlabs-io/babylon-staking-indexer/internal/observability/metrics" + "github.com/babylonlabs-io/babylon-staking-indexer/internal/queue" "github.com/babylonlabs-io/babylon-staking-indexer/internal/services" ) @@ -51,12 +50,12 @@ func main() { } bbnClient := bbnclient.NewBbnClient(&cfg.Bbn) - queueConsumer, err := queuemngr.NewQueueManager(&cfg.Queue, &zap.Logger{}) + qm, err := queue.NewQueueManager(&cfg.Queue) if err != nil { - log.Fatal().Err(err).Msg("error while creating queue consumer") + log.Fatal().Err(err).Msg("error while creating queue manager") } - service := services.NewService(cfg, dbClient, btcClient, bbnClient, queueConsumer) + service := services.NewService(cfg, dbClient, btcClient, bbnClient, qm) if err != nil { log.Fatal().Err(err).Msg("error while creating delegation service") } diff --git a/consumer/event_consumer.go b/consumer/event_consumer.go deleted file mode 100644 index 00d6239..0000000 --- a/consumer/event_consumer.go +++ /dev/null @@ -1,16 +0,0 @@ -package consumer - -import ( - "github.com/babylonlabs-io/staking-queue-client/client" -) - -type EventConsumer interface { - Start() error - PushStakingEvent(ev *client.ActiveStakingEvent) error - PushUnbondingEvent(ev *client.UnbondingStakingEvent) error - PushWithdrawEvent(ev *client.WithdrawStakingEvent) error - PushExpiryEvent(ev *client.ExpiredStakingEvent) error - PushBtcInfoEvent(ev *client.BtcInfoEvent) error - PushConfirmedInfoEvent(ev *client.ConfirmedInfoEvent) error - Stop() error -} diff --git a/go.mod b/go.mod index 75de9ac..a1175e6 100644 --- a/go.mod +++ b/go.mod @@ -6,10 +6,12 @@ require ( github.com/babylonlabs-io/babylon v0.12.1 github.com/babylonlabs-io/staking-queue-client v0.4.1 github.com/btcsuite/btcd v0.24.3-0.20241011125836-24eb815168f4 + github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0 github.com/cometbft/cometbft v0.38.7 github.com/cosmos/cosmos-sdk v0.50.6 github.com/cosmos/gogoproto v1.7.0 github.com/go-chi/chi/v5 v5.1.0 + github.com/rabbitmq/amqp091-go v1.9.0 github.com/spf13/viper v1.19.0 ) @@ -44,7 +46,6 @@ require ( github.com/boljen/go-bitmap v0.0.0-20151001105940-23cd2fb0ce7d // indirect github.com/btcsuite/btcd/btcec/v2 v2.3.4 // indirect github.com/btcsuite/btcd/btcutil v1.1.6 // indirect - github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0 // indirect github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f // indirect github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd // indirect github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792 // indirect diff --git a/go.sum b/go.sum index 3d333d2..15a4af0 100644 --- a/go.sum +++ b/go.sum @@ -1023,6 +1023,8 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O github.com/prometheus/procfs v0.3.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= +github.com/rabbitmq/amqp091-go v1.9.0 h1:qrQtyzB4H8BQgEuJwhmVQqVHB9O4+MNDJCCAcpc3Aoo= +github.com/rabbitmq/amqp091-go v1.9.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= @@ -1199,6 +1201,7 @@ go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= +go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= diff --git a/internal/queue/client/client.go b/internal/queue/client/client.go new file mode 100644 index 0000000..1c0f92b --- /dev/null +++ b/internal/queue/client/client.go @@ -0,0 +1,37 @@ +package client + +import ( + "context" + + "github.com/babylonlabs-io/staking-queue-client/config" +) + +type QueueMessage struct { + Body string + Receipt string + RetryAttempts int32 +} + +func (m QueueMessage) IncrementRetryAttempts() int32 { + m.RetryAttempts++ + return m.RetryAttempts +} + +func (m QueueMessage) GetRetryAttempts() int32 { + return m.RetryAttempts +} + +// A common interface for queue clients regardless if it's a SQS, RabbitMQ, etc. +type QueueClient interface { + SendMessage(ctx context.Context, messageBody string) error + ReceiveMessages() (<-chan QueueMessage, error) + DeleteMessage(receipt string) error + Stop() error + GetQueueName() string + ReQueueMessage(ctx context.Context, message QueueMessage) error + Ping(ctx context.Context) error +} + +func NewQueueClient(config *config.QueueConfig, queueName string) (QueueClient, error) { + return NewRabbitMqClient(config, queueName) +} diff --git a/internal/queue/client/rabbitmq_client.go b/internal/queue/client/rabbitmq_client.go new file mode 100644 index 0000000..975921f --- /dev/null +++ b/internal/queue/client/rabbitmq_client.go @@ -0,0 +1,279 @@ +package client + +import ( + "context" + "fmt" + "strconv" + "time" + + amqp "github.com/rabbitmq/amqp091-go" + + "github.com/babylonlabs-io/staking-queue-client/config" +) + +const ( + dlxName = "common_dlx" + dlxRoutingPostfix = "_routing_key" + delayedQueuePostfix = "_delay" +) + +type RabbitMqClient struct { + connection *amqp.Connection + channel *amqp.Channel + queueName string + stopCh chan struct{} // This is used to gracefully stop the message receiving loop + delayedRequeueTime time.Duration +} + +func NewRabbitMqClient(config *config.QueueConfig, queueName string) (*RabbitMqClient, error) { + amqpURI := fmt.Sprintf("amqp://%s:%s@%s", config.QueueUser, config.QueuePassword, config.Url) + + conn, err := amqp.Dial(amqpURI) + if err != nil { + return nil, err + } + + ch, err := conn.Channel() + if err != nil { + return nil, err + } + + // Declare a single common DLX for all queues + err = ch.ExchangeDeclare(dlxName, + "direct", + true, + false, + false, + false, + amqp.Table{ + "x-queue-type": config.QueueType, + }, + ) + if err != nil { + return nil, err + } + + // Declare a delay queue specific to this particular queue + delayQueueName := queueName + delayedQueuePostfix + _, err = ch.QueueDeclare( + delayQueueName, + true, + false, + false, + false, + amqp.Table{ + // Default exchange to route messages back to the main queue + // The "" in rabbitMq referring to the default exchange which allows + // to route messages to the queue by the routing key which is the queue name + "x-queue-type": config.QueueType, + "x-dead-letter-exchange": "", + "x-dead-letter-routing-key": queueName, + }, + ) + if err != nil { + return nil, err + } + + // Declare the queue that will be created if not exists + customDlxRoutingKey := queueName + dlxRoutingPostfix + _, err = ch.QueueDeclare( + queueName, // name + true, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + amqp.Table{ + "x-queue-type": config.QueueType, + "x-dead-letter-exchange": dlxName, + "x-dead-letter-routing-key": customDlxRoutingKey, + }, + ) + if err != nil { + return nil, err + } + + // Bind the delay queue to the common DLX + err = ch.QueueBind(delayQueueName, customDlxRoutingKey, dlxName, false, nil) + if err != nil { + return nil, err + } + + err = ch.Confirm(false) + if err != nil { + return nil, err + } + + return &RabbitMqClient{ + connection: conn, + channel: ch, + queueName: queueName, + stopCh: make(chan struct{}), + delayedRequeueTime: time.Duration(config.ReQueueDelayTime) * time.Second, + }, nil +} + +// Ping checks the health of the RabbitMQ infrastructure. +func (c *RabbitMqClient) Ping(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + // Check if the RabbitMQ connection is closed + if c.connection.IsClosed() { + return fmt.Errorf("rabbitMQ connection is closed") + } + + // Check if the RabbitMQ channel is closed + if c.channel.IsClosed() { + return fmt.Errorf("rabbitMQ channel is closed") + } + } + + return nil +} + +func (c *RabbitMqClient) ReceiveMessages() (<-chan QueueMessage, error) { + msgs, err := c.channel.Consume( + c.queueName, // queueName + "", // consumer + false, // auto-ack. We want to manually acknowledge the message after processing it. + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + if err != nil { + return nil, err + } + output := make(chan QueueMessage) + go func() { + defer close(output) + for { + select { + case d, ok := <-msgs: + if !ok { + return // Channel closed, exit goroutine + } + attempts := d.Headers["x-processing-attempts"] + if attempts == nil { + attempts = int32(0) + } + currentAttempt := attempts.(int32) + + output <- QueueMessage{ + Body: string(d.Body), + Receipt: strconv.FormatUint(d.DeliveryTag, 10), + RetryAttempts: currentAttempt, + } + case <-c.stopCh: + return // Stop signal received, exit goroutine + } + } + }() + + return output, nil +} + +// DeleteMessage deletes a message from the queue. In RabbitMQ, this is equivalent to acknowledging the message. +// The deliveryTag is the unique identifier for the message. +func (c *RabbitMqClient) DeleteMessage(deliveryTag string) error { + deliveryTagInt, err := strconv.ParseUint(deliveryTag, 10, 64) + if err != nil { + return err + } + return c.channel.Ack(deliveryTagInt, false) +} + +// ReQueueMessage requeues a message back to the queue with a delay. +// This is done by sending the message again with an incremented counter. +// The original message is then deleted from the queue. +func (c *RabbitMqClient) ReQueueMessage(ctx context.Context, message QueueMessage) error { + // For requeueing, we will send the message to a delay queue that has a TTL pre-configured. + delayQueueName := c.queueName + delayedQueuePostfix + err := c.sendMessageWithAttempts(ctx, message.Body, delayQueueName, message.IncrementRetryAttempts(), c.delayedRequeueTime) + if err != nil { + return fmt.Errorf("failed to requeue message: %w", err) + } + + err = c.DeleteMessage(message.Receipt) + if err != nil { + return fmt.Errorf("failed to delete message while requeuing: %w", err) + } + + return nil +} + +// SendMessage sends a message to the queue. the ctx is used to control the timeout of the operation. +func (c *RabbitMqClient) sendMessageWithAttempts(ctx context.Context, messageBody, queueName string, attempts int32, ttl time.Duration) error { + // Ensure the channel is open + if c.channel == nil { + return fmt.Errorf("RabbitMQ channel not initialized") + } + + // Prepare new headers with the incremented counter + newHeaders := amqp.Table{ + "x-processing-attempts": attempts, + } + + publishMsg := amqp.Publishing{ + DeliveryMode: amqp.Persistent, + ContentType: "text/plain", + Body: []byte(messageBody), + Headers: newHeaders, + } + + // Exclude the expiration if the TTL is 0. + if ttl > 0 { + publishMsg.Expiration = strconv.Itoa(int(ttl.Milliseconds())) + } + + // Publish a message to the queue + confirmation, err := c.channel.PublishWithDeferredConfirmWithContext( + ctx, + "", // exchange: Use the default exchange + queueName, // routing key: The queue this message should be routed to + true, // mandatory: true indicates the server must route the message to a queue, otherwise error + false, // immediate: false indicates the server may wait to send the message until a consumer is available + publishMsg, + ) + + if err != nil { + return fmt.Errorf("failed to publish a message to queue %s: %w", queueName, err) + } + + if confirmation == nil { + return fmt.Errorf("message not confirmed when publishing into queue %s", queueName) + } + confirmed, err := confirmation.WaitContext(ctx) + if err != nil { + return fmt.Errorf("failed to confirm message when publishing into queue %s: %w", queueName, err) + } + if !confirmed { + return fmt.Errorf("message not confirmed when publishing into queue %s", queueName) + } + + return nil +} + +// SendMessage sends a message to the queue. the ctx is used to control the timeout of the operation. +func (c *RabbitMqClient) SendMessage(ctx context.Context, messageBody string) error { + return c.sendMessageWithAttempts(ctx, messageBody, c.queueName, 0, 0) +} + +// Stop stops the message receiving process. +func (c *RabbitMqClient) Stop() error { + if err := c.channel.Close(); err != nil { + return err + } + if err := c.connection.Close(); err != nil { + return err + } + + close(c.stopCh) + + return nil +} + +func (c *RabbitMqClient) GetQueueName() string { + return c.queueName +} diff --git a/internal/queue/client/schema.go b/internal/queue/client/schema.go new file mode 100644 index 0000000..1af41c7 --- /dev/null +++ b/internal/queue/client/schema.go @@ -0,0 +1,330 @@ +package client + +const ( + ActiveStakingQueueName string = "active_staking_queue" + UnbondingStakingQueueName string = "unbonding_staking_queue" + WithdrawStakingQueueName string = "withdraw_staking_queue" + ExpiredStakingQueueName string = "expired_staking_queue" + StakingStatsQueueName string = "staking_stats_queue" + BtcInfoQueueName string = "btc_info_queue" + ConfirmedInfoQueueName string = "confirmed_info_queue" + VerifiedStakingQueueName string = "verified_staking_queue" + PendingStakingQueueName string = "pending_staking_queue" +) + +const ( + ActiveStakingEventType EventType = 1 + UnbondingStakingEventType EventType = 2 + WithdrawStakingEventType EventType = 3 + ExpiredStakingEventType EventType = 4 + StatsEventType EventType = 5 + BtcInfoEventType EventType = 6 + ConfirmedInfoEventType EventType = 7 + VerifiedStakingEventType EventType = 8 + PendingStakingEventType EventType = 9 +) + +// Event schema versions, only increment when the schema changes +const ( + ActiveEventVersion int = 0 + UnbondingEventVersion int = 0 + WithdrawEventVersion int = 1 + ExpiredEventVersion int = 0 + StatsEventVersion int = 1 + BtcInfoEventVersion int = 0 + ConfirmedInfoEventVersion int = 0 + VerifiedEventVersion int = 0 + PendingEventVersion int = 0 +) + +type EventType int + +type EventMessage interface { + GetEventType() EventType + GetStakingTxHashHex() string +} + +type ActiveStakingEvent struct { + SchemaVersion int `json:"schema_version"` + EventType EventType `json:"event_type"` // always 1. ActiveStakingEventType + StakingTxHashHex string `json:"staking_tx_hash_hex"` + StakerBtcPkHex string `json:"staker_btc_pk_hex"` + FinalityProviderBtcPksHex []string `json:"finality_provider_btc_pks_hex"` + StakingValue uint64 `json:"staking_value"` + StakingStartHeight uint64 `json:"staking_start_height"` + StakingStartTimestamp int64 `json:"staking_start_timestamp"` + StakingTimeLock uint64 `json:"staking_timelock"` + StakingOutputIndex uint64 `json:"staking_output_index"` + StakingTxHex string `json:"staking_tx_hex"` + IsOverflow bool `json:"is_overflow"` +} + +func (e ActiveStakingEvent) GetEventType() EventType { + return ActiveStakingEventType +} + +func (e ActiveStakingEvent) GetStakingTxHashHex() string { + return e.StakingTxHashHex +} + +func NewActiveStakingEvent( + stakingTxHashHex string, + stakerBtcPkHex string, + finalityProviderBtcPksHex []string, + stakingValue uint64, + stakingStartHeight uint64, + stakingStartTimestamp int64, + stakingTimeLock uint64, + stakingOutputIndex uint64, + stakingTxHex string, + isOverflow bool, +) ActiveStakingEvent { + return ActiveStakingEvent{ + SchemaVersion: ActiveEventVersion, + EventType: ActiveStakingEventType, + StakingTxHashHex: stakingTxHashHex, + StakerBtcPkHex: stakerBtcPkHex, + FinalityProviderBtcPksHex: finalityProviderBtcPksHex, + StakingValue: stakingValue, + StakingStartHeight: stakingStartHeight, + StakingStartTimestamp: stakingStartTimestamp, + StakingTimeLock: stakingTimeLock, + StakingOutputIndex: stakingOutputIndex, + StakingTxHex: stakingTxHex, + IsOverflow: isOverflow, + } +} + +type UnbondingStakingEvent struct { + SchemaVersion int `json:"schema_version"` + EventType EventType `json:"event_type"` // always 2. UnbondingStakingEventType + StakingTxHashHex string `json:"staking_tx_hash_hex"` + UnbondingStartHeight uint64 `json:"unbonding_start_height"` + UnbondingStartTimestamp int64 `json:"unbonding_start_timestamp"` + UnbondingTimeLock uint64 `json:"unbonding_timelock"` + UnbondingOutputIndex uint64 `json:"unbonding_output_index"` + UnbondingTxHex string `json:"unbonding_tx_hex"` + UnbondingTxHashHex string `json:"unbonding_tx_hash_hex"` +} + +func (e UnbondingStakingEvent) GetEventType() EventType { + return UnbondingStakingEventType +} + +func (e UnbondingStakingEvent) GetStakingTxHashHex() string { + return e.StakingTxHashHex +} + +func NewUnbondingStakingEvent( + stakingTxHashHex string, + unbondingStartHeight uint64, + unbondingStartTimestamp int64, + unbondingTimeLock uint64, + unbondingOutputIndex uint64, + unbondingTxHex string, + unbondingTxHashHex string, +) UnbondingStakingEvent { + return UnbondingStakingEvent{ + SchemaVersion: UnbondingEventVersion, + EventType: UnbondingStakingEventType, + StakingTxHashHex: stakingTxHashHex, + UnbondingStartHeight: unbondingStartHeight, + UnbondingStartTimestamp: unbondingStartTimestamp, + UnbondingTimeLock: unbondingTimeLock, + UnbondingOutputIndex: unbondingOutputIndex, + UnbondingTxHex: unbondingTxHex, + UnbondingTxHashHex: unbondingTxHashHex, + } +} + +type WithdrawStakingEvent struct { + SchemaVersion int `json:"schema_version"` + EventType EventType `json:"event_type"` // always 3. WithdrawStakingEventType + StakingTxHashHex string `json:"staking_tx_hash_hex"` + WithdrawTxHashHex string `json:"withdraw_tx_hash_hex"` + WithdrawTxBtcHeight uint64 `json:"withdraw_tx_btc_height"` + WithdrawTxHex string `json:"withdraw_tx_hex"` +} + +func (e WithdrawStakingEvent) GetEventType() EventType { + return WithdrawStakingEventType +} + +func (e WithdrawStakingEvent) GetStakingTxHashHex() string { + return e.StakingTxHashHex +} + +func NewWithdrawStakingEvent( + stakingTxHashHex string, + withdrawTxHashHex string, + withdrawTxBtcHeight uint64, + withdrawTxHex string, +) WithdrawStakingEvent { + return WithdrawStakingEvent{ + SchemaVersion: WithdrawEventVersion, + EventType: WithdrawStakingEventType, + StakingTxHashHex: stakingTxHashHex, + WithdrawTxHashHex: withdrawTxHashHex, + WithdrawTxBtcHeight: withdrawTxBtcHeight, + WithdrawTxHex: withdrawTxHex, + } +} + +type ExpiredStakingEvent struct { + SchemaVersion int `json:"schema_version"` + EventType EventType `json:"event_type"` // always 4. ExpiredStakingEventType + StakingTxHashHex string `json:"staking_tx_hash_hex"` + TxType string `json:"tx_type"` +} + +func (e ExpiredStakingEvent) GetEventType() EventType { + return ExpiredStakingEventType +} + +func (e ExpiredStakingEvent) GetStakingTxHashHex() string { + return e.StakingTxHashHex +} + +func NewExpiredStakingEvent(stakingTxHashHex string, txType string) ExpiredStakingEvent { + return ExpiredStakingEvent{ + SchemaVersion: ExpiredEventVersion, + EventType: ExpiredStakingEventType, + StakingTxHashHex: stakingTxHashHex, + TxType: txType, + } +} + +type StatsEvent struct { + SchemaVersion int `json:"schema_version"` + EventType EventType `json:"event_type"` // always 5. StatsEventType + StakingTxHashHex string `json:"staking_tx_hash_hex"` + StakerPkHex string `json:"staker_pk_hex"` + FinalityProviderPkHex string `json:"finality_provider_pk_hex"` + StakingValue uint64 `json:"staking_value"` + State string `json:"state"` + IsOverflow bool `json:"is_overflow"` +} + +func (e StatsEvent) GetEventType() EventType { + return StatsEventType +} + +func (e StatsEvent) GetStakingTxHashHex() string { + return e.StakingTxHashHex +} + +func NewStatsEvent( + stakingTxHashHex string, + stakerPkHex string, + finalityProviderPkHex string, + stakingValue uint64, + state string, + isOverflow bool, +) StatsEvent { + return StatsEvent{ + SchemaVersion: StatsEventVersion, + EventType: StatsEventType, + StakingTxHashHex: stakingTxHashHex, + StakerPkHex: stakerPkHex, + FinalityProviderPkHex: finalityProviderPkHex, + StakingValue: stakingValue, + State: state, + IsOverflow: isOverflow, + } +} + +type BtcInfoEvent struct { + SchemaVersion int `json:"schema_version"` + EventType EventType `json:"event_type"` // always 6. BtcInfoEventType + Height uint64 `json:"height"` + ConfirmedTvl uint64 `json:"confirmed_tvl"` + UnconfirmedTvl uint64 `json:"unconfirmed_tvl"` +} + +func (e BtcInfoEvent) GetEventType() EventType { + return BtcInfoEventType +} + +// Not applicable, add it here to implement the EventMessage interface +func (e BtcInfoEvent) GetStakingTxHashHex() string { + return "" +} + +func NewBtcInfoEvent(height, confirmedTvl, unconfirmedTvl uint64) BtcInfoEvent { + return BtcInfoEvent{ + SchemaVersion: BtcInfoEventVersion, + EventType: BtcInfoEventType, + Height: height, + ConfirmedTvl: confirmedTvl, + UnconfirmedTvl: unconfirmedTvl, + } +} + +type ConfirmedInfoEvent struct { + SchemaVersion int `json:"schema_version"` + EventType EventType `json:"event_type"` // always 7. ConfirmedInfoEventType + Height uint64 `json:"height"` + Tvl uint64 `json:"tvl"` +} + +func (e ConfirmedInfoEvent) GetEventType() EventType { + return ConfirmedInfoEventType +} + +// Not applicable, add it here to implement the EventMessage interface +func (e ConfirmedInfoEvent) GetStakingTxHashHex() string { + return "" +} + +func NewConfirmedInfoEvent(height, tvl uint64) ConfirmedInfoEvent { + return ConfirmedInfoEvent{ + SchemaVersion: ConfirmedInfoEventVersion, + EventType: ConfirmedInfoEventType, + Height: height, + Tvl: tvl, + } +} + +type VerifiedStakingEvent struct { + SchemaVersion int `json:"schema_version"` + EventType EventType `json:"event_type"` // always 8. VerifiedStakingEventType + StakingTxHashHex string `json:"staking_tx_hash_hex"` +} + +func (e VerifiedStakingEvent) GetEventType() EventType { + return VerifiedStakingEventType +} + +func (e VerifiedStakingEvent) GetStakingTxHashHex() string { + return e.StakingTxHashHex +} + +func NewVerifiedStakingEvent(stakingTxHashHex string) VerifiedStakingEvent { + return VerifiedStakingEvent{ + SchemaVersion: VerifiedEventVersion, + EventType: VerifiedStakingEventType, + StakingTxHashHex: stakingTxHashHex, + } +} + +type PendingStakingEvent struct { + SchemaVersion int `json:"schema_version"` + EventType EventType `json:"event_type"` // always 9. PendingStakingEventType + StakingTxHashHex string `json:"staking_tx_hash_hex"` +} + +func (e PendingStakingEvent) GetEventType() EventType { + return PendingStakingEventType +} + +func (e PendingStakingEvent) GetStakingTxHashHex() string { + return e.StakingTxHashHex +} + +func NewPendingStakingEvent(stakingTxHashHex string) PendingStakingEvent { + return PendingStakingEvent{ + SchemaVersion: PendingEventVersion, + EventType: PendingStakingEventType, + StakingTxHashHex: stakingTxHashHex, + } +} diff --git a/internal/queue/queue.go b/internal/queue/queue.go index 4789e68..c6de30e 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -5,7 +5,7 @@ import ( "encoding/json" "fmt" - "github.com/babylonlabs-io/staking-queue-client/client" + "github.com/babylonlabs-io/babylon-staking-indexer/internal/queue/client" "github.com/rs/zerolog/log" "github.com/babylonlabs-io/babylon-staking-indexer/internal/observability/metrics" @@ -13,7 +13,11 @@ import ( ) type QueueManager struct { - stakingExpiredEventQueue client.QueueClient + stakingExpiredEventQueue client.QueueClient + unbondingEventQueue client.QueueClient + activeStakingEventQueue client.QueueClient + verifiedStakingEventQueue client.QueueClient + pendingStakingEventQueue client.QueueClient } func NewQueueManager(cfg *queueConfig.QueueConfig) (*QueueManager, error) { @@ -22,8 +26,32 @@ func NewQueueManager(cfg *queueConfig.QueueConfig) (*QueueManager, error) { return nil, fmt.Errorf("failed to initialize staking event queue: %w", err) } + unbondingEventQueue, err := client.NewQueueClient(cfg, client.UnbondingStakingQueueName) + if err != nil { + return nil, fmt.Errorf("failed to initialize unbonding event queue: %w", err) + } + + activeStakingEventQueue, err := client.NewQueueClient(cfg, client.ActiveStakingQueueName) + if err != nil { + return nil, fmt.Errorf("failed to initialize active staking event queue: %w", err) + } + + verifiedStakingEventQueue, err := client.NewQueueClient(cfg, client.VerifiedStakingQueueName) + if err != nil { + return nil, fmt.Errorf("failed to initialize verified staking event queue: %w", err) + } + + pendingStakingEventQueue, err := client.NewQueueClient(cfg, client.PendingStakingQueueName) + if err != nil { + return nil, fmt.Errorf("failed to initialize pending staking event queue: %w", err) + } + return &QueueManager{ - stakingExpiredEventQueue: stakingEventQueue, + stakingExpiredEventQueue: stakingEventQueue, + unbondingEventQueue: unbondingEventQueue, + activeStakingEventQueue: activeStakingEventQueue, + verifiedStakingEventQueue: verifiedStakingEventQueue, + pendingStakingEventQueue: pendingStakingEventQueue, }, nil } @@ -45,6 +73,74 @@ func (qm *QueueManager) SendExpiredStakingEvent(ctx context.Context, ev client.E return nil } +func (qm *QueueManager) SendUnbondingEvent(ctx context.Context, ev *client.UnbondingStakingEvent) error { + jsonBytes, err := json.Marshal(ev) + if err != nil { + return err + } + messageBody := string(jsonBytes) + + log.Info().Str("staking_tx_hash", ev.UnbondingTxHashHex).Msg("pushing unbonding event") + err = qm.unbondingEventQueue.SendMessage(ctx, messageBody) + if err != nil { + return fmt.Errorf("failed to push unbonding event: %w", err) + } + log.Info().Str("staking_tx_hash", ev.UnbondingTxHashHex).Msg("successfully pushed unbonding event") + + return nil +} + +func (qm *QueueManager) SendActiveStakingEvent(ctx context.Context, ev *client.ActiveStakingEvent) error { + jsonBytes, err := json.Marshal(ev) + if err != nil { + return err + } + messageBody := string(jsonBytes) + + log.Info().Str("staking_tx_hash", ev.StakingTxHashHex).Msg("pushing active staking event") + err = qm.activeStakingEventQueue.SendMessage(ctx, messageBody) + if err != nil { + return fmt.Errorf("failed to push active staking event: %w", err) + } + log.Info().Str("staking_tx_hash", ev.StakingTxHashHex).Msg("successfully pushed active staking event") + + return nil +} + +func (qm *QueueManager) SendVerifiedStakingEvent(ctx context.Context, ev *client.VerifiedStakingEvent) error { + jsonBytes, err := json.Marshal(ev) + if err != nil { + return err + } + messageBody := string(jsonBytes) + + log.Info().Str("staking_tx_hash", ev.StakingTxHashHex).Msg("pushing verified staking event") + err = qm.verifiedStakingEventQueue.SendMessage(ctx, messageBody) + if err != nil { + return fmt.Errorf("failed to push verified staking event: %w", err) + } + log.Info().Str("staking_tx_hash", ev.StakingTxHashHex).Msg("successfully pushed verified staking event") + + return nil +} + +func (qm *QueueManager) SendPendingStakingEvent(ctx context.Context, ev *client.PendingStakingEvent) error { + jsonBytes, err := json.Marshal(ev) + if err != nil { + return err + } + messageBody := string(jsonBytes) + + log.Info().Str("staking_tx_hash", ev.StakingTxHashHex).Msg("pushing pending staking event") + err = qm.pendingStakingEventQueue.SendMessage(ctx, messageBody) + if err != nil { + return fmt.Errorf("failed to push pending staking event: %w", err) + } + log.Info().Str("staking_tx_hash", ev.StakingTxHashHex).Msg("successfully pushed pending staking event") + + return nil +} + // Shutdown gracefully stops the interaction with the queue, ensuring all resources are properly released. func (qm *QueueManager) Shutdown() { err := qm.stakingExpiredEventQueue.Stop() diff --git a/internal/services/delegation.go b/internal/services/delegation.go index 1c2cb21..d511345 100644 --- a/internal/services/delegation.go +++ b/internal/services/delegation.go @@ -10,10 +10,10 @@ import ( "github.com/babylonlabs-io/babylon-staking-indexer/internal/db" "github.com/babylonlabs-io/babylon-staking-indexer/internal/db/model" + queueclient "github.com/babylonlabs-io/babylon-staking-indexer/internal/queue/client" "github.com/babylonlabs-io/babylon-staking-indexer/internal/types" "github.com/babylonlabs-io/babylon-staking-indexer/internal/utils" bbntypes "github.com/babylonlabs-io/babylon/x/btcstaking/types" - queueclient "github.com/babylonlabs-io/staking-queue-client/client" abcitypes "github.com/cometbft/cometbft/abci/types" "github.com/rs/zerolog/log" ) @@ -40,6 +40,11 @@ func (s *Service) processNewBTCDelegationEvent( return validationErr } + ev := queueclient.NewPendingStakingEvent(newDelegation.StakingTxHash) + if err := s.queueManager.SendPendingStakingEvent(ctx, &ev); err != nil { + return types.NewInternalServiceError(fmt.Errorf("failed to send pending staking event: %w", err)) + } + if dbErr := s.db.SaveNewBTCDelegation( ctx, model.FromEventBTCDelegationCreated(newDelegation), ); dbErr != nil { @@ -76,8 +81,44 @@ func (s *Service) processCovenantQuorumReachedEvent( return nil } + // Fetch the current delegation state from the database + delegation, dbErr := s.db.GetBTCDelegationByStakingTxHash(ctx, covenantQuorumReachedEvent.StakingTxHash) + if dbErr != nil { + return types.NewError( + http.StatusInternalServerError, + types.InternalServiceError, + fmt.Errorf("failed to get BTC delegation by staking tx hash: %w", dbErr), + ) + } + + newState := types.DelegationState(covenantQuorumReachedEvent.NewState) + if newState == types.StateActive { + stakingTime, _ := strconv.ParseUint(delegation.StakingTime, 10, 64) + stakingAmount, _ := strconv.ParseUint(delegation.StakingAmount, 10, 64) + ev := queueclient.NewActiveStakingEvent( + delegation.StakingTxHashHex, + delegation.StakerBtcPkHex, + delegation.FinalityProviderBtcPksHex, + stakingAmount, + uint64(delegation.StartHeight), + time.Now().Unix(), + stakingTime, + 0, + "", + false, + ) + if err := s.queueManager.SendActiveStakingEvent(ctx, &ev); err != nil { + return types.NewInternalServiceError(fmt.Errorf("failed to send active staking event: %w", err)) + } + } else if newState == types.StateVerified { + ev := queueclient.NewVerifiedStakingEvent(delegation.StakingTxHashHex) + if err := s.queueManager.SendVerifiedStakingEvent(ctx, &ev); err != nil { + return types.NewInternalServiceError(fmt.Errorf("failed to send verified staking event: %w", err)) + } + } + if dbErr := s.db.UpdateBTCDelegationState( - ctx, covenantQuorumReachedEvent.StakingTxHash, types.DelegationState(covenantQuorumReachedEvent.NewState), + ctx, covenantQuorumReachedEvent.StakingTxHash, newState, ); dbErr != nil { return types.NewError( http.StatusInternalServerError, @@ -108,6 +149,37 @@ func (s *Service) processBTCDelegationInclusionProofReceivedEvent( return nil } + // Fetch the current delegation state from the database + delegation, dbErr := s.db.GetBTCDelegationByStakingTxHash(ctx, inclusionProofEvent.StakingTxHash) + if dbErr != nil { + return types.NewError( + http.StatusInternalServerError, + types.InternalServiceError, + fmt.Errorf("failed to get BTC delegation by staking tx hash: %w", dbErr), + ) + } + + newState := types.DelegationState(inclusionProofEvent.NewState) + if newState == types.StateActive { + stakingTime, _ := strconv.ParseUint(delegation.StakingTime, 10, 64) + stakingAmount, _ := strconv.ParseUint(delegation.StakingAmount, 10, 64) + ev := queueclient.NewActiveStakingEvent( + delegation.StakingTxHashHex, + delegation.StakerBtcPkHex, + delegation.FinalityProviderBtcPksHex, + stakingAmount, + uint64(delegation.StartHeight), + time.Now().Unix(), + stakingTime, + 0, + "", + false, + ) + if err := s.queueManager.SendActiveStakingEvent(ctx, &ev); err != nil { + return types.NewInternalServiceError(fmt.Errorf("failed to send active staking event: %w", err)) + } + } + if dbErr := s.db.UpdateBTCDelegationDetails( ctx, inclusionProofEvent.StakingTxHash, model.FromEventBTCDelegationInclusionProofReceived(inclusionProofEvent), ); dbErr != nil { @@ -176,7 +248,7 @@ func (s *Service) processBTCDelegationUnbondedEarlyEvent( delegation.UnbondingTx, unbondingTxHash.String(), ) - if err4 := s.consumer.PushUnbondingEvent(&unbondingEvent); err4 != nil { + if err4 := s.queueManager.SendUnbondingEvent(ctx, &unbondingEvent); err4 != nil { return types.NewInternalServiceError( fmt.Errorf("failed to send unbonding staking event: %w", err4), ) diff --git a/internal/services/expiry-checker.go b/internal/services/expiry-checker.go index d284b9e..66df968 100644 --- a/internal/services/expiry-checker.go +++ b/internal/services/expiry-checker.go @@ -5,10 +5,10 @@ import ( "fmt" "net/http" + queueclient "github.com/babylonlabs-io/babylon-staking-indexer/internal/queue/client" "github.com/babylonlabs-io/babylon-staking-indexer/internal/types" "github.com/babylonlabs-io/babylon-staking-indexer/internal/utils" "github.com/babylonlabs-io/babylon-staking-indexer/internal/utils/poller" - queueclient "github.com/babylonlabs-io/staking-queue-client/client" "github.com/rs/zerolog/log" ) @@ -55,7 +55,7 @@ func (s *Service) checkExpiry(ctx context.Context) *types.Error { } ev := queueclient.NewExpiredStakingEvent(delegation.StakingTxHashHex, tlDoc.TxType) - if err := s.consumer.PushExpiryEvent(&ev); err != nil { + if err := s.queueManager.SendExpiredStakingEvent(ctx, ev); err != nil { log.Error().Err(err).Msg("Error sending expired staking event") return types.NewInternalServiceError( fmt.Errorf("failed to send expired staking event: %w", err), diff --git a/internal/services/service.go b/internal/services/service.go index 59acff0..55866f8 100644 --- a/internal/services/service.go +++ b/internal/services/service.go @@ -3,11 +3,11 @@ package services import ( "context" - "github.com/babylonlabs-io/babylon-staking-indexer/consumer" "github.com/babylonlabs-io/babylon-staking-indexer/internal/clients/bbnclient" "github.com/babylonlabs-io/babylon-staking-indexer/internal/clients/btcclient" "github.com/babylonlabs-io/babylon-staking-indexer/internal/config" "github.com/babylonlabs-io/babylon-staking-indexer/internal/db" + "github.com/babylonlabs-io/babylon-staking-indexer/internal/queue" ) type Service struct { @@ -15,7 +15,7 @@ type Service struct { db db.DbInterface btc btcclient.BtcInterface bbn bbnclient.BbnInterface - consumer consumer.EventConsumer + queueManager *queue.QueueManager bbnEventProcessor chan BbnEvent } @@ -24,7 +24,7 @@ func NewService( db db.DbInterface, btc btcclient.BtcInterface, bbn bbnclient.BbnInterface, - consumer consumer.EventConsumer, + qm *queue.QueueManager, ) *Service { eventProcessor := make(chan BbnEvent, eventProcessorSize) return &Service{ @@ -32,7 +32,7 @@ func NewService( db: db, btc: btc, bbn: bbn, - consumer: consumer, + queueManager: qm, bbnEventProcessor: eventProcessor, } } From 5ec7f769e24ea75da39f4cc707b3974b22fc0449 Mon Sep 17 00:00:00 2001 From: Gurjot Date: Thu, 24 Oct 2024 15:44:11 -0400 Subject: [PATCH 4/7] emit events refactor --- internal/queue/queue.go | 2 +- internal/services/consumer_events.go | 97 +++++++++++++++++++++++++++ internal/services/delegation.go | 99 ++++------------------------ internal/services/expiry-checker.go | 9 +-- 4 files changed, 113 insertions(+), 94 deletions(-) create mode 100644 internal/services/consumer_events.go diff --git a/internal/queue/queue.go b/internal/queue/queue.go index c6de30e..62cf0a8 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -73,7 +73,7 @@ func (qm *QueueManager) SendExpiredStakingEvent(ctx context.Context, ev client.E return nil } -func (qm *QueueManager) SendUnbondingEvent(ctx context.Context, ev *client.UnbondingStakingEvent) error { +func (qm *QueueManager) SendUnbondingStakingEvent(ctx context.Context, ev *client.UnbondingStakingEvent) error { jsonBytes, err := json.Marshal(ev) if err != nil { return err diff --git a/internal/services/consumer_events.go b/internal/services/consumer_events.go new file mode 100644 index 0000000..1fc3620 --- /dev/null +++ b/internal/services/consumer_events.go @@ -0,0 +1,97 @@ +package services + +import ( + "context" + "fmt" + "strconv" + "time" + + "github.com/babylonlabs-io/babylon-staking-indexer/internal/db/model" + queueclient "github.com/babylonlabs-io/babylon-staking-indexer/internal/queue/client" + "github.com/babylonlabs-io/babylon-staking-indexer/internal/types" + "github.com/rs/zerolog/log" +) + +func (s *Service) emitConsumerEvent( + ctx context.Context, newState types.DelegationState, delegation *model.BTCDelegationDetails, +) *types.Error { + switch newState { + case types.StateActive: + return s.sendActiveDelegationEvent(ctx, delegation) + case types.StateVerified: + return s.sendVerifiedDelegationEvent(ctx, delegation) + case types.StatePending: + return s.sendPendingDelegationEvent(ctx, delegation) + case types.StateUnbonding: + return s.sendUnbondingDelegationEvent(ctx, delegation) + case types.StateWithdrawable: + return s.sendWithdrawableDelegationEvent(ctx, delegation) + default: + return nil + } +} + +func (s *Service) sendActiveDelegationEvent(ctx context.Context, delegation *model.BTCDelegationDetails) *types.Error { + stakingTime, _ := strconv.ParseUint(delegation.StakingTime, 10, 64) + stakingAmount, _ := strconv.ParseUint(delegation.StakingAmount, 10, 64) + ev := queueclient.NewActiveStakingEvent( + delegation.StakingTxHashHex, + delegation.StakerBtcPkHex, + delegation.FinalityProviderBtcPksHex, + stakingAmount, + uint64(delegation.StartHeight), + time.Now().Unix(), + stakingTime, + 0, + "", + false, + ) + if err := s.queueManager.SendActiveStakingEvent(ctx, &ev); err != nil { + return types.NewInternalServiceError(fmt.Errorf("failed to send active staking event: %w", err)) + } + return nil +} + +func (s *Service) sendVerifiedDelegationEvent(ctx context.Context, delegation *model.BTCDelegationDetails) *types.Error { + ev := queueclient.NewVerifiedStakingEvent(delegation.StakingTxHashHex) + if err := s.queueManager.SendVerifiedStakingEvent(ctx, &ev); err != nil { + return types.NewInternalServiceError(fmt.Errorf("failed to send verified staking event: %w", err)) + } + return nil +} + +func (s *Service) sendPendingDelegationEvent(ctx context.Context, delegation *model.BTCDelegationDetails) *types.Error { + ev := queueclient.NewPendingStakingEvent(delegation.StakingTxHashHex) + if err := s.queueManager.SendPendingStakingEvent(ctx, &ev); err != nil { + return types.NewInternalServiceError(fmt.Errorf("failed to send pending staking event: %w", err)) + } + return nil +} + +func (s *Service) sendUnbondingDelegationEvent(ctx context.Context, delegation *model.BTCDelegationDetails) *types.Error { + ev := queueclient.NewUnbondingStakingEvent( + delegation.StakingTxHashHex, + uint64(delegation.EndHeight), + time.Now().Unix(), + uint64(delegation.StartHeight), + uint64(delegation.EndHeight), + delegation.UnbondingTx, + delegation.UnbondingTime, + ) + if err := s.queueManager.SendUnbondingStakingEvent(ctx, &ev); err != nil { + return types.NewInternalServiceError(fmt.Errorf("failed to send unbonding staking event: %w", err)) + } + return nil +} + +func (s *Service) sendWithdrawableDelegationEvent(ctx context.Context, delegation *model.BTCDelegationDetails) *types.Error { + ev := queueclient.NewExpiredStakingEvent(delegation.StakingTxHashHex, "") // TODO: add the correct tx type + if err := s.queueManager.SendExpiredStakingEvent(ctx, ev); err != nil { + log.Error().Err(err).Msg("Error sending expired staking event") + return types.NewInternalServiceError( + fmt.Errorf("failed to send expired staking event: %w", err), + ) + } + + return nil +} diff --git a/internal/services/delegation.go b/internal/services/delegation.go index d511345..2651f9b 100644 --- a/internal/services/delegation.go +++ b/internal/services/delegation.go @@ -2,15 +2,11 @@ package services import ( "context" - "encoding/hex" "fmt" "net/http" - "strconv" - "time" "github.com/babylonlabs-io/babylon-staking-indexer/internal/db" "github.com/babylonlabs-io/babylon-staking-indexer/internal/db/model" - queueclient "github.com/babylonlabs-io/babylon-staking-indexer/internal/queue/client" "github.com/babylonlabs-io/babylon-staking-indexer/internal/types" "github.com/babylonlabs-io/babylon-staking-indexer/internal/utils" bbntypes "github.com/babylonlabs-io/babylon/x/btcstaking/types" @@ -40,13 +36,11 @@ func (s *Service) processNewBTCDelegationEvent( return validationErr } - ev := queueclient.NewPendingStakingEvent(newDelegation.StakingTxHash) - if err := s.queueManager.SendPendingStakingEvent(ctx, &ev); err != nil { - return types.NewInternalServiceError(fmt.Errorf("failed to send pending staking event: %w", err)) - } + delegationDoc := model.FromEventBTCDelegationCreated(newDelegation) + s.emitConsumerEvent(ctx, types.StatePending, delegationDoc) if dbErr := s.db.SaveNewBTCDelegation( - ctx, model.FromEventBTCDelegationCreated(newDelegation), + ctx, delegationDoc, ); dbErr != nil { if db.IsDuplicateKeyError(dbErr) { // BTC delegation already exists, ignore the event @@ -81,7 +75,6 @@ func (s *Service) processCovenantQuorumReachedEvent( return nil } - // Fetch the current delegation state from the database delegation, dbErr := s.db.GetBTCDelegationByStakingTxHash(ctx, covenantQuorumReachedEvent.StakingTxHash) if dbErr != nil { return types.NewError( @@ -90,31 +83,10 @@ func (s *Service) processCovenantQuorumReachedEvent( fmt.Errorf("failed to get BTC delegation by staking tx hash: %w", dbErr), ) } - newState := types.DelegationState(covenantQuorumReachedEvent.NewState) - if newState == types.StateActive { - stakingTime, _ := strconv.ParseUint(delegation.StakingTime, 10, 64) - stakingAmount, _ := strconv.ParseUint(delegation.StakingAmount, 10, 64) - ev := queueclient.NewActiveStakingEvent( - delegation.StakingTxHashHex, - delegation.StakerBtcPkHex, - delegation.FinalityProviderBtcPksHex, - stakingAmount, - uint64(delegation.StartHeight), - time.Now().Unix(), - stakingTime, - 0, - "", - false, - ) - if err := s.queueManager.SendActiveStakingEvent(ctx, &ev); err != nil { - return types.NewInternalServiceError(fmt.Errorf("failed to send active staking event: %w", err)) - } - } else if newState == types.StateVerified { - ev := queueclient.NewVerifiedStakingEvent(delegation.StakingTxHashHex) - if err := s.queueManager.SendVerifiedStakingEvent(ctx, &ev); err != nil { - return types.NewInternalServiceError(fmt.Errorf("failed to send verified staking event: %w", err)) - } + err = s.emitConsumerEvent(ctx, newState, delegation) + if err != nil { + return err } if dbErr := s.db.UpdateBTCDelegationState( @@ -149,7 +121,6 @@ func (s *Service) processBTCDelegationInclusionProofReceivedEvent( return nil } - // Fetch the current delegation state from the database delegation, dbErr := s.db.GetBTCDelegationByStakingTxHash(ctx, inclusionProofEvent.StakingTxHash) if dbErr != nil { return types.NewError( @@ -158,26 +129,11 @@ func (s *Service) processBTCDelegationInclusionProofReceivedEvent( fmt.Errorf("failed to get BTC delegation by staking tx hash: %w", dbErr), ) } - newState := types.DelegationState(inclusionProofEvent.NewState) if newState == types.StateActive { - stakingTime, _ := strconv.ParseUint(delegation.StakingTime, 10, 64) - stakingAmount, _ := strconv.ParseUint(delegation.StakingAmount, 10, 64) - ev := queueclient.NewActiveStakingEvent( - delegation.StakingTxHashHex, - delegation.StakerBtcPkHex, - delegation.FinalityProviderBtcPksHex, - stakingAmount, - uint64(delegation.StartHeight), - time.Now().Unix(), - stakingTime, - 0, - "", - false, - ) - if err := s.queueManager.SendActiveStakingEvent(ctx, &ev); err != nil { - return types.NewInternalServiceError(fmt.Errorf("failed to send active staking event: %w", err)) - } + // emit the consumer event only if the new state is ACTIVE + // we do not need to emit the PENDING event because it was already emitted in the processNewBTCDelegationEvent + s.emitConsumerEvent(ctx, types.StateActive, delegation) } if dbErr := s.db.UpdateBTCDelegationDetails( @@ -212,7 +168,9 @@ func (s *Service) processBTCDelegationUnbondedEarlyEvent( return nil } - // Fetch the current delegation state from the database + // TODO: save timelock expire, need to figure out what will be the expire height in this case. + // https://github.com/babylonlabs-io/babylon-staking-indexer/issues/28 + delegation, dbErr := s.db.GetBTCDelegationByStakingTxHash(ctx, unbondedEarlyEvent.StakingTxHash) if dbErr != nil { return types.NewError( @@ -221,38 +179,7 @@ func (s *Service) processBTCDelegationUnbondedEarlyEvent( fmt.Errorf("failed to get BTC delegation by staking tx hash: %w", dbErr), ) } - - // TODO: save timelock expire, need to figure out what will be the expire height in this case. - // https://github.com/babylonlabs-io/babylon-staking-indexer/issues/28 - - unbondingTime, _ := strconv.ParseUint(delegation.UnbondingTime, 10, 64) - unbondingTxBytes, err2 := hex.DecodeString(delegation.UnbondingTx) - if err2 != nil { - return types.NewInternalServiceError( - fmt.Errorf("failed to decode unbonding tx: %w", err2), - ) - } - unbondingTxHash, err3 := utils.GetTxHash(unbondingTxBytes) - if err3 != nil { - return types.NewInternalServiceError( - fmt.Errorf("failed to get unbonding tx hash: %w", err3), - ) - } - unbondingEvent := queueclient.NewUnbondingStakingEvent( - delegation.StakingTxHashHex, - uint64(delegation.StartHeight), - time.Now().Unix(), - unbondingTime, - // valid unbonding tx always has one output - 0, - delegation.UnbondingTx, - unbondingTxHash.String(), - ) - if err4 := s.queueManager.SendUnbondingEvent(ctx, &unbondingEvent); err4 != nil { - return types.NewInternalServiceError( - fmt.Errorf("failed to send unbonding staking event: %w", err4), - ) - } + s.emitConsumerEvent(ctx, types.StateUnbonding, delegation) if dbErr := s.db.UpdateBTCDelegationState( ctx, unbondedEarlyEvent.StakingTxHash, types.StateUnbonding, diff --git a/internal/services/expiry-checker.go b/internal/services/expiry-checker.go index 66df968..1942956 100644 --- a/internal/services/expiry-checker.go +++ b/internal/services/expiry-checker.go @@ -5,7 +5,6 @@ import ( "fmt" "net/http" - queueclient "github.com/babylonlabs-io/babylon-staking-indexer/internal/queue/client" "github.com/babylonlabs-io/babylon-staking-indexer/internal/types" "github.com/babylonlabs-io/babylon-staking-indexer/internal/utils" "github.com/babylonlabs-io/babylon-staking-indexer/internal/utils/poller" @@ -54,12 +53,8 @@ func (s *Service) checkExpiry(ctx context.Context) *types.Error { continue } - ev := queueclient.NewExpiredStakingEvent(delegation.StakingTxHashHex, tlDoc.TxType) - if err := s.queueManager.SendExpiredStakingEvent(ctx, ev); err != nil { - log.Error().Err(err).Msg("Error sending expired staking event") - return types.NewInternalServiceError( - fmt.Errorf("failed to send expired staking event: %w", err), - ) + if err := s.sendWithdrawableDelegationEvent(ctx, delegation); err != nil { + return err } if err := s.db.UpdateBTCDelegationState(ctx, delegation.StakingTxHashHex, types.StateWithdrawable); err != nil { From 8f797058377ec7cc3ef9b6768d6962f39a0a76f2 Mon Sep 17 00:00:00 2001 From: Gurjot Date: Thu, 24 Oct 2024 15:47:04 -0400 Subject: [PATCH 5/7] fix err --- internal/services/delegation.go | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/internal/services/delegation.go b/internal/services/delegation.go index 2651f9b..bdb59b9 100644 --- a/internal/services/delegation.go +++ b/internal/services/delegation.go @@ -32,12 +32,15 @@ func (s *Service) processNewBTCDelegationEvent( return err } - if validationErr := s.validateBTCDelegationCreatedEvent(ctx, newDelegation); validationErr != nil { + if validationErr := s.validateBTCDelegationCreatedEvent(newDelegation); validationErr != nil { return validationErr } delegationDoc := model.FromEventBTCDelegationCreated(newDelegation) - s.emitConsumerEvent(ctx, types.StatePending, delegationDoc) + consumerErr := s.emitConsumerEvent(ctx, types.StatePending, delegationDoc) + if consumerErr != nil { + return consumerErr + } if dbErr := s.db.SaveNewBTCDelegation( ctx, delegationDoc, @@ -84,9 +87,9 @@ func (s *Service) processCovenantQuorumReachedEvent( ) } newState := types.DelegationState(covenantQuorumReachedEvent.NewState) - err = s.emitConsumerEvent(ctx, newState, delegation) - if err != nil { - return err + consumerErr := s.emitConsumerEvent(ctx, newState, delegation) + if consumerErr != nil { + return consumerErr } if dbErr := s.db.UpdateBTCDelegationState( @@ -133,7 +136,10 @@ func (s *Service) processBTCDelegationInclusionProofReceivedEvent( if newState == types.StateActive { // emit the consumer event only if the new state is ACTIVE // we do not need to emit the PENDING event because it was already emitted in the processNewBTCDelegationEvent - s.emitConsumerEvent(ctx, types.StateActive, delegation) + consumerErr := s.emitConsumerEvent(ctx, types.StateActive, delegation) + if consumerErr != nil { + return consumerErr + } } if dbErr := s.db.UpdateBTCDelegationDetails( @@ -179,7 +185,10 @@ func (s *Service) processBTCDelegationUnbondedEarlyEvent( fmt.Errorf("failed to get BTC delegation by staking tx hash: %w", dbErr), ) } - s.emitConsumerEvent(ctx, types.StateUnbonding, delegation) + consumerErr := s.emitConsumerEvent(ctx, types.StateUnbonding, delegation) + if consumerErr != nil { + return consumerErr + } if dbErr := s.db.UpdateBTCDelegationState( ctx, unbondedEarlyEvent.StakingTxHash, types.StateUnbonding, @@ -244,7 +253,7 @@ func (s *Service) processBTCDelegationExpiredEvent( return nil } -func (s *Service) validateBTCDelegationCreatedEvent(ctx context.Context, event *bbntypes.EventBTCDelegationCreated) *types.Error { +func (s *Service) validateBTCDelegationCreatedEvent(event *bbntypes.EventBTCDelegationCreated) *types.Error { // Check if the staking tx hash is present if event.StakingTxHash == "" { return types.NewErrorWithMsg( From 8040519ccb84b25bab4070e035e8e4a1ae998261 Mon Sep 17 00:00:00 2001 From: Gurjot Date: Thu, 24 Oct 2024 16:04:53 -0400 Subject: [PATCH 6/7] fix --- internal/services/expiry-checker.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/internal/services/expiry-checker.go b/internal/services/expiry-checker.go index 1942956..7afa6b8 100644 --- a/internal/services/expiry-checker.go +++ b/internal/services/expiry-checker.go @@ -53,8 +53,9 @@ func (s *Service) checkExpiry(ctx context.Context) *types.Error { continue } - if err := s.sendWithdrawableDelegationEvent(ctx, delegation); err != nil { - return err + consumerErr := s.emitConsumerEvent(ctx, types.StateWithdrawable, delegation) + if consumerErr != nil { + return consumerErr } if err := s.db.UpdateBTCDelegationState(ctx, delegation.StakingTxHashHex, types.StateWithdrawable); err != nil { From cb31dd908e2b7cfcb05329f8735fe06a1903031d Mon Sep 17 00:00:00 2001 From: Gurjot Date: Thu, 24 Oct 2024 16:44:52 -0400 Subject: [PATCH 7/7] throw err if invalid del state --- internal/services/consumer_events.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/internal/services/consumer_events.go b/internal/services/consumer_events.go index 1fc3620..3cc5e92 100644 --- a/internal/services/consumer_events.go +++ b/internal/services/consumer_events.go @@ -3,6 +3,7 @@ package services import ( "context" "fmt" + "net/http" "strconv" "time" @@ -27,7 +28,11 @@ func (s *Service) emitConsumerEvent( case types.StateWithdrawable: return s.sendWithdrawableDelegationEvent(ctx, delegation) default: - return nil + return types.NewError( + http.StatusInternalServerError, + types.InternalServiceError, + fmt.Errorf("unknown delegation state: %s", newState), + ) } }