Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: emit events for api #31

Merged
merged 7 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ require (
github.com/babylonlabs-io/babylon v0.12.1
github.com/babylonlabs-io/staking-queue-client v0.4.1
github.com/btcsuite/btcd v0.24.3-0.20241011125836-24eb815168f4
github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0
github.com/cometbft/cometbft v0.38.7
github.com/cosmos/cosmos-sdk v0.50.6
github.com/cosmos/gogoproto v1.7.0
github.com/go-chi/chi/v5 v5.1.0
github.com/rabbitmq/amqp091-go v1.9.0
github.com/spf13/viper v1.19.0
)

Expand Down Expand Up @@ -44,7 +46,6 @@ require (
github.com/boljen/go-bitmap v0.0.0-20151001105940-23cd2fb0ce7d // indirect
github.com/btcsuite/btcd/btcec/v2 v2.3.4 // indirect
github.com/btcsuite/btcd/btcutil v1.1.6 // indirect
github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0 // indirect
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f // indirect
github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd // indirect
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792 // indirect
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1023,6 +1023,8 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O
github.com/prometheus/procfs v0.3.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/rabbitmq/amqp091-go v1.9.0 h1:qrQtyzB4H8BQgEuJwhmVQqVHB9O4+MNDJCCAcpc3Aoo=
github.com/rabbitmq/amqp091-go v1.9.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
Expand Down Expand Up @@ -1199,6 +1201,7 @@ go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
Expand Down
37 changes: 37 additions & 0 deletions internal/queue/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package client

import (
"context"

"github.com/babylonlabs-io/staking-queue-client/config"
)

type QueueMessage struct {
Body string
Receipt string
RetryAttempts int32
}

func (m QueueMessage) IncrementRetryAttempts() int32 {
m.RetryAttempts++
return m.RetryAttempts
}

func (m QueueMessage) GetRetryAttempts() int32 {
return m.RetryAttempts
}

// A common interface for queue clients regardless if it's a SQS, RabbitMQ, etc.
type QueueClient interface {
SendMessage(ctx context.Context, messageBody string) error
ReceiveMessages() (<-chan QueueMessage, error)
DeleteMessage(receipt string) error
Stop() error
GetQueueName() string
ReQueueMessage(ctx context.Context, message QueueMessage) error
Ping(ctx context.Context) error
}

func NewQueueClient(config *config.QueueConfig, queueName string) (QueueClient, error) {
return NewRabbitMqClient(config, queueName)
}
279 changes: 279 additions & 0 deletions internal/queue/client/rabbitmq_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,279 @@
package client

import (
"context"
"fmt"
"strconv"
"time"

amqp "github.com/rabbitmq/amqp091-go"

"github.com/babylonlabs-io/staking-queue-client/config"
)

const (
dlxName = "common_dlx"
dlxRoutingPostfix = "_routing_key"
delayedQueuePostfix = "_delay"
)

type RabbitMqClient struct {
connection *amqp.Connection
channel *amqp.Channel
queueName string
stopCh chan struct{} // This is used to gracefully stop the message receiving loop
delayedRequeueTime time.Duration
}

func NewRabbitMqClient(config *config.QueueConfig, queueName string) (*RabbitMqClient, error) {
amqpURI := fmt.Sprintf("amqp://%s:%s@%s", config.QueueUser, config.QueuePassword, config.Url)

conn, err := amqp.Dial(amqpURI)
if err != nil {
return nil, err
}

ch, err := conn.Channel()
if err != nil {
return nil, err
}

// Declare a single common DLX for all queues
err = ch.ExchangeDeclare(dlxName,
"direct",
true,
false,
false,
false,
amqp.Table{
"x-queue-type": config.QueueType,
},
)
if err != nil {
return nil, err
}

// Declare a delay queue specific to this particular queue
delayQueueName := queueName + delayedQueuePostfix
_, err = ch.QueueDeclare(
delayQueueName,
true,
false,
false,
false,
amqp.Table{
// Default exchange to route messages back to the main queue
// The "" in rabbitMq referring to the default exchange which allows
// to route messages to the queue by the routing key which is the queue name
"x-queue-type": config.QueueType,
"x-dead-letter-exchange": "",
"x-dead-letter-routing-key": queueName,
},
)
if err != nil {
return nil, err
}

// Declare the queue that will be created if not exists
customDlxRoutingKey := queueName + dlxRoutingPostfix
_, err = ch.QueueDeclare(
queueName, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
amqp.Table{
"x-queue-type": config.QueueType,
"x-dead-letter-exchange": dlxName,
"x-dead-letter-routing-key": customDlxRoutingKey,
},
)
if err != nil {
return nil, err
}

// Bind the delay queue to the common DLX
err = ch.QueueBind(delayQueueName, customDlxRoutingKey, dlxName, false, nil)
if err != nil {
return nil, err
}

err = ch.Confirm(false)
if err != nil {
return nil, err
}

return &RabbitMqClient{
connection: conn,
channel: ch,
queueName: queueName,
stopCh: make(chan struct{}),
delayedRequeueTime: time.Duration(config.ReQueueDelayTime) * time.Second,
}, nil
}

// Ping checks the health of the RabbitMQ infrastructure.
func (c *RabbitMqClient) Ping(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
// Check if the RabbitMQ connection is closed
if c.connection.IsClosed() {
return fmt.Errorf("rabbitMQ connection is closed")
}

// Check if the RabbitMQ channel is closed
if c.channel.IsClosed() {
return fmt.Errorf("rabbitMQ channel is closed")
}
}

return nil
}

