From e25c0c0ab9165a8e5416bc090a897c1d00362fc1 Mon Sep 17 00:00:00 2001 From: wjrjerome Date: Fri, 26 Apr 2024 09:40:21 +0800 Subject: [PATCH 1/2] feat: calculate the stats via stats queue --- bin/init-mongo.sh | 1 + go.mod | 2 +- go.sum | 4 +- internal/api/handlers/finality_provider.go | 5 +- internal/db/delegation.go | 2 +- internal/db/interface.go | 2 +- internal/db/model/delegation.go | 26 +---- internal/db/model/stats.go | 57 +++++------ internal/db/stats.go | 105 ++++++--------------- internal/queue/handlers/active_staking.go | 12 +-- internal/queue/handlers/expired_staking.go | 16 ++-- internal/queue/handlers/handler.go | 23 ++++- internal/queue/handlers/stats.go | 39 ++++++++ internal/queue/queue.go | 20 +++- internal/services/finality_provider.go | 99 ++++++++++++------- internal/types/delegation.go | 19 ++++ tests/finality_provider_test.go | 33 ++++++- tests/mocks/mock_db_client.go | 24 ++--- tests/setup.go | 2 + tests/stats_test.go | 17 ++-- 20 files changed, 293 insertions(+), 215 deletions(-) create mode 100644 internal/queue/handlers/stats.go diff --git a/bin/init-mongo.sh b/bin/init-mongo.sh index 416381a..aa5594c 100755 --- a/bin/init-mongo.sh +++ b/bin/init-mongo.sh @@ -19,6 +19,7 @@ db.unbonding_queue.createIndex({'unbonding_tx_hash_hex': 1}, {unique: true}); db.timelock_queue.createIndex({'expire_height': 1}, {unique: false}); db.delegations.createIndex({'staker_pk_hex': 1, 'staking_tx.start_height': -1}, {unique: false}); db.staker_stats.createIndex({'active_tvl': -1, '_id': 1}, {unique: false}); +db.finality_providers_stats.createIndex({'active_tvl': -1, '_id': 1}, {unique: false}); " # Keep the container running diff --git a/go.mod b/go.mod index bbad692..1509fc3 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.21.6 require ( github.com/babylonchain/babylon v0.8.6-0.20240426101001-7778c798e236 - github.com/babylonchain/staking-queue-client v0.0.0-20240424061128-3ac950ef5f27 + github.com/babylonchain/staking-queue-client v0.0.0-20240426081310-59f3af4f7fa9 github.com/btcsuite/btcd v0.24.0 github.com/btcsuite/btcd/btcec/v2 v2.3.2 github.com/btcsuite/btcd/btcutil v1.1.5 diff --git a/go.sum b/go.sum index 6b4d1d6..7089c5b 100644 --- a/go.sum +++ b/go.sum @@ -89,8 +89,8 @@ github.com/aws/aws-sdk-go v1.44.312/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8 github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g= github.com/babylonchain/babylon v0.8.6-0.20240426101001-7778c798e236 h1:Ydna4VcP56xu1+zdgygqHdSCeMduZjuznVhr4exO5do= github.com/babylonchain/babylon v0.8.6-0.20240426101001-7778c798e236/go.mod h1:lfeASLNJgcUsX7LEns3HRUv0k+MjzcB2q2AMasfz38M= -github.com/babylonchain/staking-queue-client v0.0.0-20240424061128-3ac950ef5f27 h1:qVf88e5VrG4Rq1AZm188h3alXwB4d0YYs0eyHo+9BWw= -github.com/babylonchain/staking-queue-client v0.0.0-20240424061128-3ac950ef5f27/go.mod h1:mEgA6N3SnwFwGEOsUYr/HdjpKa13Wck08MS7VY/Icvo= +github.com/babylonchain/staking-queue-client v0.0.0-20240426081310-59f3af4f7fa9 h1:O5TwaV7Jqsuzre6z5U+q/3iKnfk6f49euq2QzwcAHLY= +github.com/babylonchain/staking-queue-client v0.0.0-20240426081310-59f3af4f7fa9/go.mod h1:mEgA6N3SnwFwGEOsUYr/HdjpKa13Wck08MS7VY/Icvo= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= diff --git a/internal/api/handlers/finality_provider.go b/internal/api/handlers/finality_provider.go index 0c70742..2de8f8a 100644 --- a/internal/api/handlers/finality_provider.go +++ b/internal/api/handlers/finality_provider.go @@ -13,10 +13,9 @@ import ( // @Success 200 {object} PublicResponse[[]services.FpDetailsPublic] "A list of finality providers sorted by ActiveTvl in descending order" // @Router /v1/finality-providers [get] func (h *Handler) GetFinalityProviders(request *http.Request) (*Result, *types.Error) { - fps, err := h.services.GetActiveFinalityProviders(request.Context()) + fps, paginationToken, err := h.services.GetFinalityProviders(request.Context(), "") if err != nil { return nil, err } - - return NewResult(fps), nil + return NewResultWithPagination(fps, paginationToken), nil } diff --git a/internal/db/delegation.go b/internal/db/delegation.go index 8859cf1..ed07a9e 100644 --- a/internal/db/delegation.go +++ b/internal/db/delegation.go @@ -59,7 +59,7 @@ func (db *Database) FindDelegationsByStakerPk(ctx context.Context, stakerPk stri options.SetLimit(db.cfg.MaxPaginationLimit) // Decode the pagination token first if it exist if paginationToken != "" { - decodedToken, err := model.DecodeDelegationByStakerPaginationToken(paginationToken) + decodedToken, err := model.DecodePaginationToken[model.DelegationByStakerPagination](paginationToken) if err != nil { return nil, &InvalidPaginationTokenError{ Message: "Invalid pagination token", diff --git a/internal/db/interface.go b/internal/db/interface.go index c960688..fceb415 100644 --- a/internal/db/interface.go +++ b/internal/db/interface.go @@ -45,7 +45,7 @@ type DBClient interface { SubtractFinalityProviderStats( ctx context.Context, stakingTxHashHex, fpPkHex string, amount uint64, ) error - FindFinalityProviderStatsByPkHex(ctx context.Context, pkHex []string) (map[string]model.FinalityProviderStatsDocument, error) + FindFinalityProviderStats(ctx context.Context, paginationToken string) (*DbResultMap[model.FinalityProviderStatsDocument], error) IncrementStakerStats( ctx context.Context, stakingTxHashHex, stakerPkHex string, amount uint64, ) error diff --git a/internal/db/model/delegation.go b/internal/db/model/delegation.go index d581389..2b180e3 100644 --- a/internal/db/model/delegation.go +++ b/internal/db/model/delegation.go @@ -1,9 +1,6 @@ package model import ( - "encoding/base64" - "encoding/json" - "github.com/babylonchain/staking-api-service/internal/types" ) @@ -32,33 +29,12 @@ type DelegationByStakerPagination struct { StakingStartHeight uint64 `json:"staking_start_height"` } -func DecodeDelegationByStakerPaginationToken(token string) (*DelegationByStakerPagination, error) { - tokenBytes, err := base64.URLEncoding.DecodeString(token) - if err != nil { - return nil, err - } - var d DelegationByStakerPagination - err = json.Unmarshal(tokenBytes, &d) - if err != nil { - return nil, err - } - return &d, nil -} - -func (d *DelegationByStakerPagination) GetPaginationToken() (string, error) { - tokenBytes, err := json.Marshal(d) - if err != nil { - return "", err - } - return base64.URLEncoding.EncodeToString(tokenBytes), nil -} - func BuildDelegationByStakerPaginationToken(d DelegationDocument) (string, error) { page := &DelegationByStakerPagination{ StakingTxHashHex: d.StakingTxHashHex, StakingStartHeight: d.StakingTx.StartHeight, } - token, err := page.GetPaginationToken() + token, err := GetPaginationToken(page) if err != nil { return "", err } diff --git a/internal/db/model/stats.go b/internal/db/model/stats.go index 47bb7b2..1ccf95d 100644 --- a/internal/db/model/stats.go +++ b/internal/db/model/stats.go @@ -1,10 +1,5 @@ package model -import ( - "encoding/base64" - "encoding/json" -) - const ( StatsLockCollection = "stats_lock" OverallStatsCollection = "overall_stats" @@ -44,11 +39,28 @@ type OverallStatsDocument struct { } type FinalityProviderStatsDocument struct { - Id string `bson:"_id"` // FinalityProviderPkHex:shard-number - ActiveTvl int64 `bson:"active_tvl"` - TotalTvl int64 `bson:"total_tvl"` - ActiveDelegations int64 `bson:"active_delegations"` - TotalDelegations int64 `bson:"total_delegations"` + FinalityProviderPkHex string `bson:"_id"` // FinalityProviderPkHex + ActiveTvl int64 `bson:"active_tvl"` + TotalTvl int64 `bson:"total_tvl"` + ActiveDelegations int64 `bson:"active_delegations"` + TotalDelegations int64 `bson:"total_delegations"` +} + +type FinalityProviderStatsPagination struct { + FinalityProviderPkHex string `json:"finality_provider_pk_hex"` + ActiveTvl int64 `json:"active_tvl"` +} + +func BuildFinalityProviderStatsPaginationToken(d FinalityProviderStatsDocument) (string, error) { + page := FinalityProviderStatsPagination{ + ActiveTvl: d.ActiveTvl, + FinalityProviderPkHex: d.FinalityProviderPkHex, + } + token, err := GetPaginationToken(page) + if err != nil { + return "", err + } + return token, nil } type StakerStatsDocument struct { @@ -66,33 +78,12 @@ type StakerStatsByStakerPagination struct { ActiveTvl int64 `json:"active_tvl"` } -func DecodeStakerStatsByStakerPaginationToken(token string) (*StakerStatsByStakerPagination, error) { - tokenBytes, err := base64.URLEncoding.DecodeString(token) - if err != nil { - return nil, err - } - var d StakerStatsByStakerPagination - err = json.Unmarshal(tokenBytes, &d) - if err != nil { - return nil, err - } - return &d, nil -} - -func (d *StakerStatsByStakerPagination) GetPaginationToken() (string, error) { - tokenBytes, err := json.Marshal(d) - if err != nil { - return "", err - } - return base64.URLEncoding.EncodeToString(tokenBytes), nil -} - func BuildStakerStatsByStakerPaginationToken(d StakerStatsDocument) (string, error) { - page := &StakerStatsByStakerPagination{ + page := StakerStatsByStakerPagination{ StakerPkHex: d.StakerPkHex, ActiveTvl: d.ActiveTvl, } - token, err := page.GetPaginationToken() + token, err := GetPaginationToken(page) if err != nil { return "", err } diff --git a/internal/db/stats.go b/internal/db/stats.go index ff99e78..a419e01 100644 --- a/internal/db/stats.go +++ b/internal/db/stats.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "math/rand" - "strings" "github.com/babylonchain/staking-api-service/internal/db/model" "github.com/babylonchain/staking-api-service/internal/types" @@ -247,60 +246,41 @@ func (db *Database) SubtractFinalityProviderStats( return db.updateFinalityProviderStats(ctx, types.Unbonded.ToString(), stakingTxHashHex, fpPkHex, upsertUpdate) } -// FindFinalityProviderStatsByPkHex finds the finality provider stats for the given finality provider pk hex -// This method queries all the shards and sums up the stats -// Refer to the README.md in this directory for more information on the sharding logic -func (db *Database) FindFinalityProviderStatsByPkHex(ctx context.Context, pkHex []string) (map[string]model.FinalityProviderStatsDocument, error) { +// FindFinalityProviderStats fetches the finality provider stats from the database +func (db *Database) FindFinalityProviderStats(ctx context.Context, paginationToken string) (*DbResultMap[model.FinalityProviderStatsDocument], error) { client := db.Client.Database(db.DbName).Collection(model.FinalityProviderStatsCollection) - finalityProvidersMap := make(map[string]model.FinalityProviderStatsDocument) - - batchSize := int(db.cfg.DbBatchSizeLimit) - for i := 0; i < len(pkHex); i += batchSize { - end := i + batchSize - if end > len(pkHex) { - end = len(pkHex) - } - batch := pkHex[i:end] + options := options.Find().SetSort(bson.D{{Key: "active_tvl", Value: -1}}) // Sorting in descending order + options.SetLimit(db.cfg.MaxPaginationLimit) + var filter bson.M - filter := bson.M{"_id": bson.M{"$in": db.getAllShardedFinalityProviderId(batch)}} - cursor, err := client.Find(ctx, filter) + // Decode the pagination token first if it exist + if paginationToken != "" { + decodedToken, err := model.DecodePaginationToken[model.FinalityProviderStatsPagination](paginationToken) if err != nil { - return nil, err + return nil, &InvalidPaginationTokenError{ + Message: "Invalid pagination token", + } } - - var shardedFinalityProvidersStats []model.FinalityProviderStatsDocument - if err = cursor.All(ctx, &shardedFinalityProvidersStats); err != nil { - cursor.Close(ctx) - return nil, err + filter = bson.M{ + "$or": []bson.M{ + {"active_tvl": bson.M{"$lt": decodedToken.ActiveTvl}}, + {"active_tvl": decodedToken.ActiveTvl, "_id": bson.M{"$lt": decodedToken.FinalityProviderPkHex}}, + }, } - cursor.Close(ctx) + } - // Sum up the stats for the finality provider - for _, fp := range shardedFinalityProvidersStats { - // Retrieve the finality provider pk hex from the id. - fpPkHex, err := extractFinalityProviderPkHexFromStatsId(fp.Id) - if err != nil { - return nil, err - } - if existingFp, ok := finalityProvidersMap[fpPkHex]; ok { - existingFp.ActiveTvl += fp.ActiveTvl - existingFp.TotalTvl += fp.TotalTvl - existingFp.ActiveDelegations += fp.ActiveDelegations - existingFp.TotalDelegations += fp.TotalDelegations - - finalityProvidersMap[fpPkHex] = existingFp - } else { - finalityProvidersMap[fpPkHex] = model.FinalityProviderStatsDocument{ - ActiveTvl: fp.ActiveTvl, - TotalTvl: fp.TotalTvl, - ActiveDelegations: fp.ActiveDelegations, - TotalDelegations: fp.TotalDelegations, - } - } - } + cursor, err := client.Find(ctx, filter, options) + if err != nil { + return nil, err } + defer cursor.Close(ctx) - return finalityProvidersMap, nil + var delegations []model.FinalityProviderStatsDocument + if err = cursor.All(ctx, &delegations); err != nil { + return nil, err + } + + return toResultMapWithPaginationToken(db.cfg, delegations, model.BuildFinalityProviderStatsPaginationToken) } func (db *Database) updateFinalityProviderStats(ctx context.Context, state, stakingTxHashHex, fpPkHex string, upsertUpdate primitive.M) error { @@ -319,7 +299,7 @@ func (db *Database) updateFinalityProviderStats(ctx context.Context, state, stak return nil, err } - upsertFilter := bson.M{"_id": db.generateFinalityProviderStatsId(fpPkHex)} + upsertFilter := bson.M{"_id": fpPkHex} _, err = client.UpdateOne(sessCtx, upsertFilter, upsertUpdate, options.Update().SetUpsert(true)) if err != nil { @@ -337,33 +317,6 @@ func (db *Database) updateFinalityProviderStats(ctx context.Context, state, stak return nil } -// Genrate the id for the finality provider stats document. -// Id is a combination of finality provider pk hex and a random number ranged from 0-LogicalShardCount -// This is designed to avoid locking the same field during concurrent writes -func (db *Database) generateFinalityProviderStatsId(finalityProviderPkHex string) string { - randomShardNum := uint64(rand.Intn(int(db.cfg.LogicalShardCount))) - return fmt.Sprintf("%s:%d", finalityProviderPkHex, randomShardNum) -} - -func extractFinalityProviderPkHexFromStatsId(id string) (string, error) { - parts := strings.Split(id, ":") - if len(parts) != 2 { - return "", fmt.Errorf("invalid id format: %s", id) - } - return parts[0], nil -} - -// Get the finality provider stats document id for all the shards -func (db *Database) getAllShardedFinalityProviderId(finalityProviderPkHex []string) []string { - var ids []string - for _, fpPkHex := range finalityProviderPkHex { - for i := 0; i < int(db.cfg.LogicalShardCount); i++ { - ids = append(ids, fmt.Sprintf("%s:%d", fpPkHex, i)) - } - } - return ids -} - // IncrementStakerStats increments the staker stats for the given staking tx hash // This method is idempotent, only the first call will be processed. Otherwise it will return a notFoundError for duplicates func (db *Database) IncrementStakerStats( @@ -432,7 +385,7 @@ func (db *Database) FindTopStakersByTvl(ctx context.Context, paginationToken str var filter bson.M // Decode the pagination token first if it exist if paginationToken != "" { - decodedToken, err := model.DecodeStakerStatsByStakerPaginationToken(paginationToken) + decodedToken, err := model.DecodePaginationToken[model.StakerStatsByStakerPagination](paginationToken) if err != nil { return nil, &InvalidPaginationTokenError{ Message: "Invalid pagination token", diff --git a/internal/queue/handlers/active_staking.go b/internal/queue/handlers/active_staking.go index 02b0ebc..005bff3 100644 --- a/internal/queue/handlers/active_staking.go +++ b/internal/queue/handlers/active_staking.go @@ -31,16 +31,16 @@ func (h *QueueHandler) ActiveStakingHandler(ctx context.Context, messageBody str return nil } - // Perform the async stats calculation - statsError := h.Services.ProcessStakingStatsCalculation( - ctx, activeStakingEvent.StakingTxHashHex, + // Perform the async stats calculation by emit the stats event + statsError := h.EmitStatsEvent(ctx, queueClient.NewStatsEvent( + activeStakingEvent.StakingTxHashHex, activeStakingEvent.StakerPkHex, activeStakingEvent.FinalityProviderPkHex, - types.Active, activeStakingEvent.StakingValue, - ) + types.Active.ToString(), + )) if statsError != nil { - log.Ctx(ctx).Error().Err(statsError).Msg("Failed to process staking stats calculation for active staking") + log.Ctx(ctx).Error().Err(statsError).Msg("Failed to emit stats event for active staking") return statsError } diff --git a/internal/queue/handlers/expired_staking.go b/internal/queue/handlers/expired_staking.go index 0a7f53c..01de557 100644 --- a/internal/queue/handlers/expired_staking.go +++ b/internal/queue/handlers/expired_staking.go @@ -36,17 +36,17 @@ func (h *QueueHandler) ExpiredStakingHandler(ctx context.Context, messageBody st return err } - // Perform the stats calculation - statsErr := h.Services.ProcessStakingStatsCalculation( - ctx, expiredStakingEvent.StakingTxHashHex, + // Perform the async stats calculation by emit the stats event + statsError := h.EmitStatsEvent(ctx, queueClient.NewStatsEvent( + expiredStakingEvent.StakingTxHashHex, del.StakerPkHex, del.FinalityProviderPkHex, - types.Unbonded, del.StakingValue, - ) - if statsErr != nil { - log.Error().Err(statsErr).Msg("Failed to process staking stats calculation after timelock expired") - return statsErr + types.Unbonded.ToString(), + )) + if statsError != nil { + log.Ctx(ctx).Error().Err(statsError).Msg("Failed to emit stats event after timelock expired") + return statsError } transitionErr := h.Services.TransitionToUnbondedState(ctx, txType, expiredStakingEvent.StakingTxHashHex) diff --git a/internal/queue/handlers/handler.go b/internal/queue/handlers/handler.go index 66394f2..6495643 100644 --- a/internal/queue/handlers/handler.go +++ b/internal/queue/handlers/handler.go @@ -2,23 +2,40 @@ package handlers import ( "context" + "encoding/json" "github.com/babylonchain/staking-api-service/internal/services" + "github.com/babylonchain/staking-queue-client/client" + "github.com/rs/zerolog/log" ) type QueueHandler struct { - Services *services.Services + Services *services.Services + emitStatsEvent func(ctx context.Context, messageBody string) error } type MessageHandler func(ctx context.Context, messageBody string) error type UnprocessableMessageHandler func(ctx context.Context, messageBody, receipt string) error -func NewQueueHandler(services *services.Services) *QueueHandler { +func NewQueueHandler( + services *services.Services, + emitStats func(ctx context.Context, messageBody string) error, +) *QueueHandler { return &QueueHandler{ - Services: services, + Services: services, + emitStatsEvent: emitStats, } } func (qh *QueueHandler) HandleUnprocessedMessage(ctx context.Context, messageBody, receipt string) error { return qh.Services.SaveUnprocessableMessages(ctx, messageBody, receipt) } + +func (qh *QueueHandler) EmitStatsEvent(ctx context.Context, statsEvent client.StatsEvent) error { + jsonData, err := json.Marshal(statsEvent) + if err != nil { + log.Ctx(ctx).Err(err).Msg("Failed to marshal the stats event") + return err + } + return qh.emitStatsEvent(ctx, string(jsonData)) +} diff --git a/internal/queue/handlers/stats.go b/internal/queue/handlers/stats.go new file mode 100644 index 0000000..0018d39 --- /dev/null +++ b/internal/queue/handlers/stats.go @@ -0,0 +1,39 @@ +package handlers + +import ( + "context" + "encoding/json" + + "github.com/babylonchain/staking-api-service/internal/types" + queueClient "github.com/babylonchain/staking-queue-client/client" + "github.com/rs/zerolog/log" +) + +func (h *QueueHandler) StatsHandler(ctx context.Context, messageBody string) error { + var statsEvent queueClient.StatsEvent + err := json.Unmarshal([]byte(messageBody), &statsEvent) + if err != nil { + log.Ctx(ctx).Error().Err(err).Msg("Failed to unmarshal the message body into statsEvent") + return err + } + + state, err := types.FromStringToDelegationState(statsEvent.State) + if err != nil { + log.Ctx(ctx).Error().Err(err).Msg("Failed to convert statsEvent.State to DelegationState") + return err + } + + // Perform the stats calculation + statsErr := h.Services.ProcessStakingStatsCalculation( + ctx, statsEvent.StakingTxHashHex, + statsEvent.StakerPkHex, + statsEvent.FinalityProviderPkHex, + state, + statsEvent.StakingValue, + ) + if statsErr != nil { + log.Error().Err(statsErr).Msg("Failed to process staking stats calculation") + return statsErr + } + return nil +} diff --git a/internal/queue/queue.go b/internal/queue/queue.go index 8d2f745..fe82f08 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -20,6 +20,7 @@ type Queues struct { ExpiredStakingQueueClient client.QueueClient UnbondingStakingQueueClient client.QueueClient WithdrawStakingQueueClient client.QueueClient + StatsQueueClient client.QueueClient } func New(cfg *queueConfig.QueueConfig, service *services.Services) *Queues { @@ -51,7 +52,14 @@ func New(cfg *queueConfig.QueueConfig, service *services.Services) *Queues { log.Fatal().Err(err).Msg("error while creating WithdrawStakingQueueClient") } - handlers := handlers.NewQueueHandler(service) + statsQueueClient, err := client.NewQueueClient( + cfg, client.StakingStatsQueueName, + ) + if err != nil { + log.Fatal().Err(err).Msg("error while creating StatsQueueClient") + } + + handlers := handlers.NewQueueHandler(service, statsQueueClient.SendMessage) return &Queues{ Handlers: handlers, processingTimeout: time.Duration(cfg.QueueProcessingTimeout) * time.Second, @@ -60,6 +68,7 @@ func New(cfg *queueConfig.QueueConfig, service *services.Services) *Queues { ExpiredStakingQueueClient: expiredStakingQueueClient, UnbondingStakingQueueClient: unbondingStakingQueueClient, WithdrawStakingQueueClient: withdrawStakingQueueClient, + StatsQueueClient: statsQueueClient, } } @@ -86,6 +95,11 @@ func (q *Queues) StartReceivingMessages() { q.Handlers.WithdrawStakingHandler, q.Handlers.HandleUnprocessedMessage, q.maxRetryAttempts, q.processingTimeout, ) + startQueueMessageProcessing( + q.StatsQueueClient, + q.Handlers.StatsHandler, q.Handlers.HandleUnprocessedMessage, + q.maxRetryAttempts, q.processingTimeout, + ) // ...add more queues here } @@ -107,6 +121,10 @@ func (q *Queues) StopReceivingMessages() { if withdrawnQueueErr != nil { log.Error().Err(withdrawnQueueErr).Str("queueName", q.WithdrawStakingQueueClient.GetQueueName()).Msg("error while stopping queue") } + statsQueueErr := q.StatsQueueClient.Stop() + if statsQueueErr != nil { + log.Error().Err(statsQueueErr).Str("queueName", q.StatsQueueClient.GetQueueName()).Msg("error while stopping queue") + } // ...add more queues here } diff --git a/internal/services/finality_provider.go b/internal/services/finality_provider.go index 970fb17..d1d00fa 100644 --- a/internal/services/finality_provider.go +++ b/internal/services/finality_provider.go @@ -3,8 +3,8 @@ package services import ( "context" "net/http" - "sort" + "github.com/babylonchain/staking-api-service/internal/db" "github.com/babylonchain/staking-api-service/internal/types" "github.com/rs/zerolog/log" ) @@ -17,43 +17,89 @@ type FpDescriptionPublic struct { Details string `json:"details"` } +func EmptyFpDescriptionPublic() FpDescriptionPublic { + return FpDescriptionPublic{ + Moniker: "", + Identity: "", + Website: "", + SecurityContact: "", + Details: "", + } +} + type FpDetailsPublic struct { - Description FpDescriptionPublic `json:"description"` - Commission string `json:"commission"` - BtcPk string `json:"btc_pk"` - ActiveTvl int64 `json:"active_tvl"` - TotalTvl int64 `json:"total_tvl"` - ActiveDelegations int64 `json:"active_delegations"` - TotalDelegations int64 `json:"total_delegations"` + Description *FpDescriptionPublic `json:"description"` + Commission string `json:"commission"` + BtcPk string `json:"btc_pk"` + ActiveTvl int64 `json:"active_tvl"` + TotalTvl int64 `json:"total_tvl"` + ActiveDelegations int64 `json:"active_delegations"` + TotalDelegations int64 `json:"total_delegations"` } -func (s *Services) GetActiveFinalityProviders(ctx context.Context) ([]FpDetailsPublic, *types.Error) { +func (s *Services) GetFinalityProviders(ctx context.Context, page string) ([]FpDetailsPublic, string, *types.Error) { fpParams := s.GetFinalityProvidersFromGlobalParams() if len(fpParams) == 0 { log.Ctx(ctx).Error().Msg("No finality providers found from global params") - return nil, types.NewErrorWithMsg(http.StatusInternalServerError, types.InternalServiceError, "No finality providers found from global params") + return nil, "", types.NewErrorWithMsg(http.StatusInternalServerError, types.InternalServiceError, "No finality providers found from global params") } - - var fpBtcPks []string + // Convert the fpParams slice to a map with the BtcPk as the key + fpParamsMap := make(map[string]*FpParamsPublic) for _, fp := range fpParams { - fpBtcPks = append(fpBtcPks, fp.BtcPk) + fpParamsMap[fp.BtcPk] = &fp } - finalityProviderStatsMap, err := s.DbClient.FindFinalityProviderStatsByPkHex(ctx, fpBtcPks) + resultMap, err := s.DbClient.FindFinalityProviderStats(ctx, page) if err != nil { + if db.IsInvalidPaginationTokenError(err) { + log.Ctx(ctx).Warn().Err(err).Msg("Invalid pagination token when fetching finality providers") + return nil, "", types.NewError(http.StatusBadRequest, types.BadRequest, err) + } // We don't want to return an error here in case of DB error. // we will continue the process with the data we have from global params as a fallback. + // TODO: Add metric for this error and alerting log.Ctx(ctx).Error().Err(err).Msg("Error while fetching finality providers from DB") - // TODO: Metric for this error and alerting + // Return the finality providers from global params as a fallback + return buildFallbackFpDetailsPublic(fpParams), "", nil + } + // If no finality providers are found in the DB, + // return the finality providers from global params as a fallback + if len(resultMap.Data) == 0 { + return buildFallbackFpDetailsPublic(fpParams), "", nil } var finalityProviderDetailsPublic []FpDetailsPublic + for _, fp := range resultMap.Data { + var paramsPublic *FpParamsPublic + if fpParamsMap[fp.FinalityProviderPkHex] != nil { + paramsPublic = fpParamsMap[fp.FinalityProviderPkHex] + } else { + paramsPublic = &FpParamsPublic{ + Description: EmptyFpDescriptionPublic(), + Commission: "", + BtcPk: fp.FinalityProviderPkHex, + } + } + detail := FpDetailsPublic{ + Description: ¶msPublic.Description, + Commission: paramsPublic.Commission, + BtcPk: fp.FinalityProviderPkHex, + ActiveTvl: fp.ActiveTvl, + TotalTvl: fp.TotalTvl, + ActiveDelegations: fp.ActiveDelegations, + TotalDelegations: fp.TotalDelegations, + } + finalityProviderDetailsPublic = append(finalityProviderDetailsPublic, detail) + } + return finalityProviderDetailsPublic, resultMap.PaginationToken, nil +} + +func buildFallbackFpDetailsPublic(fpParams []FpParamsPublic) []FpDetailsPublic { + var finalityProviderDetailsPublic []FpDetailsPublic for _, fp := range fpParams { - // Default values being set for ActiveTvl, TotalTvl, ActiveDelegations, TotalDelegations - // This could happen if our system has never processed any staking tx events associated to this finality provider detail := FpDetailsPublic{ - Description: fp.Description, + Description: &fp.Description, Commission: fp.Commission, BtcPk: fp.BtcPk, ActiveTvl: 0, @@ -61,22 +107,7 @@ func (s *Services) GetActiveFinalityProviders(ctx context.Context) ([]FpDetailsP ActiveDelegations: 0, TotalDelegations: 0, } - - if finalityProvider, ok := finalityProviderStatsMap[fp.BtcPk]; ok { - detail.ActiveTvl = finalityProvider.ActiveTvl - detail.TotalTvl = finalityProvider.TotalTvl - detail.ActiveDelegations = finalityProvider.ActiveDelegations - detail.TotalDelegations = finalityProvider.TotalDelegations - } else if !ok { - log.Ctx(ctx).Warn().Str("btc_pk", fp.BtcPk).Msg("Finality provider not found in DB") - } finalityProviderDetailsPublic = append(finalityProviderDetailsPublic, detail) } - - // Sort the finalityProviderDetailsPublic slice by ActiveTvl in descending order - sort.SliceStable(finalityProviderDetailsPublic, func(i, j int) bool { - return finalityProviderDetailsPublic[i].ActiveTvl > finalityProviderDetailsPublic[j].ActiveTvl - }) - - return finalityProviderDetailsPublic, nil + return finalityProviderDetailsPublic } diff --git a/internal/types/delegation.go b/internal/types/delegation.go index 1542a75..a19e611 100644 --- a/internal/types/delegation.go +++ b/internal/types/delegation.go @@ -1,5 +1,7 @@ package types +import "fmt" + type DelegationState string const ( @@ -13,3 +15,20 @@ const ( func (s DelegationState) ToString() string { return string(s) } + +func FromStringToDelegationState(s string) (DelegationState, error) { + switch s { + case "active": + return Active, nil + case "unbonding_requested": + return UnbondingRequested, nil + case "unbonding": + return Unbonding, nil + case "unbonded": + return Unbonded, nil + case "withdrawn": + return Withdrawn, nil + default: + return "", fmt.Errorf("invalid delegation state: %s", s) + } +} diff --git a/tests/finality_provider_test.go b/tests/finality_provider_test.go index 9f1226b..edf29eb 100644 --- a/tests/finality_provider_test.go +++ b/tests/finality_provider_test.go @@ -8,6 +8,8 @@ import ( "testing" "github.com/babylonchain/staking-api-service/internal/api/handlers" + "github.com/babylonchain/staking-api-service/internal/db" + "github.com/babylonchain/staking-api-service/internal/db/model" "github.com/babylonchain/staking-api-service/internal/services" testmock "github.com/babylonchain/staking-api-service/tests/mocks" "github.com/stretchr/testify/assert" @@ -41,7 +43,7 @@ func shouldGetFinalityProvidersSuccessfully(t *testing.T, testServer *TestServer // Check that the response body is as expected assert.NotEmpty(t, result, "expected response body to be non-empty") - assert.Equal(t, "Babylon Foundation 2", result[2].Description.Moniker) + assert.Equal(t, "Babylon Foundation 3", result[2].Description.Moniker) assert.Equal(t, "0.060000000000000000", result[1].Commission) assert.Equal(t, "0d2f9728abc45c0cdeefdd73f52a0e0102470e35fb689fc5bc681959a61b021f", result[3].BtcPk) @@ -61,8 +63,35 @@ func TestGetFinalityProvidersSuccessfully(t *testing.T) { func TestGetFinalityProviderShouldNotFailInCaseOfDbFailure(t *testing.T) { mockDB := new(testmock.DBClient) - mockDB.On("FindFinalityProviderStatsByPkHex", mock.Anything, mock.Anything).Return(nil, errors.New("just an error")) + mockDB.On("FindFinalityProviderStats", mock.Anything, mock.Anything).Return(nil, errors.New("just an error")) testServer := setupTestServer(t, &TestServerDependency{MockDbClient: mockDB}) shouldGetFinalityProvidersSuccessfully(t, testServer) } + +func TestGetFinalityProviderShouldReturnFallbackToGlobalParams(t *testing.T) { + mockedResultMap := &db.DbResultMap[model.FinalityProviderStatsDocument]{ + Data: []model.FinalityProviderStatsDocument{}, + PaginationToken: "", + } + mockDB := new(testmock.DBClient) + mockDB.On("FindFinalityProviderStats", mock.Anything, mock.Anything).Return(mockedResultMap, nil) + + testServer := setupTestServer(t, &TestServerDependency{MockDbClient: mockDB}) + shouldGetFinalityProvidersSuccessfully(t, testServer) +} + +func TestGetFinalityProviderReturn4xxErrorIfPageTokenInvalid(t *testing.T) { + mockDB := new(testmock.DBClient) + mockDB.On("FindFinalityProviderStats", mock.Anything, mock.Anything).Return(nil, &db.InvalidPaginationTokenError{}) + + testServer := setupTestServer(t, &TestServerDependency{MockDbClient: mockDB}) + url := testServer.Server.URL + finalityProviderPath + defer testServer.Close() + // Make a GET request to the finality providers endpoint + resp, err := http.Get(url) + assert.NoError(t, err, "making GET request to finality providers endpoint should not fail") + defer resp.Body.Close() + + assert.Equal(t, http.StatusBadRequest, resp.StatusCode) +} diff --git a/tests/mocks/mock_db_client.go b/tests/mocks/mock_db_client.go index 9288d1f..b0c7425 100644 --- a/tests/mocks/mock_db_client.go +++ b/tests/mocks/mock_db_client.go @@ -78,29 +78,29 @@ func (_m *DBClient) FindDelegationsByStakerPk(ctx context.Context, stakerPk stri return r0, r1 } -// FindFinalityProviderStatsByPkHex provides a mock function with given fields: ctx, pkHex -func (_m *DBClient) FindFinalityProviderStatsByPkHex(ctx context.Context, pkHex []string) (map[string]model.FinalityProviderStatsDocument, error) { - ret := _m.Called(ctx, pkHex) +// FindFinalityProviderStats provides a mock function with given fields: ctx, paginationToken +func (_m *DBClient) FindFinalityProviderStats(ctx context.Context, paginationToken string) (*db.DbResultMap[model.FinalityProviderStatsDocument], error) { + ret := _m.Called(ctx, paginationToken) if len(ret) == 0 { - panic("no return value specified for FindFinalityProviderStatsByPkHex") + panic("no return value specified for FindFinalityProviderStats") } - var r0 map[string]model.FinalityProviderStatsDocument + var r0 *db.DbResultMap[model.FinalityProviderStatsDocument] var r1 error - if rf, ok := ret.Get(0).(func(context.Context, []string) (map[string]model.FinalityProviderStatsDocument, error)); ok { - return rf(ctx, pkHex) + if rf, ok := ret.Get(0).(func(context.Context, string) (*db.DbResultMap[model.FinalityProviderStatsDocument], error)); ok { + return rf(ctx, paginationToken) } - if rf, ok := ret.Get(0).(func(context.Context, []string) map[string]model.FinalityProviderStatsDocument); ok { - r0 = rf(ctx, pkHex) + if rf, ok := ret.Get(0).(func(context.Context, string) *db.DbResultMap[model.FinalityProviderStatsDocument]); ok { + r0 = rf(ctx, paginationToken) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(map[string]model.FinalityProviderStatsDocument) + r0 = ret.Get(0).(*db.DbResultMap[model.FinalityProviderStatsDocument]) } } - if rf, ok := ret.Get(1).(func(context.Context, []string) error); ok { - r1 = rf(ctx, pkHex) + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, paginationToken) } else { r1 = ret.Error(1) } diff --git a/tests/setup.go b/tests/setup.go index 9356b4a..15799fb 100644 --- a/tests/setup.go +++ b/tests/setup.go @@ -177,11 +177,13 @@ func setUpTestQueue(cfg *queueConfig.QueueConfig, service *services.Services) (* client.UnbondingStakingQueueName, client.WithdrawStakingQueueName, client.ExpiredStakingQueueName, + client.StakingStatsQueueName, // purge delay queues as well client.ActiveStakingQueueName + "_delay", client.UnbondingStakingQueueName + "_delay", client.WithdrawStakingQueueName + "_delay", client.ExpiredStakingQueueName + "_delay", + client.StakingStatsQueueName + "_delay", }) if purgeError != nil { log.Fatal("failed to purge queues in test: ", purgeError) diff --git a/tests/stats_test.go b/tests/stats_test.go index 348fc4e..2281e36 100644 --- a/tests/stats_test.go +++ b/tests/stats_test.go @@ -58,13 +58,6 @@ func TestStatsShouldBeShardedInDb(t *testing.T) { assert.Equal(t, int64(0), totalActiveDelegations) assert.NotZero(t, totalTvl) assert.Equal(t, int64(10), totalDelegations) - - // We also check the finality provider stats, make sure it's sharded as well - shardedFinalityProviderStats, err := inspectDbDocuments[model.FinalityProviderStatsDocument](t, model.FinalityProviderStatsCollection) - if err != nil { - t.Fatalf("Failed to inspect DB documents: %v", err) - } - assert.Less(t, 10, len(shardedFinalityProviderStats), "we inserted 10 staking tx, we shall expect more than 10 in db as it's sharded") } func TestStatsCalculationShouldOnlyProcessActiveAndUnbondedEvents(t *testing.T) { @@ -133,6 +126,7 @@ func TestStatsEndpoints(t *testing.T) { // Test the finality endpoint first result := fetchFinalityEndpoint(t, testServer) + assert.Equal(t, 1, len(result)) assert.Equal(t, int64(activeStakingEvent.StakingValue), result[0].ActiveTvl) assert.Equal(t, int64(activeStakingEvent.StakingValue), result[0].TotalTvl) assert.Equal(t, int64(1), result[0].ActiveDelegations) @@ -162,6 +156,7 @@ func TestStatsEndpoints(t *testing.T) { // Make a GET request to the finality providers endpoint result = fetchFinalityEndpoint(t, testServer) + assert.Equal(t, 1, len(result)) assert.Equal(t, int64(0), result[0].ActiveTvl) assert.Equal(t, int64(activeStakingEvent.StakingValue), result[0].TotalTvl) assert.Equal(t, int64(0), result[0].ActiveDelegations) @@ -187,6 +182,14 @@ func TestStatsEndpoints(t *testing.T) { sendTestMessage(testServer.Queues.ActiveStakingQueueClient, activeEvents) time.Sleep(2 * time.Second) + // Make a GET request to the finality providers endpoint + finalityProviderStats := fetchFinalityEndpoint(t, testServer) + assert.Equal(t, 3, len(finalityProviderStats)) + // Make sure sorted by active TVL + for i := 0; i < len(finalityProviderStats)-1; i++ { + assert.True(t, finalityProviderStats[i].ActiveTvl >= finalityProviderStats[i+1].ActiveTvl, "expected response body to be sorted") + } + overallStats = fetchOverallStatsEndpoint(t, testServer) expectedTvl := int64(activeEvents[0].StakingValue + activeEvents[1].StakingValue) From ce43ce8b7c6da010fb6f20b275c69e333ea486b6 Mon Sep 17 00:00:00 2001 From: wjrjerome Date: Thu, 2 May 2024 18:14:13 +0800 Subject: [PATCH 2/2] rename returned variable --- internal/db/stats.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/db/stats.go b/internal/db/stats.go index a419e01..1a3a05b 100644 --- a/internal/db/stats.go +++ b/internal/db/stats.go @@ -275,12 +275,12 @@ func (db *Database) FindFinalityProviderStats(ctx context.Context, paginationTok } defer cursor.Close(ctx) - var delegations []model.FinalityProviderStatsDocument - if err = cursor.All(ctx, &delegations); err != nil { + var finalityProviders []model.FinalityProviderStatsDocument + if err = cursor.All(ctx, &finalityProviders); err != nil { return nil, err } - return toResultMapWithPaginationToken(db.cfg, delegations, model.BuildFinalityProviderStatsPaginationToken) + return toResultMapWithPaginationToken(db.cfg, finalityProviders, model.BuildFinalityProviderStatsPaginationToken) } func (db *Database) updateFinalityProviderStats(ctx context.Context, state, stakingTxHashHex, fpPkHex string, upsertUpdate primitive.M) error {