From d20b628f6c993e9606558df444b89a1c70f63877 Mon Sep 17 00:00:00 2001 From: Gurjot Singh <111540954+gusin13@users.noreply.github.com> Date: Fri, 25 Oct 2024 11:06:14 -0400 Subject: [PATCH] feat: emit events for api (#31) --- go.mod | 3 +- go.sum | 3 + internal/queue/client/client.go | 37 +++ internal/queue/client/rabbitmq_client.go | 279 +++++++++++++++++++ internal/queue/client/schema.go | 330 +++++++++++++++++++++++ internal/queue/queue.go | 136 +++++++++- internal/services/consumer_events.go | 102 +++++++ internal/services/delegation.go | 59 +++- internal/services/expiry-checker.go | 5 + internal/utils/utils.go | 11 + 10 files changed, 958 insertions(+), 7 deletions(-) create mode 100644 internal/queue/client/client.go create mode 100644 internal/queue/client/rabbitmq_client.go create mode 100644 internal/queue/client/schema.go create mode 100644 internal/services/consumer_events.go diff --git a/go.mod b/go.mod index 75de9ac..a1175e6 100644 --- a/go.mod +++ b/go.mod @@ -6,10 +6,12 @@ require ( github.com/babylonlabs-io/babylon v0.12.1 github.com/babylonlabs-io/staking-queue-client v0.4.1 github.com/btcsuite/btcd v0.24.3-0.20241011125836-24eb815168f4 + github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0 github.com/cometbft/cometbft v0.38.7 github.com/cosmos/cosmos-sdk v0.50.6 github.com/cosmos/gogoproto v1.7.0 github.com/go-chi/chi/v5 v5.1.0 + github.com/rabbitmq/amqp091-go v1.9.0 github.com/spf13/viper v1.19.0 ) @@ -44,7 +46,6 @@ require ( github.com/boljen/go-bitmap v0.0.0-20151001105940-23cd2fb0ce7d // indirect github.com/btcsuite/btcd/btcec/v2 v2.3.4 // indirect github.com/btcsuite/btcd/btcutil v1.1.6 // indirect - github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0 // indirect github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f // indirect github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd // indirect github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792 // indirect diff --git a/go.sum b/go.sum index 3d333d2..15a4af0 100644 --- a/go.sum +++ b/go.sum @@ -1023,6 +1023,8 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O github.com/prometheus/procfs v0.3.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= +github.com/rabbitmq/amqp091-go v1.9.0 h1:qrQtyzB4H8BQgEuJwhmVQqVHB9O4+MNDJCCAcpc3Aoo= +github.com/rabbitmq/amqp091-go v1.9.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= @@ -1199,6 +1201,7 @@ go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= +go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= diff --git a/internal/queue/client/client.go b/internal/queue/client/client.go new file mode 100644 index 0000000..1c0f92b --- /dev/null +++ b/internal/queue/client/client.go @@ -0,0 +1,37 @@ +package client + +import ( + "context" + + "github.com/babylonlabs-io/staking-queue-client/config" +) + +type QueueMessage struct { + Body string + Receipt string + RetryAttempts int32 +} + +func (m QueueMessage) IncrementRetryAttempts() int32 { + m.RetryAttempts++ + return m.RetryAttempts +} + +func (m QueueMessage) GetRetryAttempts() int32 { + return m.RetryAttempts +} + +// A common interface for queue clients regardless if it's a SQS, RabbitMQ, etc. +type QueueClient interface { + SendMessage(ctx context.Context, messageBody string) error + ReceiveMessages() (<-chan QueueMessage, error) + DeleteMessage(receipt string) error + Stop() error + GetQueueName() string + ReQueueMessage(ctx context.Context, message QueueMessage) error + Ping(ctx context.Context) error +} + +func NewQueueClient(config *config.QueueConfig, queueName string) (QueueClient, error) { + return NewRabbitMqClient(config, queueName) +} diff --git a/internal/queue/client/rabbitmq_client.go b/internal/queue/client/rabbitmq_client.go new file mode 100644 index 0000000..975921f --- /dev/null +++ b/internal/queue/client/rabbitmq_client.go @@ -0,0 +1,279 @@ +package client + +import ( + "context" + "fmt" + "strconv" + "time" + + amqp "github.com/rabbitmq/amqp091-go" + + "github.com/babylonlabs-io/staking-queue-client/config" +) + +const ( + dlxName = "common_dlx" + dlxRoutingPostfix = "_routing_key" + delayedQueuePostfix = "_delay" +) + +type RabbitMqClient struct { + connection *amqp.Connection + channel *amqp.Channel + queueName string + stopCh chan struct{} // This is used to gracefully stop the message receiving loop + delayedRequeueTime time.Duration +} + +func NewRabbitMqClient(config *config.QueueConfig, queueName string) (*RabbitMqClient, error) { + amqpURI := fmt.Sprintf("amqp://%s:%s@%s", config.QueueUser, config.QueuePassword, config.Url) + + conn, err := amqp.Dial(amqpURI) + if err != nil { + return nil, err + } + + ch, err := conn.Channel() + if err != nil { + return nil, err + } + + // Declare a single common DLX for all queues + err = ch.ExchangeDeclare(dlxName, + "direct", + true, + false, + false, + false, + amqp.Table{ + "x-queue-type": config.QueueType, + }, + ) + if err != nil { + return nil, err + } + + // Declare a delay queue specific to this particular queue + delayQueueName := queueName + delayedQueuePostfix + _, err = ch.QueueDeclare( + delayQueueName, + true, + false, + false, + false, + amqp.Table{ + // Default exchange to route messages back to the main queue + // The "" in rabbitMq referring to the default exchange which allows + // to route messages to the queue by the routing key which is the queue name + "x-queue-type": config.QueueType, + "x-dead-letter-exchange": "", + "x-dead-letter-routing-key": queueName, + }, + ) + if err != nil { + return nil, err + } + + // Declare the queue that will be created if not exists + customDlxRoutingKey := queueName + dlxRoutingPostfix + _, err = ch.QueueDeclare( + queueName, // name + true, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + amqp.Table{ + "x-queue-type": config.QueueType, + "x-dead-letter-exchange": dlxName, + "x-dead-letter-routing-key": customDlxRoutingKey, + }, + ) + if err != nil { + return nil, err + } + + // Bind the delay queue to the common DLX + err = ch.QueueBind(delayQueueName, customDlxRoutingKey, dlxName, false, nil) + if err != nil { + return nil, err + } + + err = ch.Confirm(false) + if err != nil { + return nil, err + } + + return &RabbitMqClient{ + connection: conn, + channel: ch, + queueName: queueName, + stopCh: make(chan struct{}), + delayedRequeueTime: time.Duration(config.ReQueueDelayTime) * time.Second, + }, nil +} + +// Ping checks the health of the RabbitMQ infrastructure. +func (c *RabbitMqClient) Ping(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + // Check if the RabbitMQ connection is closed + if c.connection.IsClosed() { + return fmt.Errorf("rabbitMQ connection is closed") + } + + // Check if the RabbitMQ channel is closed + if c.channel.IsClosed() { + return fmt.Errorf("rabbitMQ channel is closed") + } + } + + return nil +} + +func (c *RabbitMqClient) ReceiveMessages() (<-chan QueueMessage, error) { + msgs, err := c.channel.Consume( + c.queueName, // queueName + "", // consumer + false, // auto-ack. We want to manually acknowledge the message after processing it. + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + if err != nil { + return nil, err + } + output := make(chan QueueMessage) + go func() { + defer close(output) + for { + select { + case d, ok := <-msgs: + if !ok { + return // Channel closed, exit goroutine + } + attempts := d.Headers["x-processing-attempts"] + if attempts == nil { + attempts = int32(0) + } + currentAttempt := attempts.(int32) + + output <- QueueMessage{ + Body: string(d.Body), + Receipt: strconv.FormatUint(d.DeliveryTag, 10), + RetryAttempts: currentAttempt, + } + case <-c.stopCh: + return // Stop signal received, exit goroutine + } + } + }() + + return output, nil +} + +// DeleteMessage deletes a message from the queue. In RabbitMQ, this is equivalent to acknowledging the message. +// The deliveryTag is the unique identifier for the message. +func (c *RabbitMqClient) DeleteMessage(deliveryTag string) error { + deliveryTagInt, err := strconv.ParseUint(deliveryTag, 10, 64) + if err != nil { + return err + } + return c.channel.Ack(deliveryTagInt, false) +} + +// ReQueueMessage requeues a message back to the queue with a delay. +// This is done by sending the message again with an incremented counter. +// The original message is then deleted from the queue. +func (c *RabbitMqClient) ReQueueMessage(ctx context.Context, message QueueMessage) error { + // For requeueing, we will send the message to a delay queue that has a TTL pre-configured. + delayQueueName := c.queueName + delayedQueuePostfix + err := c.sendMessageWithAttempts(ctx, message.Body, delayQueueName, message.IncrementRetryAttempts(), c.delayedRequeueTime) + if err != nil { + return fmt.Errorf("failed to requeue message: %w", err) + } + + err = c.DeleteMessage(message.Receipt) + if err != nil { + return fmt.Errorf("failed to delete message while requeuing: %w", err) + } + + return nil +} + +// SendMessage sends a message to the queue. the ctx is used to control the timeout of the operation. +func (c *RabbitMqClient) sendMessageWithAttempts(ctx context.Context, messageBody, queueName string, attempts int32, ttl time.Duration) error { + // Ensure the channel is open + if c.channel == nil { + return fmt.Errorf("RabbitMQ channel not initialized") + } + + // Prepare new headers with the incremented counter + newHeaders := amqp.Table{ + "x-processing-attempts": attempts, + } + + publishMsg := amqp.Publishing{ + DeliveryMode: amqp.Persistent, + ContentType: "text/plain", + Body: []byte(messageBody), + Headers: newHeaders, + } + + // Exclude the expiration if the TTL is 0. + if ttl > 0 { + publishMsg.Expiration = strconv.Itoa(int(ttl.Milliseconds())) + } + + // Publish a message to the queue + confirmation, err := c.channel.PublishWithDeferredConfirmWithContext( + ctx, + "", // exchange: Use the default exchange + queueName, // routing key: The queue this message should be routed to + true, // mandatory: true indicates the server must route the message to a queue, otherwise error + false, // immediate: false indicates the server may wait to send the message until a consumer is available + publishMsg, + ) + + if err != nil { + return fmt.Errorf("failed to publish a message to queue %s: %w", queueName, err) + } + + if confirmation == nil { + return fmt.Errorf("message not confirmed when publishing into queue %s", queueName) + } + confirmed, err := confirmation.WaitContext(ctx) + if err != nil { + return fmt.Errorf("failed to confirm message when publishing into queue %s: %w", queueName, err) + } + if !confirmed { + return fmt.Errorf("message not confirmed when publishing into queue %s", queueName) + } + + return nil +} + +// SendMessage sends a message to the queue. the ctx is used to control the timeout of the operation. +func (c *RabbitMqClient) SendMessage(ctx context.Context, messageBody string) error { + return c.sendMessageWithAttempts(ctx, messageBody, c.queueName, 0, 0) +} + +// Stop stops the message receiving process. +func (c *RabbitMqClient) Stop() error { + if err := c.channel.Close(); err != nil { + return err + } + if err := c.connection.Close(); err != nil { + return err + } + + close(c.stopCh) + + return nil +} + +func (c *RabbitMqClient) GetQueueName() string { + return c.queueName +} diff --git a/internal/queue/client/schema.go b/internal/queue/client/schema.go new file mode 100644 index 0000000..1af41c7 --- /dev/null +++ b/internal/queue/client/schema.go @@ -0,0 +1,330 @@ +package client + +const ( + ActiveStakingQueueName string = "active_staking_queue" + UnbondingStakingQueueName string = "unbonding_staking_queue" + WithdrawStakingQueueName string = "withdraw_staking_queue" + ExpiredStakingQueueName string = "expired_staking_queue" + StakingStatsQueueName string = "staking_stats_queue" + BtcInfoQueueName string = "btc_info_queue" + ConfirmedInfoQueueName string = "confirmed_info_queue" + VerifiedStakingQueueName string = "verified_staking_queue" + PendingStakingQueueName string = "pending_staking_queue" +) + +const ( + ActiveStakingEventType EventType = 1 + UnbondingStakingEventType EventType = 2 + WithdrawStakingEventType EventType = 3 + ExpiredStakingEventType EventType = 4 + StatsEventType EventType = 5 + BtcInfoEventType EventType = 6 + ConfirmedInfoEventType EventType = 7 + VerifiedStakingEventType EventType = 8 + PendingStakingEventType EventType = 9 +) + +// Event schema versions, only increment when the schema changes +const ( + ActiveEventVersion int = 0 + UnbondingEventVersion int = 0 + WithdrawEventVersion int = 1 + ExpiredEventVersion int = 0 + StatsEventVersion int = 1 + BtcInfoEventVersion int = 0 + ConfirmedInfoEventVersion int = 0 + VerifiedEventVersion int = 0 + PendingEventVersion int = 0 +) + +type EventType int + +type EventMessage interface { + GetEventType() EventType + GetStakingTxHashHex() string +} + +type ActiveStakingEvent struct { + SchemaVersion int `json:"schema_version"` + EventType EventType `json:"event_type"` // always 1. ActiveStakingEventType + StakingTxHashHex string `json:"staking_tx_hash_hex"` + StakerBtcPkHex string `json:"staker_btc_pk_hex"` + FinalityProviderBtcPksHex []string `json:"finality_provider_btc_pks_hex"` + StakingValue uint64 `json:"staking_value"` + StakingStartHeight uint64 `json:"staking_start_height"` + StakingStartTimestamp int64 `json:"staking_start_timestamp"` + StakingTimeLock uint64 `json:"staking_timelock"` + StakingOutputIndex uint64 `json:"staking_output_index"` + StakingTxHex string `json:"staking_tx_hex"` + IsOverflow bool `json:"is_overflow"` +} + +func (e ActiveStakingEvent) GetEventType() EventType { + return ActiveStakingEventType +} + +func (e ActiveStakingEvent) GetStakingTxHashHex() string { + return e.StakingTxHashHex +} + +func NewActiveStakingEvent( + stakingTxHashHex string, + stakerBtcPkHex string, + finalityProviderBtcPksHex []string, + stakingValue uint64, + stakingStartHeight uint64, + stakingStartTimestamp int64, + stakingTimeLock uint64, + stakingOutputIndex uint64, + stakingTxHex string, + isOverflow bool, +) ActiveStakingEvent { + return ActiveStakingEvent{ + SchemaVersion: ActiveEventVersion, + EventType: ActiveStakingEventType, + StakingTxHashHex: stakingTxHashHex, + StakerBtcPkHex: stakerBtcPkHex, + FinalityProviderBtcPksHex: finalityProviderBtcPksHex, + StakingValue: stakingValue, + StakingStartHeight: stakingStartHeight, + StakingStartTimestamp: stakingStartTimestamp, + StakingTimeLock: stakingTimeLock, + StakingOutputIndex: stakingOutputIndex, + StakingTxHex: stakingTxHex, + IsOverflow: isOverflow, + } +} + +type UnbondingStakingEvent struct { + SchemaVersion int `json:"schema_version"` + EventType EventType `json:"event_type"` // always 2. UnbondingStakingEventType + StakingTxHashHex string `json:"staking_tx_hash_hex"` + UnbondingStartHeight uint64 `json:"unbonding_start_height"` + UnbondingStartTimestamp int64 `json:"unbonding_start_timestamp"` + UnbondingTimeLock uint64 `json:"unbonding_timelock"` + UnbondingOutputIndex uint64 `json:"unbonding_output_index"` + UnbondingTxHex string `json:"unbonding_tx_hex"` + UnbondingTxHashHex string `json:"unbonding_tx_hash_hex"` +} + +func (e UnbondingStakingEvent) GetEventType() EventType { + return UnbondingStakingEventType +} + +func (e UnbondingStakingEvent) GetStakingTxHashHex() string { + return e.StakingTxHashHex +} + +func NewUnbondingStakingEvent( + stakingTxHashHex string, + unbondingStartHeight uint64, + unbondingStartTimestamp int64, + unbondingTimeLock uint64, + unbondingOutputIndex uint64, + unbondingTxHex string, + unbondingTxHashHex string, +) UnbondingStakingEvent { + return UnbondingStakingEvent{ + SchemaVersion: UnbondingEventVersion, + EventType: UnbondingStakingEventType, + StakingTxHashHex: stakingTxHashHex, + UnbondingStartHeight: unbondingStartHeight, + UnbondingStartTimestamp: unbondingStartTimestamp, + UnbondingTimeLock: unbondingTimeLock, + UnbondingOutputIndex: unbondingOutputIndex, + UnbondingTxHex: unbondingTxHex, + UnbondingTxHashHex: unbondingTxHashHex, + } +} + +type WithdrawStakingEvent struct { + SchemaVersion int `json:"schema_version"` + EventType EventType `json:"event_type"` // always 3. WithdrawStakingEventType + StakingTxHashHex string `json:"staking_tx_hash_hex"` + WithdrawTxHashHex string `json:"withdraw_tx_hash_hex"` + WithdrawTxBtcHeight uint64 `json:"withdraw_tx_btc_height"` + WithdrawTxHex string `json:"withdraw_tx_hex"` +} + +func (e WithdrawStakingEvent) GetEventType() EventType { + return WithdrawStakingEventType +} + +func (e WithdrawStakingEvent) GetStakingTxHashHex() string { + return e.StakingTxHashHex +} + +func NewWithdrawStakingEvent( + stakingTxHashHex string, + withdrawTxHashHex string, + withdrawTxBtcHeight uint64, + withdrawTxHex string, +) WithdrawStakingEvent { + return WithdrawStakingEvent{ + SchemaVersion: WithdrawEventVersion, + EventType: WithdrawStakingEventType, + StakingTxHashHex: stakingTxHashHex, + WithdrawTxHashHex: withdrawTxHashHex, + WithdrawTxBtcHeight: withdrawTxBtcHeight, + WithdrawTxHex: withdrawTxHex, + } +} + +type ExpiredStakingEvent struct { + SchemaVersion int `json:"schema_version"` + EventType EventType `json:"event_type"` // always 4. ExpiredStakingEventType + StakingTxHashHex string `json:"staking_tx_hash_hex"` + TxType string `json:"tx_type"` +} + +func (e ExpiredStakingEvent) GetEventType() EventType { + return ExpiredStakingEventType +} + +func (e ExpiredStakingEvent) GetStakingTxHashHex() string { + return e.StakingTxHashHex +} + +func NewExpiredStakingEvent(stakingTxHashHex string, txType string) ExpiredStakingEvent { + return ExpiredStakingEvent{ + SchemaVersion: ExpiredEventVersion, + EventType: ExpiredStakingEventType, + StakingTxHashHex: stakingTxHashHex, + TxType: txType, + } +} + +type StatsEvent struct { + SchemaVersion int `json:"schema_version"` + EventType EventType `json:"event_type"` // always 5. StatsEventType + StakingTxHashHex string `json:"staking_tx_hash_hex"` + StakerPkHex string `json:"staker_pk_hex"` + FinalityProviderPkHex string `json:"finality_provider_pk_hex"` + StakingValue uint64 `json:"staking_value"` + State string `json:"state"` + IsOverflow bool `json:"is_overflow"` +} + +func (e StatsEvent) GetEventType() EventType { + return StatsEventType +} + +func (e StatsEvent) GetStakingTxHashHex() string { + return e.StakingTxHashHex +} + +func NewStatsEvent( + stakingTxHashHex string, + stakerPkHex string, + finalityProviderPkHex string, + stakingValue uint64, + state string, + isOverflow bool, +) StatsEvent { + return StatsEvent{ + SchemaVersion: StatsEventVersion, + EventType: StatsEventType, + StakingTxHashHex: stakingTxHashHex, + StakerPkHex: stakerPkHex, + FinalityProviderPkHex: finalityProviderPkHex, + StakingValue: stakingValue, + State: state, + IsOverflow: isOverflow, + } +} + +type BtcInfoEvent struct { + SchemaVersion int `json:"schema_version"` + EventType EventType `json:"event_type"` // always 6. BtcInfoEventType + Height uint64 `json:"height"` + ConfirmedTvl uint64 `json:"confirmed_tvl"` + UnconfirmedTvl uint64 `json:"unconfirmed_tvl"` +} + +func (e BtcInfoEvent) GetEventType() EventType { + return BtcInfoEventType +} + +// Not applicable, add it here to implement the EventMessage interface +func (e BtcInfoEvent) GetStakingTxHashHex() string { + return "" +} + +func NewBtcInfoEvent(height, confirmedTvl, unconfirmedTvl uint64) BtcInfoEvent { + return BtcInfoEvent{ + SchemaVersion: BtcInfoEventVersion, + EventType: BtcInfoEventType, + Height: height, + ConfirmedTvl: confirmedTvl, + UnconfirmedTvl: unconfirmedTvl, + } +} + +type ConfirmedInfoEvent struct { + SchemaVersion int `json:"schema_version"` + EventType EventType `json:"event_type"` // always 7. ConfirmedInfoEventType + Height uint64 `json:"height"` + Tvl uint64 `json:"tvl"` +} + +func (e ConfirmedInfoEvent) GetEventType() EventType { + return ConfirmedInfoEventType +} + +// Not applicable, add it here to implement the EventMessage interface +func (e ConfirmedInfoEvent) GetStakingTxHashHex() string { + return "" +} + +func NewConfirmedInfoEvent(height, tvl uint64) ConfirmedInfoEvent { + return ConfirmedInfoEvent{ + SchemaVersion: ConfirmedInfoEventVersion, + EventType: ConfirmedInfoEventType, + Height: height, + Tvl: tvl, + } +} + +type VerifiedStakingEvent struct { + SchemaVersion int `json:"schema_version"` + EventType EventType `json:"event_type"` // always 8. VerifiedStakingEventType + StakingTxHashHex string `json:"staking_tx_hash_hex"` +} + +func (e VerifiedStakingEvent) GetEventType() EventType { + return VerifiedStakingEventType +} + +func (e VerifiedStakingEvent) GetStakingTxHashHex() string { + return e.StakingTxHashHex +} + +func NewVerifiedStakingEvent(stakingTxHashHex string) VerifiedStakingEvent { + return VerifiedStakingEvent{ + SchemaVersion: VerifiedEventVersion, + EventType: VerifiedStakingEventType, + StakingTxHashHex: stakingTxHashHex, + } +} + +type PendingStakingEvent struct { + SchemaVersion int `json:"schema_version"` + EventType EventType `json:"event_type"` // always 9. PendingStakingEventType + StakingTxHashHex string `json:"staking_tx_hash_hex"` +} + +func (e PendingStakingEvent) GetEventType() EventType { + return PendingStakingEventType +} + +func (e PendingStakingEvent) GetStakingTxHashHex() string { + return e.StakingTxHashHex +} + +func NewPendingStakingEvent(stakingTxHashHex string) PendingStakingEvent { + return PendingStakingEvent{ + SchemaVersion: PendingEventVersion, + EventType: PendingStakingEventType, + StakingTxHashHex: stakingTxHashHex, + } +} diff --git a/internal/queue/queue.go b/internal/queue/queue.go index 9c117d0..62cf0a8 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -1,19 +1,151 @@ package queue import ( + "context" + "encoding/json" + "fmt" + + "github.com/babylonlabs-io/babylon-staking-indexer/internal/queue/client" "github.com/rs/zerolog/log" + "github.com/babylonlabs-io/babylon-staking-indexer/internal/observability/metrics" queueConfig "github.com/babylonlabs-io/staking-queue-client/config" ) type QueueManager struct { + stakingExpiredEventQueue client.QueueClient + unbondingEventQueue client.QueueClient + activeStakingEventQueue client.QueueClient + verifiedStakingEventQueue client.QueueClient + pendingStakingEventQueue client.QueueClient } func NewQueueManager(cfg *queueConfig.QueueConfig) (*QueueManager, error) { - return &QueueManager{}, nil + stakingEventQueue, err := client.NewQueueClient(cfg, client.ExpiredStakingQueueName) + if err != nil { + return nil, fmt.Errorf("failed to initialize staking event queue: %w", err) + } + + unbondingEventQueue, err := client.NewQueueClient(cfg, client.UnbondingStakingQueueName) + if err != nil { + return nil, fmt.Errorf("failed to initialize unbonding event queue: %w", err) + } + + activeStakingEventQueue, err := client.NewQueueClient(cfg, client.ActiveStakingQueueName) + if err != nil { + return nil, fmt.Errorf("failed to initialize active staking event queue: %w", err) + } + + verifiedStakingEventQueue, err := client.NewQueueClient(cfg, client.VerifiedStakingQueueName) + if err != nil { + return nil, fmt.Errorf("failed to initialize verified staking event queue: %w", err) + } + + pendingStakingEventQueue, err := client.NewQueueClient(cfg, client.PendingStakingQueueName) + if err != nil { + return nil, fmt.Errorf("failed to initialize pending staking event queue: %w", err) + } + + return &QueueManager{ + stakingExpiredEventQueue: stakingEventQueue, + unbondingEventQueue: unbondingEventQueue, + activeStakingEventQueue: activeStakingEventQueue, + verifiedStakingEventQueue: verifiedStakingEventQueue, + pendingStakingEventQueue: pendingStakingEventQueue, + }, nil +} + +func (qm *QueueManager) SendExpiredStakingEvent(ctx context.Context, ev client.ExpiredStakingEvent) error { + jsonBytes, err := json.Marshal(ev) + if err != nil { + return err + } + messageBody := string(jsonBytes) + + log.Debug().Str("tx_hash", ev.StakingTxHashHex).Msg("publishing expired staking event") + err = qm.stakingExpiredEventQueue.SendMessage(ctx, messageBody) + if err != nil { + metrics.RecordQueueSendError() + log.Fatal().Err(err).Str("tx_hash", ev.StakingTxHashHex).Msg("failed to publish staking event") + } + log.Debug().Str("tx_hash", ev.StakingTxHashHex).Msg("successfully published expired staking event") + + return nil +} + +func (qm *QueueManager) SendUnbondingStakingEvent(ctx context.Context, ev *client.UnbondingStakingEvent) error { + jsonBytes, err := json.Marshal(ev) + if err != nil { + return err + } + messageBody := string(jsonBytes) + + log.Info().Str("staking_tx_hash", ev.UnbondingTxHashHex).Msg("pushing unbonding event") + err = qm.unbondingEventQueue.SendMessage(ctx, messageBody) + if err != nil { + return fmt.Errorf("failed to push unbonding event: %w", err) + } + log.Info().Str("staking_tx_hash", ev.UnbondingTxHashHex).Msg("successfully pushed unbonding event") + + return nil +} + +func (qm *QueueManager) SendActiveStakingEvent(ctx context.Context, ev *client.ActiveStakingEvent) error { + jsonBytes, err := json.Marshal(ev) + if err != nil { + return err + } + messageBody := string(jsonBytes) + + log.Info().Str("staking_tx_hash", ev.StakingTxHashHex).Msg("pushing active staking event") + err = qm.activeStakingEventQueue.SendMessage(ctx, messageBody) + if err != nil { + return fmt.Errorf("failed to push active staking event: %w", err) + } + log.Info().Str("staking_tx_hash", ev.StakingTxHashHex).Msg("successfully pushed active staking event") + + return nil +} + +func (qm *QueueManager) SendVerifiedStakingEvent(ctx context.Context, ev *client.VerifiedStakingEvent) error { + jsonBytes, err := json.Marshal(ev) + if err != nil { + return err + } + messageBody := string(jsonBytes) + + log.Info().Str("staking_tx_hash", ev.StakingTxHashHex).Msg("pushing verified staking event") + err = qm.verifiedStakingEventQueue.SendMessage(ctx, messageBody) + if err != nil { + return fmt.Errorf("failed to push verified staking event: %w", err) + } + log.Info().Str("staking_tx_hash", ev.StakingTxHashHex).Msg("successfully pushed verified staking event") + + return nil +} + +func (qm *QueueManager) SendPendingStakingEvent(ctx context.Context, ev *client.PendingStakingEvent) error { + jsonBytes, err := json.Marshal(ev) + if err != nil { + return err + } + messageBody := string(jsonBytes) + + log.Info().Str("staking_tx_hash", ev.StakingTxHashHex).Msg("pushing pending staking event") + err = qm.pendingStakingEventQueue.SendMessage(ctx, messageBody) + if err != nil { + return fmt.Errorf("failed to push pending staking event: %w", err) + } + log.Info().Str("staking_tx_hash", ev.StakingTxHashHex).Msg("successfully pushed pending staking event") + + return nil } // Shutdown gracefully stops the interaction with the queue, ensuring all resources are properly released. func (qm *QueueManager) Shutdown() { - log.Info().Msg("Shutting down queue manager") + err := qm.stakingExpiredEventQueue.Stop() + if err != nil { + log.Error().Err(err).Msg("failed to stop staking expired event queue") + } + } diff --git a/internal/services/consumer_events.go b/internal/services/consumer_events.go new file mode 100644 index 0000000..3cc5e92 --- /dev/null +++ b/internal/services/consumer_events.go @@ -0,0 +1,102 @@ +package services + +import ( + "context" + "fmt" + "net/http" + "strconv" + "time" + + "github.com/babylonlabs-io/babylon-staking-indexer/internal/db/model" + queueclient "github.com/babylonlabs-io/babylon-staking-indexer/internal/queue/client" + "github.com/babylonlabs-io/babylon-staking-indexer/internal/types" + "github.com/rs/zerolog/log" +) + +func (s *Service) emitConsumerEvent( + ctx context.Context, newState types.DelegationState, delegation *model.BTCDelegationDetails, +) *types.Error { + switch newState { + case types.StateActive: + return s.sendActiveDelegationEvent(ctx, delegation) + case types.StateVerified: + return s.sendVerifiedDelegationEvent(ctx, delegation) + case types.StatePending: + return s.sendPendingDelegationEvent(ctx, delegation) + case types.StateUnbonding: + return s.sendUnbondingDelegationEvent(ctx, delegation) + case types.StateWithdrawable: + return s.sendWithdrawableDelegationEvent(ctx, delegation) + default: + return types.NewError( + http.StatusInternalServerError, + types.InternalServiceError, + fmt.Errorf("unknown delegation state: %s", newState), + ) + } +} + +func (s *Service) sendActiveDelegationEvent(ctx context.Context, delegation *model.BTCDelegationDetails) *types.Error { + stakingTime, _ := strconv.ParseUint(delegation.StakingTime, 10, 64) + stakingAmount, _ := strconv.ParseUint(delegation.StakingAmount, 10, 64) + ev := queueclient.NewActiveStakingEvent( + delegation.StakingTxHashHex, + delegation.StakerBtcPkHex, + delegation.FinalityProviderBtcPksHex, + stakingAmount, + uint64(delegation.StartHeight), + time.Now().Unix(), + stakingTime, + 0, + "", + false, + ) + if err := s.queueManager.SendActiveStakingEvent(ctx, &ev); err != nil { + return types.NewInternalServiceError(fmt.Errorf("failed to send active staking event: %w", err)) + } + return nil +} + +func (s *Service) sendVerifiedDelegationEvent(ctx context.Context, delegation *model.BTCDelegationDetails) *types.Error { + ev := queueclient.NewVerifiedStakingEvent(delegation.StakingTxHashHex) + if err := s.queueManager.SendVerifiedStakingEvent(ctx, &ev); err != nil { + return types.NewInternalServiceError(fmt.Errorf("failed to send verified staking event: %w", err)) + } + return nil +} + +func (s *Service) sendPendingDelegationEvent(ctx context.Context, delegation *model.BTCDelegationDetails) *types.Error { + ev := queueclient.NewPendingStakingEvent(delegation.StakingTxHashHex) + if err := s.queueManager.SendPendingStakingEvent(ctx, &ev); err != nil { + return types.NewInternalServiceError(fmt.Errorf("failed to send pending staking event: %w", err)) + } + return nil +} + +func (s *Service) sendUnbondingDelegationEvent(ctx context.Context, delegation *model.BTCDelegationDetails) *types.Error { + ev := queueclient.NewUnbondingStakingEvent( + delegation.StakingTxHashHex, + uint64(delegation.EndHeight), + time.Now().Unix(), + uint64(delegation.StartHeight), + uint64(delegation.EndHeight), + delegation.UnbondingTx, + delegation.UnbondingTime, + ) + if err := s.queueManager.SendUnbondingStakingEvent(ctx, &ev); err != nil { + return types.NewInternalServiceError(fmt.Errorf("failed to send unbonding staking event: %w", err)) + } + return nil +} + +func (s *Service) sendWithdrawableDelegationEvent(ctx context.Context, delegation *model.BTCDelegationDetails) *types.Error { + ev := queueclient.NewExpiredStakingEvent(delegation.StakingTxHashHex, "") // TODO: add the correct tx type + if err := s.queueManager.SendExpiredStakingEvent(ctx, ev); err != nil { + log.Error().Err(err).Msg("Error sending expired staking event") + return types.NewInternalServiceError( + fmt.Errorf("failed to send expired staking event: %w", err), + ) + } + + return nil +} diff --git a/internal/services/delegation.go b/internal/services/delegation.go index 6559fed..bdb59b9 100644 --- a/internal/services/delegation.go +++ b/internal/services/delegation.go @@ -32,12 +32,18 @@ func (s *Service) processNewBTCDelegationEvent( return err } - if validationErr := s.validateBTCDelegationCreatedEvent(ctx, newDelegation); validationErr != nil { + if validationErr := s.validateBTCDelegationCreatedEvent(newDelegation); validationErr != nil { return validationErr } + delegationDoc := model.FromEventBTCDelegationCreated(newDelegation) + consumerErr := s.emitConsumerEvent(ctx, types.StatePending, delegationDoc) + if consumerErr != nil { + return consumerErr + } + if dbErr := s.db.SaveNewBTCDelegation( - ctx, model.FromEventBTCDelegationCreated(newDelegation), + ctx, delegationDoc, ); dbErr != nil { if db.IsDuplicateKeyError(dbErr) { // BTC delegation already exists, ignore the event @@ -72,8 +78,22 @@ func (s *Service) processCovenantQuorumReachedEvent( return nil } + delegation, dbErr := s.db.GetBTCDelegationByStakingTxHash(ctx, covenantQuorumReachedEvent.StakingTxHash) + if dbErr != nil { + return types.NewError( + http.StatusInternalServerError, + types.InternalServiceError, + fmt.Errorf("failed to get BTC delegation by staking tx hash: %w", dbErr), + ) + } + newState := types.DelegationState(covenantQuorumReachedEvent.NewState) + consumerErr := s.emitConsumerEvent(ctx, newState, delegation) + if consumerErr != nil { + return consumerErr + } + if dbErr := s.db.UpdateBTCDelegationState( - ctx, covenantQuorumReachedEvent.StakingTxHash, types.DelegationState(covenantQuorumReachedEvent.NewState), + ctx, covenantQuorumReachedEvent.StakingTxHash, newState, ); dbErr != nil { return types.NewError( http.StatusInternalServerError, @@ -104,6 +124,24 @@ func (s *Service) processBTCDelegationInclusionProofReceivedEvent( return nil } + delegation, dbErr := s.db.GetBTCDelegationByStakingTxHash(ctx, inclusionProofEvent.StakingTxHash) + if dbErr != nil { + return types.NewError( + http.StatusInternalServerError, + types.InternalServiceError, + fmt.Errorf("failed to get BTC delegation by staking tx hash: %w", dbErr), + ) + } + newState := types.DelegationState(inclusionProofEvent.NewState) + if newState == types.StateActive { + // emit the consumer event only if the new state is ACTIVE + // we do not need to emit the PENDING event because it was already emitted in the processNewBTCDelegationEvent + consumerErr := s.emitConsumerEvent(ctx, types.StateActive, delegation) + if consumerErr != nil { + return consumerErr + } + } + if dbErr := s.db.UpdateBTCDelegationDetails( ctx, inclusionProofEvent.StakingTxHash, model.FromEventBTCDelegationInclusionProofReceived(inclusionProofEvent), ); dbErr != nil { @@ -139,6 +177,19 @@ func (s *Service) processBTCDelegationUnbondedEarlyEvent( // TODO: save timelock expire, need to figure out what will be the expire height in this case. // https://github.com/babylonlabs-io/babylon-staking-indexer/issues/28 + delegation, dbErr := s.db.GetBTCDelegationByStakingTxHash(ctx, unbondedEarlyEvent.StakingTxHash) + if dbErr != nil { + return types.NewError( + http.StatusInternalServerError, + types.InternalServiceError, + fmt.Errorf("failed to get BTC delegation by staking tx hash: %w", dbErr), + ) + } + consumerErr := s.emitConsumerEvent(ctx, types.StateUnbonding, delegation) + if consumerErr != nil { + return consumerErr + } + if dbErr := s.db.UpdateBTCDelegationState( ctx, unbondedEarlyEvent.StakingTxHash, types.StateUnbonding, ); dbErr != nil { @@ -202,7 +253,7 @@ func (s *Service) processBTCDelegationExpiredEvent( return nil } -func (s *Service) validateBTCDelegationCreatedEvent(ctx context.Context, event *bbntypes.EventBTCDelegationCreated) *types.Error { +func (s *Service) validateBTCDelegationCreatedEvent(event *bbntypes.EventBTCDelegationCreated) *types.Error { // Check if the staking tx hash is present if event.StakingTxHash == "" { return types.NewErrorWithMsg( diff --git a/internal/services/expiry-checker.go b/internal/services/expiry-checker.go index 46ccd1e..7afa6b8 100644 --- a/internal/services/expiry-checker.go +++ b/internal/services/expiry-checker.go @@ -53,6 +53,11 @@ func (s *Service) checkExpiry(ctx context.Context) *types.Error { continue } + consumerErr := s.emitConsumerEvent(ctx, types.StateWithdrawable, delegation) + if consumerErr != nil { + return consumerErr + } + if err := s.db.UpdateBTCDelegationState(ctx, delegation.StakingTxHashHex, types.StateWithdrawable); err != nil { log.Error().Err(err).Msg("Error updating BTC delegation state to withdrawable") return types.NewInternalServiceError( diff --git a/internal/utils/utils.go b/internal/utils/utils.go index 253bcdf..5e0a451 100644 --- a/internal/utils/utils.go +++ b/internal/utils/utils.go @@ -1,11 +1,14 @@ package utils import ( + "bytes" "runtime" "strconv" "strings" "github.com/btcsuite/btcd/chaincfg" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" ) type SupportedBtcNetwork string @@ -99,3 +102,11 @@ func Contains[T comparable](slice []T, item T) bool { } return false } + +func GetTxHash(unbondingTxBytes []byte) (chainhash.Hash, error) { + var msgTx wire.MsgTx + if err := msgTx.Deserialize(bytes.NewReader(unbondingTxBytes)); err != nil { + return chainhash.Hash{}, err + } + return msgTx.TxHash(), nil +}