func (c *RabbitMqClient) ReceiveMessages() (<-chan QueueMessage, error) {
msgs, err := c.channel.Consume(
c.queueName, // queueName
"", // consumer
false, // auto-ack. We want to manually acknowledge the message after processing it.
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return nil, err
}
output := make(chan QueueMessage)
go func() {
defer close(output)
for {
select {
case d, ok := <-msgs:
if !ok {
return // Channel closed, exit goroutine
}
attempts := d.Headers["x-processing-attempts"]
if attempts == nil {
attempts = int32(0)
}
currentAttempt := attempts.(int32)

output <- QueueMessage{
Body: string(d.Body),
Receipt: strconv.FormatUint(d.DeliveryTag, 10),
RetryAttempts: currentAttempt,
}
case <-c.stopCh:
return // Stop signal received, exit goroutine
}
}
}()

return output, nil
}

// DeleteMessage deletes a message from the queue. In RabbitMQ, this is equivalent to acknowledging the message.
// The deliveryTag is the unique identifier for the message.
func (c *RabbitMqClient) DeleteMessage(deliveryTag string) error {
deliveryTagInt, err := strconv.ParseUint(deliveryTag, 10, 64)
if err != nil {
return err
}
return c.channel.Ack(deliveryTagInt, false)
}

// ReQueueMessage requeues a message back to the queue with a delay.
// This is done by sending the message again with an incremented counter.
// The original message is then deleted from the queue.
func (c *RabbitMqClient) ReQueueMessage(ctx context.Context, message QueueMessage) error {
// For requeueing, we will send the message to a delay queue that has a TTL pre-configured.
delayQueueName := c.queueName + delayedQueuePostfix
err := c.sendMessageWithAttempts(ctx, message.Body, delayQueueName, message.IncrementRetryAttempts(), c.delayedRequeueTime)
if err != nil {
return fmt.Errorf("failed to requeue message: %w", err)
}

err = c.DeleteMessage(message.Receipt)
if err != nil {
return fmt.Errorf("failed to delete message while requeuing: %w", err)
}

return nil
}

// SendMessage sends a message to the queue. the ctx is used to control the timeout of the operation.
func (c *RabbitMqClient) sendMessageWithAttempts(ctx context.Context, messageBody, queueName string, attempts int32, ttl time.Duration) error {
// Ensure the channel is open
if c.channel == nil {
return fmt.Errorf("RabbitMQ channel not initialized")
}

// Prepare new headers with the incremented counter
newHeaders := amqp.Table{
"x-processing-attempts": attempts,
}

publishMsg := amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(messageBody),
Headers: newHeaders,
}

// Exclude the expiration if the TTL is 0.
if ttl > 0 {
publishMsg.Expiration = strconv.Itoa(int(ttl.Milliseconds()))
}

// Publish a message to the queue
confirmation, err := c.channel.PublishWithDeferredConfirmWithContext(
ctx,
"", // exchange: Use the default exchange
queueName, // routing key: The queue this message should be routed to
true, // mandatory: true indicates the server must route the message to a queue, otherwise error
false, // immediate: false indicates the server may wait to send the message until a consumer is available
publishMsg,
)

if err != nil {
return fmt.Errorf("failed to publish a message to queue %s: %w", queueName, err)
}

if confirmation == nil {
return fmt.Errorf("message not confirmed when publishing into queue %s", queueName)
}
confirmed, err := confirmation.WaitContext(ctx)
if err != nil {
return fmt.Errorf("failed to confirm message when publishing into queue %s: %w", queueName, err)
}
if !confirmed {
return fmt.Errorf("message not confirmed when publishing into queue %s", queueName)
}

return nil
}

// SendMessage sends a message to the queue. the ctx is used to control the timeout of the operation.
func (c *RabbitMqClient) SendMessage(ctx context.Context, messageBody string) error {
return c.sendMessageWithAttempts(ctx, messageBody, c.queueName, 0, 0)
}

// Stop stops the message receiving process.
func (c *RabbitMqClient) Stop() error {
if err := c.channel.Close(); err != nil {
return err
}
if err := c.connection.Close(); err != nil {
return err
}

close(c.stopCh)

return nil
}

func (c *RabbitMqClient) GetQueueName() string {
return c.queueName
}
Loading
Loading