Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: calculate the stats via stats queue #76

Merged
merged 2 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bin/init-mongo.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ db.unbonding_queue.createIndex({'unbonding_tx_hash_hex': 1}, {unique: true});
db.timelock_queue.createIndex({'expire_height': 1}, {unique: false});
db.delegations.createIndex({'staker_pk_hex': 1, 'staking_tx.start_height': -1}, {unique: false});
db.staker_stats.createIndex({'active_tvl': -1, '_id': 1}, {unique: false});
db.finality_providers_stats.createIndex({'active_tvl': -1, '_id': 1}, {unique: false});
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@liam-icheng-lai or @filippos47 i will need your help on updating the DB indexes to include all of them from line 18 to 22. I think we already have the index for 18-20. but not for 21 and 22.

"

# Keep the container running
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.21.6

require (
github.com/babylonchain/babylon v0.8.6-0.20240426101001-7778c798e236
github.com/babylonchain/staking-queue-client v0.0.0-20240424061128-3ac950ef5f27
github.com/babylonchain/staking-queue-client v0.0.0-20240426081310-59f3af4f7fa9
github.com/btcsuite/btcd v0.24.0
github.com/btcsuite/btcd/btcec/v2 v2.3.2
github.com/btcsuite/btcd/btcutil v1.1.5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ github.com/aws/aws-sdk-go v1.44.312/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8
github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g=
github.com/babylonchain/babylon v0.8.6-0.20240426101001-7778c798e236 h1:Ydna4VcP56xu1+zdgygqHdSCeMduZjuznVhr4exO5do=
github.com/babylonchain/babylon v0.8.6-0.20240426101001-7778c798e236/go.mod h1:lfeASLNJgcUsX7LEns3HRUv0k+MjzcB2q2AMasfz38M=
github.com/babylonchain/staking-queue-client v0.0.0-20240424061128-3ac950ef5f27 h1:qVf88e5VrG4Rq1AZm188h3alXwB4d0YYs0eyHo+9BWw=
github.com/babylonchain/staking-queue-client v0.0.0-20240424061128-3ac950ef5f27/go.mod h1:mEgA6N3SnwFwGEOsUYr/HdjpKa13Wck08MS7VY/Icvo=
github.com/babylonchain/staking-queue-client v0.0.0-20240426081310-59f3af4f7fa9 h1:O5TwaV7Jqsuzre6z5U+q/3iKnfk6f49euq2QzwcAHLY=
github.com/babylonchain/staking-queue-client v0.0.0-20240426081310-59f3af4f7fa9/go.mod h1:mEgA6N3SnwFwGEOsUYr/HdjpKa13Wck08MS7VY/Icvo=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
Expand Down
5 changes: 2 additions & 3 deletions internal/api/handlers/finality_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@ import (
// @Success 200 {object} PublicResponse[[]services.FpDetailsPublic] "A list of finality providers sorted by ActiveTvl in descending order"
// @Router /v1/finality-providers [get]
func (h *Handler) GetFinalityProviders(request *http.Request) (*Result, *types.Error) {
fps, err := h.services.GetActiveFinalityProviders(request.Context())
fps, paginationToken, err := h.services.GetFinalityProviders(request.Context(), "")
if err != nil {
return nil, err
}

return NewResult(fps), nil
return NewResultWithPagination(fps, paginationToken), nil
}
2 changes: 1 addition & 1 deletion internal/db/delegation.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (db *Database) FindDelegationsByStakerPk(ctx context.Context, stakerPk stri
options.SetLimit(db.cfg.MaxPaginationLimit)
// Decode the pagination token first if it exist
if paginationToken != "" {
decodedToken, err := model.DecodeDelegationByStakerPaginationToken(paginationToken)
decodedToken, err := model.DecodePaginationToken[model.DelegationByStakerPagination](paginationToken)
if err != nil {
return nil, &InvalidPaginationTokenError{
Message: "Invalid pagination token",
Expand Down
2 changes: 1 addition & 1 deletion internal/db/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type DBClient interface {
SubtractFinalityProviderStats(
ctx context.Context, stakingTxHashHex, fpPkHex string, amount uint64,
) error
FindFinalityProviderStatsByPkHex(ctx context.Context, pkHex []string) (map[string]model.FinalityProviderStatsDocument, error)
FindFinalityProviderStats(ctx context.Context, paginationToken string) (*DbResultMap[model.FinalityProviderStatsDocument], error)
IncrementStakerStats(
ctx context.Context, stakingTxHashHex, stakerPkHex string, amount uint64,
) error
Expand Down
26 changes: 1 addition & 25 deletions internal/db/model/delegation.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package model

import (
"encoding/base64"
"encoding/json"

"github.com/babylonchain/staking-api-service/internal/types"
)

Expand Down Expand Up @@ -32,33 +29,12 @@ type DelegationByStakerPagination struct {
StakingStartHeight uint64 `json:"staking_start_height"`
}

func DecodeDelegationByStakerPaginationToken(token string) (*DelegationByStakerPagination, error) {
tokenBytes, err := base64.URLEncoding.DecodeString(token)
if err != nil {
return nil, err
}
var d DelegationByStakerPagination
err = json.Unmarshal(tokenBytes, &d)
if err != nil {
return nil, err
}
return &d, nil
}

func (d *DelegationByStakerPagination) GetPaginationToken() (string, error) {
tokenBytes, err := json.Marshal(d)
if err != nil {
return "", err
}
return base64.URLEncoding.EncodeToString(tokenBytes), nil
}

func BuildDelegationByStakerPaginationToken(d DelegationDocument) (string, error) {
page := &DelegationByStakerPagination{
StakingTxHashHex: d.StakingTxHashHex,
StakingStartHeight: d.StakingTx.StartHeight,
}
token, err := page.GetPaginationToken()
token, err := GetPaginationToken(page)
if err != nil {
return "", err
}
Expand Down
57 changes: 24 additions & 33 deletions internal/db/model/stats.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
package model

import (
"encoding/base64"
"encoding/json"
)

const (
StatsLockCollection = "stats_lock"
OverallStatsCollection = "overall_stats"
Expand Down Expand Up @@ -44,11 +39,28 @@ type OverallStatsDocument struct {
}

type FinalityProviderStatsDocument struct {
Id string `bson:"_id"` // FinalityProviderPkHex:shard-number
ActiveTvl int64 `bson:"active_tvl"`
TotalTvl int64 `bson:"total_tvl"`
ActiveDelegations int64 `bson:"active_delegations"`
TotalDelegations int64 `bson:"total_delegations"`
FinalityProviderPkHex string `bson:"_id"` // FinalityProviderPkHex
ActiveTvl int64 `bson:"active_tvl"`
TotalTvl int64 `bson:"total_tvl"`
ActiveDelegations int64 `bson:"active_delegations"`
TotalDelegations int64 `bson:"total_delegations"`
}

type FinalityProviderStatsPagination struct {
FinalityProviderPkHex string `json:"finality_provider_pk_hex"`
ActiveTvl int64 `json:"active_tvl"`
}

func BuildFinalityProviderStatsPaginationToken(d FinalityProviderStatsDocument) (string, error) {
page := FinalityProviderStatsPagination{
ActiveTvl: d.ActiveTvl,
FinalityProviderPkHex: d.FinalityProviderPkHex,
}
token, err := GetPaginationToken(page)
if err != nil {
return "", err
}
return token, nil
}

type StakerStatsDocument struct {
Expand All @@ -66,33 +78,12 @@ type StakerStatsByStakerPagination struct {
ActiveTvl int64 `json:"active_tvl"`
}

func DecodeStakerStatsByStakerPaginationToken(token string) (*StakerStatsByStakerPagination, error) {
tokenBytes, err := base64.URLEncoding.DecodeString(token)
if err != nil {
return nil, err
}
var d StakerStatsByStakerPagination
err = json.Unmarshal(tokenBytes, &d)
if err != nil {
return nil, err
}
return &d, nil
}

func (d *StakerStatsByStakerPagination) GetPaginationToken() (string, error) {
tokenBytes, err := json.Marshal(d)
if err != nil {
return "", err
}
return base64.URLEncoding.EncodeToString(tokenBytes), nil
}

func BuildStakerStatsByStakerPaginationToken(d StakerStatsDocument) (string, error) {
page := &StakerStatsByStakerPagination{
page := StakerStatsByStakerPagination{
StakerPkHex: d.StakerPkHex,
ActiveTvl: d.ActiveTvl,
}
token, err := page.GetPaginationToken()
token, err := GetPaginationToken(page)
if err != nil {
return "", err
}
Expand Down
105 changes: 29 additions & 76 deletions internal/db/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"math/rand"
"strings"

"github.com/babylonchain/staking-api-service/internal/db/model"
"github.com/babylonchain/staking-api-service/internal/types"
Expand Down Expand Up @@ -247,60 +246,41 @@ func (db *Database) SubtractFinalityProviderStats(
return db.updateFinalityProviderStats(ctx, types.Unbonded.ToString(), stakingTxHashHex, fpPkHex, upsertUpdate)
}

// FindFinalityProviderStatsByPkHex finds the finality provider stats for the given finality provider pk hex
// This method queries all the shards and sums up the stats
// Refer to the README.md in this directory for more information on the sharding logic
func (db *Database) FindFinalityProviderStatsByPkHex(ctx context.Context, pkHex []string) (map[string]model.FinalityProviderStatsDocument, error) {
// FindFinalityProviderStats fetches the finality provider stats from the database
func (db *Database) FindFinalityProviderStats(ctx context.Context, paginationToken string) (*DbResultMap[model.FinalityProviderStatsDocument], error) {
client := db.Client.Database(db.DbName).Collection(model.FinalityProviderStatsCollection)
finalityProvidersMap := make(map[string]model.FinalityProviderStatsDocument)

batchSize := int(db.cfg.DbBatchSizeLimit)
for i := 0; i < len(pkHex); i += batchSize {
end := i + batchSize
if end > len(pkHex) {
end = len(pkHex)
}
batch := pkHex[i:end]
options := options.Find().SetSort(bson.D{{Key: "active_tvl", Value: -1}}) // Sorting in descending order
options.SetLimit(db.cfg.MaxPaginationLimit)
var filter bson.M

filter := bson.M{"_id": bson.M{"$in": db.getAllShardedFinalityProviderId(batch)}}
cursor, err := client.Find(ctx, filter)
// Decode the pagination token first if it exist
if paginationToken != "" {
decodedToken, err := model.DecodePaginationToken[model.FinalityProviderStatsPagination](paginationToken)
if err != nil {
return nil, err
return nil, &InvalidPaginationTokenError{
Message: "Invalid pagination token",
}
}

var shardedFinalityProvidersStats []model.FinalityProviderStatsDocument
if err = cursor.All(ctx, &shardedFinalityProvidersStats); err != nil {
cursor.Close(ctx)
return nil, err
filter = bson.M{
"$or": []bson.M{
{"active_tvl": bson.M{"$lt": decodedToken.ActiveTvl}},
{"active_tvl": decodedToken.ActiveTvl, "_id": bson.M{"$lt": decodedToken.FinalityProviderPkHex}},
},
}
cursor.Close(ctx)
}

// Sum up the stats for the finality provider
for _, fp := range shardedFinalityProvidersStats {
// Retrieve the finality provider pk hex from the id.
fpPkHex, err := extractFinalityProviderPkHexFromStatsId(fp.Id)
if err != nil {
return nil, err
}
if existingFp, ok := finalityProvidersMap[fpPkHex]; ok {
existingFp.ActiveTvl += fp.ActiveTvl
existingFp.TotalTvl += fp.TotalTvl
existingFp.ActiveDelegations += fp.ActiveDelegations
existingFp.TotalDelegations += fp.TotalDelegations

finalityProvidersMap[fpPkHex] = existingFp
} else {
finalityProvidersMap[fpPkHex] = model.FinalityProviderStatsDocument{
ActiveTvl: fp.ActiveTvl,
TotalTvl: fp.TotalTvl,
ActiveDelegations: fp.ActiveDelegations,
TotalDelegations: fp.TotalDelegations,
}
}
}
cursor, err := client.Find(ctx, filter, options)
if err != nil {
return nil, err
}
defer cursor.Close(ctx)

return finalityProvidersMap, nil
var finalityProviders []model.FinalityProviderStatsDocument
if err = cursor.All(ctx, &finalityProviders); err != nil {
return nil, err
}

return toResultMapWithPaginationToken(db.cfg, finalityProviders, model.BuildFinalityProviderStatsPaginationToken)
}

func (db *Database) updateFinalityProviderStats(ctx context.Context, state, stakingTxHashHex, fpPkHex string, upsertUpdate primitive.M) error {
Expand All @@ -319,7 +299,7 @@ func (db *Database) updateFinalityProviderStats(ctx context.Context, state, stak
return nil, err
}

upsertFilter := bson.M{"_id": db.generateFinalityProviderStatsId(fpPkHex)}
upsertFilter := bson.M{"_id": fpPkHex}

_, err = client.UpdateOne(sessCtx, upsertFilter, upsertUpdate, options.Update().SetUpsert(true))
if err != nil {
Expand All @@ -337,33 +317,6 @@ func (db *Database) updateFinalityProviderStats(ctx context.Context, state, stak
return nil
}

// Genrate the id for the finality provider stats document.
// Id is a combination of finality provider pk hex and a random number ranged from 0-LogicalShardCount
// This is designed to avoid locking the same field during concurrent writes
func (db *Database) generateFinalityProviderStatsId(finalityProviderPkHex string) string {
randomShardNum := uint64(rand.Intn(int(db.cfg.LogicalShardCount)))
return fmt.Sprintf("%s:%d", finalityProviderPkHex, randomShardNum)
}

func extractFinalityProviderPkHexFromStatsId(id string) (string, error) {
parts := strings.Split(id, ":")
if len(parts) != 2 {
return "", fmt.Errorf("invalid id format: %s", id)
}
return parts[0], nil
}

// Get the finality provider stats document id for all the shards
func (db *Database) getAllShardedFinalityProviderId(finalityProviderPkHex []string) []string {
var ids []string
for _, fpPkHex := range finalityProviderPkHex {
for i := 0; i < int(db.cfg.LogicalShardCount); i++ {
ids = append(ids, fmt.Sprintf("%s:%d", fpPkHex, i))
}
}
return ids
}

// IncrementStakerStats increments the staker stats for the given staking tx hash
// This method is idempotent, only the first call will be processed. Otherwise it will return a notFoundError for duplicates
func (db *Database) IncrementStakerStats(
Expand Down Expand Up @@ -432,7 +385,7 @@ func (db *Database) FindTopStakersByTvl(ctx context.Context, paginationToken str
var filter bson.M
// Decode the pagination token first if it exist
if paginationToken != "" {
decodedToken, err := model.DecodeStakerStatsByStakerPaginationToken(paginationToken)
decodedToken, err := model.DecodePaginationToken[model.StakerStatsByStakerPagination](paginationToken)
if err != nil {
return nil, &InvalidPaginationTokenError{
Message: "Invalid pagination token",
Expand Down
12 changes: 6 additions & 6 deletions internal/queue/handlers/active_staking.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,16 @@ func (h *QueueHandler) ActiveStakingHandler(ctx context.Context, messageBody str
return nil
}

// Perform the async stats calculation
statsError := h.Services.ProcessStakingStatsCalculation(
ctx, activeStakingEvent.StakingTxHashHex,
// Perform the async stats calculation by emit the stats event
statsError := h.EmitStatsEvent(ctx, queueClient.NewStatsEvent(
activeStakingEvent.StakingTxHashHex,
activeStakingEvent.StakerPkHex,
activeStakingEvent.FinalityProviderPkHex,
types.Active,
activeStakingEvent.StakingValue,
)
types.Active.ToString(),
))
if statsError != nil {
log.Ctx(ctx).Error().Err(statsError).Msg("Failed to process staking stats calculation for active staking")
log.Ctx(ctx).Error().Err(statsError).Msg("Failed to emit stats event for active staking")
return statsError
}

Expand Down
16 changes: 8 additions & 8 deletions internal/queue/handlers/expired_staking.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,17 @@ func (h *QueueHandler) ExpiredStakingHandler(ctx context.Context, messageBody st
return err
}

// Perform the stats calculation
statsErr := h.Services.ProcessStakingStatsCalculation(
ctx, expiredStakingEvent.StakingTxHashHex,
// Perform the async stats calculation by emit the stats event
statsError := h.EmitStatsEvent(ctx, queueClient.NewStatsEvent(
expiredStakingEvent.StakingTxHashHex,
del.StakerPkHex,
del.FinalityProviderPkHex,
types.Unbonded,
del.StakingValue,
)
if statsErr != nil {
log.Error().Err(statsErr).Msg("Failed to process staking stats calculation after timelock expired")
return statsErr
types.Unbonded.ToString(),
))
if statsError != nil {
log.Ctx(ctx).Error().Err(statsError).Msg("Failed to emit stats event after timelock expired")
return statsError
}

transitionErr := h.Services.TransitionToUnbondedState(ctx, txType, expiredStakingEvent.StakingTxHashHex)
Expand Down
Loading