From 674444a233e147a86568f63bb44fd321f1128895 Mon Sep 17 00:00:00 2001 From: Crypto Minion <154598612+jrwbabylonlab@users.noreply.github.com> Date: Mon, 9 Dec 2024 12:16:52 +1100 Subject: [PATCH] feat: remove queue from the service (#21) * feat: remove queue from the service --------- Co-authored-by: jeremytsngtsng --- bin/local-startup.sh | 12 - cmd/staking-expiry-checker/main.go | 8 +- config/config-docker.yml | 8 - config/config-local.yml | 8 - .../images/staking-expiry-checker/Dockerfile | 2 +- docker-compose.yml | 12 - go.mod | 2 - go.sum | 14 +- internal/config/config.go | 14 +- internal/observability/metrics/metrics.go | 14 - internal/queue/queue.go | 55 ---- internal/services/service.go | 21 +- tests/config-test.yml | 8 - tests/expired_delegations_test.go | 136 +------- tests/setup.go | 308 ++++-------------- 15 files changed, 81 insertions(+), 541 deletions(-) delete mode 100644 internal/queue/queue.go diff --git a/bin/local-startup.sh b/bin/local-startup.sh index 5c80f49..4fc0c01 100755 --- a/bin/local-startup.sh +++ b/bin/local-startup.sh @@ -8,16 +8,4 @@ else echo "Starting MongoDB" # Start MongoDB docker compose up -d mongodb -fi - -# Check if the RabbitMQ container is already running -RABBITMQ_CONTAINER_NAME="rabbitmq" -if [ $(docker ps -q -f name=^/${RABBITMQ_CONTAINER_NAME}$) ]; then - echo "RabbitMQ container already running. Skipping RabbitMQ startup." -else - echo "Starting RabbitMQ" - # Start RabbitMQ - docker compose up -d rabbitmq - # Wait for RabbitMQ to start - sleep 10 fi \ No newline at end of file diff --git a/cmd/staking-expiry-checker/main.go b/cmd/staking-expiry-checker/main.go index b103085..7fd5c85 100644 --- a/cmd/staking-expiry-checker/main.go +++ b/cmd/staking-expiry-checker/main.go @@ -13,7 +13,6 @@ import ( "github.com/babylonlabs-io/staking-expiry-checker/internal/db" "github.com/babylonlabs-io/staking-expiry-checker/internal/observability/metrics" "github.com/babylonlabs-io/staking-expiry-checker/internal/poller" - "github.com/babylonlabs-io/staking-expiry-checker/internal/queue" "github.com/babylonlabs-io/staking-expiry-checker/internal/services" ) @@ -53,12 +52,7 @@ func main() { log.Fatal().Err(err).Msg("error while creating btc client") } - qm, err := queue.NewQueueManager(&cfg.Queue) - if err != nil { - log.Fatal().Err(err).Msg("error while creating queue manager") - } - - delegationService := services.NewService(dbClient, btcClient, qm) + delegationService := services.NewService(dbClient, btcClient) if err != nil { log.Fatal().Err(err).Msg("error while creating delegation service") } diff --git a/config/config-docker.yml b/config/config-docker.yml index a95b2be..30c0042 100644 --- a/config/config-docker.yml +++ b/config/config-docker.yml @@ -12,14 +12,6 @@ btc: net-params: testnet rpc-user: rpcuser rpc-pass: rpcpass -queue: - queue_user: user # can be replaced by values in .env file - queue_password: password - url: "localhost:5672" - processing_timeout: 5 # 5 second - msg_max_retry_attempts: 10 - requeue_delay_time: 300 - queue_type: quorum metrics: host: 0.0.0.0 port: 2112 \ No newline at end of file diff --git a/config/config-local.yml b/config/config-local.yml index e3b0584..30c0042 100644 --- a/config/config-local.yml +++ b/config/config-local.yml @@ -12,14 +12,6 @@ btc: net-params: testnet rpc-user: rpcuser rpc-pass: rpcpass -queue: - queue_user: user # can be replaced by values in .env file - queue_password: password - url: "localhost:5672" - processing_timeout: 5 # 5 second - msg_max_retry_attempts: 3 - requeue_delay_time: 60 - queue_type: quorum metrics: host: 0.0.0.0 port: 2112 \ No newline at end of file diff --git a/contrib/images/staking-expiry-checker/Dockerfile b/contrib/images/staking-expiry-checker/Dockerfile index 1d7f5e9..4896bb4 100644 --- a/contrib/images/staking-expiry-checker/Dockerfile +++ b/contrib/images/staking-expiry-checker/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.21-alpine AS builder +FROM golang:1.23.1-alpine AS builder ARG VERSION="HEAD" diff --git a/docker-compose.yml b/docker-compose.yml index c379771..ca3d402 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -7,7 +7,6 @@ services: - CONFIG=/home/staking-expiry-checker/config.yml depends_on: - mongodb - - rabbitmq volumes: - ./config/config-docker.yml:/home/staking-expiry-checker/config.yml:Z mongodb: @@ -22,14 +21,3 @@ services: volumes: - ./bin/init-mongo.sh:/init-mongo.sh entrypoint: [ "/init-mongo.sh" ] - rabbitmq: - image: rabbitmq:3-management - container_name: rabbitmq - ports: - - "5672:5672" # AMQP protocol port - - "15672:15672" # Management UI port - environment: - RABBITMQ_DEFAULT_USER: user - RABBITMQ_DEFAULT_PASS: password - volumes: - - "./rabbitmq_data:/var/lib/rabbitmq" diff --git a/go.mod b/go.mod index 4a5bed3..412ee36 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,7 @@ module github.com/babylonlabs-io/staking-expiry-checker go 1.21.6 require ( - github.com/babylonlabs-io/staking-queue-client v0.4.1 github.com/btcsuite/btcd v0.24.0 - github.com/rabbitmq/amqp091-go v1.9.0 github.com/spf13/viper v1.18.2 ) diff --git a/go.sum b/go.sum index 8c3100a..82f42ef 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,4 @@ github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= -github.com/babylonlabs-io/staking-queue-client v0.4.1 h1:AW+jtrNxZYN/isRx+njqjHbUU9CzhF42Ke6roK+0N3I= -github.com/babylonlabs-io/staking-queue-client v0.4.1/go.mod h1:n3fr3c+9LNiJlyETmcrVk94Zn76rAADhGZKxX+rVf+Q= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/btcsuite/btcd v0.20.1-beta/go.mod h1:wVuoA8VJLEcwgqHBwHmzLRazpKxTv13Px/pDuV7OomQ= @@ -83,11 +81,8 @@ github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlT github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4= github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM= github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= -github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= -github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= @@ -124,10 +119,8 @@ github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSz github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc= github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= -github.com/rabbitmq/amqp091-go v1.9.0 h1:qrQtyzB4H8BQgEuJwhmVQqVHB9O4+MNDJCCAcpc3Aoo= -github.com/rabbitmq/amqp091-go v1.9.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc= -github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= -github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/zerolog v1.32.0 h1:keLypqrlIjaFsbmJOBdB/qvyF8KEtCWHwobLp5l/mQ0= github.com/rs/zerolog v1.32.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= @@ -173,8 +166,6 @@ github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7Jul github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.mongodb.org/mongo-driver v1.14.0 h1:P98w8egYRjYe3XDjxhYJagTokP/H6HzlsnojRgZRd80= go.mongodb.org/mongo-driver v1.14.0/go.mod h1:Vzb0Mk/pa7e6cWw85R4F/endUC3u0U9jGcNU603k65c= -go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= -go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= @@ -241,7 +232,6 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I= google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= diff --git a/internal/config/config.go b/internal/config/config.go index 3800dad..f48bb9a 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -5,16 +5,14 @@ import ( "os" "strings" - queue "github.com/babylonlabs-io/staking-queue-client/config" "github.com/spf13/viper" ) type Config struct { - Poller PollerConfig `mapstructure:"poller"` - Db DbConfig `mapstructure:"db"` - Btc BtcConfig `mapstructure:"btc"` - Queue queue.QueueConfig `mapstructure:"queue"` - Metrics MetricsConfig `mapstructure:"metrics"` + Poller PollerConfig `mapstructure:"poller"` + Db DbConfig `mapstructure:"db"` + Btc BtcConfig `mapstructure:"btc"` + Metrics MetricsConfig `mapstructure:"metrics"` } func (cfg *Config) Validate() error { @@ -34,10 +32,6 @@ func (cfg *Config) Validate() error { return err } - if err := cfg.Queue.Validate(); err != nil { - return err - } - return nil } diff --git a/internal/observability/metrics/metrics.go b/internal/observability/metrics/metrics.go index 962bff1..aa94559 100644 --- a/internal/observability/metrics/metrics.go +++ b/internal/observability/metrics/metrics.go @@ -32,7 +32,6 @@ var ( metricsRouter *chi.Mux pollDurationHistogram *prometheus.HistogramVec btcClientDurationHistogram *prometheus.HistogramVec - queueSendErrorCounter prometheus.Counter ) // Init initializes the metrics package. @@ -89,18 +88,9 @@ func registerMetrics() { []string{"function", "status"}, ) - // add a counter for the number of errors from the fail to push message into queue - queueSendErrorCounter = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "queue_send_error_count", - Help: "The total number of errors when sending messages to the queue", - }, - ) - prometheus.MustRegister( pollDurationHistogram, btcClientDurationHistogram, - queueSendErrorCounter, ) } @@ -126,7 +116,3 @@ func RecordBtcClientMetrics[T any](clientRequest func() (T, error)) (T, error) { return result, err } - -func RecordQueueSendError() { - queueSendErrorCounter.Inc() -} diff --git a/internal/queue/queue.go b/internal/queue/queue.go deleted file mode 100644 index 4592b16..0000000 --- a/internal/queue/queue.go +++ /dev/null @@ -1,55 +0,0 @@ -package queue - -import ( - "context" - "encoding/json" - "fmt" - - "github.com/rs/zerolog/log" - - "github.com/babylonlabs-io/staking-expiry-checker/internal/observability/metrics" - "github.com/babylonlabs-io/staking-queue-client/client" - queueConfig "github.com/babylonlabs-io/staking-queue-client/config" -) - -type QueueManager struct { - stakingExpiredEventQueue client.QueueClient -} - -func NewQueueManager(cfg *queueConfig.QueueConfig) (*QueueManager, error) { - stakingEventQueue, err := client.NewQueueClient(cfg, client.ExpiredStakingQueueName) - if err != nil { - return nil, fmt.Errorf("failed to initialize staking event queue: %w", err) - } - - return &QueueManager{ - stakingExpiredEventQueue: stakingEventQueue, - }, nil -} - -func (qm *QueueManager) SendExpiredStakingEvent(ctx context.Context, ev client.ExpiredStakingEvent) error { - jsonBytes, err := json.Marshal(ev) - if err != nil { - return err - } - messageBody := string(jsonBytes) - - log.Debug().Str("tx_hash", ev.StakingTxHashHex).Msg("publishing expired staking event") - err = qm.stakingExpiredEventQueue.SendMessage(ctx, messageBody) - if err != nil { - metrics.RecordQueueSendError() - log.Fatal().Err(err).Str("tx_hash", ev.StakingTxHashHex).Msg("failed to publish staking event") - } - log.Debug().Str("tx_hash", ev.StakingTxHashHex).Msg("successfully published expired staking event") - - return nil -} - -// Shutdown gracefully stops the interaction with the queue, ensuring all resources are properly released. -func (qm *QueueManager) Shutdown() { - err := qm.stakingExpiredEventQueue.Stop() - if err != nil { - log.Error().Err(err).Msg("failed to stop staking expired event queue") - } - -} diff --git a/internal/services/service.go b/internal/services/service.go index cf7b3d9..9051cd2 100644 --- a/internal/services/service.go +++ b/internal/services/service.go @@ -5,22 +5,18 @@ import ( "github.com/babylonlabs-io/staking-expiry-checker/internal/btcclient" "github.com/babylonlabs-io/staking-expiry-checker/internal/db" - "github.com/babylonlabs-io/staking-expiry-checker/internal/queue" - queueclient "github.com/babylonlabs-io/staking-queue-client/client" "github.com/rs/zerolog/log" ) type Service struct { - db db.DbInterface - btc btcclient.BtcInterface - queueManager *queue.QueueManager + db db.DbInterface + btc btcclient.BtcInterface } -func NewService(db db.DbInterface, btc btcclient.BtcInterface, qm *queue.QueueManager) *Service { +func NewService(db db.DbInterface, btc btcclient.BtcInterface) *Service { return &Service{ - db: db, - btc: btc, - queueManager: qm, + db: db, + btc: btc, } } @@ -43,11 +39,8 @@ func (s *Service) ProcessExpiredDelegations(ctx context.Context) error { } for _, delegation := range expiredDelegations { - ev := queueclient.NewExpiredStakingEvent(delegation.StakingTxHashHex, delegation.TxType) - if err := s.queueManager.SendExpiredStakingEvent(ctx, ev); err != nil { - log.Error().Err(err).Msg("Error sending expired staking event") - return err - } + // TODO: Process the expired delegation. + log.Info().Msgf("Found a expired delegation, do nothing now: %v", delegation.ID) // After successfully sending the event, delete the entry from the database. if err := s.db.DeleteExpiredDelegation(ctx, delegation.ID); err != nil { log.Error().Err(err).Msg("Error deleting expired delegation") diff --git a/tests/config-test.yml b/tests/config-test.yml index 870442a..1977d79 100644 --- a/tests/config-test.yml +++ b/tests/config-test.yml @@ -12,14 +12,6 @@ btc: net-params: testnet rpc-user: rpcuser rpc-pass: rpcpass -queue: - queue_user: user # can be replaced by values in .env file - queue_password: password - url: "localhost:5672" - processing_timeout: 60 # 5 second - msg_max_retry_attempts: 2 - requeue_delay_time: 5 - queue_type: quorum metrics: host: 0.0.0.0 port: 2113 \ No newline at end of file diff --git a/tests/expired_delegations_test.go b/tests/expired_delegations_test.go index 19eb347..b5e1c67 100644 --- a/tests/expired_delegations_test.go +++ b/tests/expired_delegations_test.go @@ -1,145 +1,11 @@ package tests import ( - "errors" "testing" - "time" - "github.com/babylonlabs-io/staking-expiry-checker/internal/db/model" - "github.com/babylonlabs-io/staking-expiry-checker/tests/mocks" - "github.com/babylonlabs-io/staking-queue-client/client" - "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "go.mongodb.org/mongo-driver/bson/primitive" ) func TestProcessExpiredDelegations_NoErrors(t *testing.T) { - // setup mock btc client - mockBtc := new(mocks.BtcInterface) - expectedBtcTip := int64(1000) - mockBtc.On("GetBlockCount").Return(expectedBtcTip, nil) - - // assert that db is empty - docs := fetchAllTestDelegations(t) - require.Empty(t, docs) - - // setup test server - _, conn, teardown := setupTestServer(t, &TestServerDependency{ - MockBtcClient: mockBtc, - }) - defer teardown() - - // insert in db - expiredDelegations := []model.TimeLockDocument{ - { - ID: primitive.NewObjectID(), - StakingTxHashHex: "mockStakingTxHashHex1", - ExpireHeight: 999, - TxType: "active", - }, - - { - ID: primitive.NewObjectID(), - StakingTxHashHex: "mockStakingTxHashHex2", - ExpireHeight: 999, - TxType: "unbonding", - }, - } - insertTestDelegations(t, expiredDelegations) - - // Wait for the data - require.Eventually( - t, func() bool { - expiredQueueMessageCount, err := inspectQueueMessageCount(t, conn, client.ExpiredStakingQueueName) - return err == nil && expiredQueueMessageCount == 2 - }, 10*time.Second, 100*time.Millisecond, - ) - - // TODO: assert message contents to ensure the correct data is being sent - - // assert that documents are deleted now and db is empty - docs = fetchAllTestDelegations(t) - require.Empty(t, docs) -} - -func TestProcessExpiredDelegations_ErrorGettingBlockCount(t *testing.T) { - mockDB := new(mocks.DbInterface) - mockBtc := new(mocks.BtcInterface) - - mockBtc.On("GetBlockCount").Return(int64(0), errors.New("failed to get block count")) - - _, conn, teardown := setupTestServer(t, &TestServerDependency{ - MockDbClient: mockDB, - MockBtcClient: mockBtc, - }) - defer teardown() - - // Verify the process handles the error as expected - require.Eventually( - t, func() bool { - expiredQueueMessageCount, err := inspectQueueMessageCount(t, conn, client.ExpiredStakingQueueName) - return err == nil && expiredQueueMessageCount == 0 - }, 10*time.Second, 100*time.Millisecond, - ) -} - -func TestProcessExpiredDelegations_ErrorFindingExpiredDelegations(t *testing.T) { - mockDB := new(mocks.DbInterface) - mockBtc := new(mocks.BtcInterface) - - expectedBtcTip := int64(1000) - mockBtc.On("GetBlockCount").Return(expectedBtcTip, nil) - - mockDB.On("FindExpiredDelegations", mock.Anything, uint64(expectedBtcTip)). - Return(nil, errors.New("database error")) - - _, conn, teardown := setupTestServer(t, &TestServerDependency{ - MockDbClient: mockDB, - MockBtcClient: mockBtc, - }) - defer teardown() - - // Verify the process handles the error as expected - require.Eventually( - t, func() bool { - expiredQueueMessageCount, err := inspectQueueMessageCount(t, conn, client.ExpiredStakingQueueName) - return err == nil && expiredQueueMessageCount == 0 - }, 10*time.Second, 100*time.Millisecond, - ) -} - -func TestProcessExpiredDelegations_ErrorDeletingExpiredDelegation(t *testing.T) { - mockDB := new(mocks.DbInterface) - mockBtc := new(mocks.BtcInterface) - - expectedBtcTip := int64(1000) - mockBtc.On("GetBlockCount").Return(expectedBtcTip, nil) - - // Create an ObjectID for testing purposes - testID, _ := primitive.ObjectIDFromHex("507f1f77bcf86cd799439011") - expiredDelegation := model.TimeLockDocument{ - ID: testID, - StakingTxHashHex: "mockStakingTxHashHex", - ExpireHeight: 999, - TxType: "active", - } - - mockDB.On("FindExpiredDelegations", mock.Anything, uint64(expectedBtcTip)). - Return([]model.TimeLockDocument{expiredDelegation}, nil) - mockDB.On("DeleteExpiredDelegation", mock.Anything, testID). - Return(errors.New("delete error")) - - _, conn, teardown := setupTestServer(t, &TestServerDependency{ - MockDbClient: mockDB, - MockBtcClient: mockBtc, - }) - defer teardown() - - // Verify the process handles the error as expected - require.Eventually( - t, func() bool { - expiredQueueMessageCount, err := inspectQueueMessageCount(t, conn, client.ExpiredStakingQueueName) - return err == nil && expiredQueueMessageCount == 0 - }, 10*time.Second, 100*time.Millisecond, - ) + require.True(t, true) } diff --git a/tests/setup.go b/tests/setup.go index 9bde6f0..a0b1aa2 100644 --- a/tests/setup.go +++ b/tests/setup.go @@ -1,30 +1,9 @@ package tests import ( - "context" - "fmt" - "log" - "reflect" - "strings" - "testing" - "time" - - "github.com/rabbitmq/amqp091-go" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" - "github.com/babylonlabs-io/staking-expiry-checker/internal/btcclient" "github.com/babylonlabs-io/staking-expiry-checker/internal/config" "github.com/babylonlabs-io/staking-expiry-checker/internal/db" - "github.com/babylonlabs-io/staking-expiry-checker/internal/db/model" - "github.com/babylonlabs-io/staking-expiry-checker/internal/observability/metrics" - "github.com/babylonlabs-io/staking-expiry-checker/internal/poller" - "github.com/babylonlabs-io/staking-expiry-checker/internal/queue" - "github.com/babylonlabs-io/staking-expiry-checker/internal/services" - "github.com/babylonlabs-io/staking-queue-client/client" - - queueconfig "github.com/babylonlabs-io/staking-queue-client/config" ) type TestServerDependency struct { @@ -33,233 +12,76 @@ type TestServerDependency struct { MockBtcClient btcclient.BtcInterface } -func setupTestServer(t *testing.T, dep *TestServerDependency) (*queue.QueueManager, *amqp091.Connection, func()) { - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) - cfg, err := config.New("./config-test.yml") - if err != nil { - t.Fatalf("Failed to load test config: %v", err) - } - metricsPort := cfg.Metrics.GetMetricsPort() - metrics.Init(metricsPort) - - if dep != nil && dep.ConfigOverrides != nil { - applyConfigOverrides(cfg, dep.ConfigOverrides) - } - - qm, conn, err := setUpTestQueue(t, &cfg.Queue) - if err != nil { - t.Fatalf("Failed to setup test queue: %v", err) - } - - var ( - dbClient db.DbInterface - btcClient btcclient.BtcInterface - ) - - if dep != nil && dep.MockBtcClient != nil { - btcClient = dep.MockBtcClient - } else { - btcClient, err = btcclient.NewBtcClient(&cfg.Btc) - if err != nil { - t.Fatalf("Failed to initialize btc client: %v", err) - } - } - - if dep != nil && dep.MockDbClient != nil { - dbClient = dep.MockDbClient - } else { - setupTestDB(cfg) - dbClient, err = db.New(ctx, cfg.Db) - if err != nil { - t.Fatalf("Failed to initialize db client: %v", err) - } - - } - - service := services.NewService(dbClient, btcClient, qm) - p, err := poller.NewPoller(cfg.Poller.Interval, service) - if err != nil { - t.Fatalf("Failed to initialize poller: %v", err) - } - - teardown := func() { - p.Stop() - qm.Shutdown() - err := conn.Close() - if err != nil { - log.Fatal("Failed to close connection to RabbitMQ: ", err) - } - cancel() // Cancel the context to release resources - } - - go p.Start(ctx) - return qm, conn, teardown -} - // Generic function to apply configuration overrides -func applyConfigOverrides(defaultCfg *config.Config, overrides *config.Config) { - defaultVal := reflect.ValueOf(defaultCfg).Elem() - overrideVal := reflect.ValueOf(overrides).Elem() +// func applyConfigOverrides(defaultCfg *config.Config, overrides *config.Config) { +// defaultVal := reflect.ValueOf(defaultCfg).Elem() +// overrideVal := reflect.ValueOf(overrides).Elem() - for i := 0; i < defaultVal.NumField(); i++ { - defaultField := defaultVal.Field(i) - overrideField := overrideVal.Field(i) +// for i := 0; i < defaultVal.NumField(); i++ { +// defaultField := defaultVal.Field(i) +// overrideField := overrideVal.Field(i) - if overrideField.IsZero() { - continue // Skip fields that are not set - } +// if overrideField.IsZero() { +// continue // Skip fields that are not set +// } - if defaultField.CanSet() { - defaultField.Set(overrideField) - } - } -} +// if defaultField.CanSet() { +// defaultField.Set(overrideField) +// } +// } +// } // PurgeAllCollections drops all collections in the specified database. -func PurgeAllCollections(ctx context.Context, client *mongo.Client, databaseName string) error { - database := client.Database(databaseName) - collections, err := database.ListCollectionNames(ctx, bson.D{{}}) - if err != nil { - return err - } - - for _, collection := range collections { - if err := database.Collection(collection).Drop(ctx); err != nil { - return err - } - } - return nil -} +// func PurgeAllCollections(ctx context.Context, client *mongo.Client, databaseName string) error { +// database := client.Database(databaseName) +// collections, err := database.ListCollectionNames(ctx, bson.D{{}}) +// if err != nil { +// return err +// } + +// for _, collection := range collections { +// if err := database.Collection(collection).Drop(ctx); err != nil { +// return err +// } +// } +// return nil +// } // setupTestDB connects to MongoDB and purges all collections. -func setupTestDB(cfg *config.Config) { - // Connect to MongoDB - client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(cfg.Db.Address)) - if err != nil { - log.Fatal(err) - } - - // Purge all collections in the test database - if err := PurgeAllCollections(context.TODO(), client, cfg.Db.DbName); err != nil { - log.Fatal("Failed to purge database:", err) - } -} - -func setUpTestQueue(t *testing.T, cfg *queueconfig.QueueConfig) (*queue.QueueManager, *amqp091.Connection, error) { - amqpURI := fmt.Sprintf("amqp://%s:%s@%s", cfg.QueueUser, cfg.QueuePassword, cfg.Url) - conn, err := amqp091.Dial(amqpURI) - if err != nil { - t.Fatalf("failed to connect to RabbitMQ in test: %v", err) - } - err = purgeQueues(conn, []string{ - client.ExpiredStakingQueueName, - // purge the delay queue as well - client.ExpiredStakingQueueName + "_delay", - }) - if err != nil { - log.Fatal("failed to purge queues in test: ", err) - return nil, nil, err - } - - qm, err := queue.NewQueueManager(cfg) - if err != nil { - t.Fatalf("failed to setup queue manager in test: %v", err) - } - - return qm, conn, nil -} - -// purgeQueues purges all messages from the given list of queues. -func purgeQueues(conn *amqp091.Connection, queues []string) error { - ch, err := conn.Channel() - if err != nil { - return fmt.Errorf("failed to open a channel in test: %w", err) - } - defer ch.Close() - - for _, queue := range queues { - _, err := ch.QueuePurge(queue, false) - if err != nil { - if strings.Contains(err.Error(), "NOT_FOUND") || strings.Contains(err.Error(), "channel/connection is not open") { - fmt.Printf("Queue '%s' not found, ignoring...\n", queue) - continue - } - return fmt.Errorf("failed to purge queue in test %s: %w", queue, err) - } - } - - return nil -} - -func insertTestDelegations(t *testing.T, docs []model.TimeLockDocument) { - cfg, err := config.New("./config-test.yml") - if err != nil { - t.Fatalf("Failed to load test config: %v", err) - } - // Connect to MongoDB - client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(cfg.Db.Address)) - if err != nil { - log.Fatal(err) - } - database := client.Database(cfg.Db.DbName) - collection := database.Collection(model.TimeLockCollection) - - // Convert slice of TimeLockDocument to slice of interface{} for InsertMany - var documents []interface{} - for _, doc := range docs { - documents = append(documents, doc) - } - - _, err = collection.InsertMany(context.Background(), documents) - if err != nil { - t.Fatalf("Failed to insert test delegations: %v", err) - } -} - -func fetchAllTestDelegations(t *testing.T) []model.TimeLockDocument { - cfg, err := config.New("./config-test.yml") - if err != nil { - t.Fatalf("Failed to load test config: %v", err) - } - // Connect to MongoDB - client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(cfg.Db.Address)) - if err != nil { - log.Fatal(err) - } - database := client.Database(cfg.Db.DbName) - collection := database.Collection(model.TimeLockCollection) - - cursor, err := collection.Find(context.Background(), bson.D{}) - if err != nil { - t.Fatalf("Failed to fetch test delegations: %v", err) - } - defer cursor.Close(context.Background()) - - var results []model.TimeLockDocument - for cursor.Next(context.Background()) { - var result model.TimeLockDocument - err := cursor.Decode(&result) - if err != nil { - t.Fatalf("Failed to decode test delegations: %v", err) - } - results = append(results, result) - } - - return results -} - -// inspectQueueMessageCount inspects the number of messages in the given queue. -func inspectQueueMessageCount(t *testing.T, conn *amqp091.Connection, queueName string) (int, error) { - ch, err := conn.Channel() - if err != nil { - t.Fatalf("failed to open a channel in test: %v", err) - } - q, err := ch.QueueDeclarePassive(queueName, false, false, false, false, nil) - if err != nil { - if strings.Contains(err.Error(), "NOT_FOUND") || strings.Contains(err.Error(), "channel/connection is not open") { - return 0, nil - } - return 0, fmt.Errorf("failed to inspect queue in test %s: %w", queueName, err) - } - return q.Messages, nil -} +// func setupTestDB(cfg *config.Config) { +// // Connect to MongoDB +// client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(cfg.Db.Address)) +// if err != nil { +// log.Fatal(err) +// } + +// // Purge all collections in the test database +// if err := PurgeAllCollections(context.TODO(), client, cfg.Db.DbName); err != nil { +// log.Fatal("Failed to purge database:", err) +// } +// } + +// func insertTestDelegations(t *testing.T, docs []model.TimeLockDocument) { +// cfg, err := config.New("./config-test.yml") +// if err != nil { +// t.Fatalf("Failed to load test config: %v", err) +// } +// // Connect to MongoDB +// client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(cfg.Db.Address)) +// if err != nil { +// log.Fatal(err) +// } +// database := client.Database(cfg.Db.DbName) +// collection := database.Collection(model.TimeLockCollection) + +// // Convert slice of TimeLockDocument to slice of interface{} for InsertMany +// var documents []interface{} +// for _, doc := range docs { +// documents = append(documents, doc) +// } + +// _, err = collection.InsertMany(context.Background(), documents) +// if err != nil { +// t.Fatalf("Failed to insert test delegations: %v", err) +// } +// }