From 941940cec74c2293a71c5d3a1349e1c06486364e Mon Sep 17 00:00:00 2001 From: Gurjot Date: Thu, 17 Oct 2024 13:21:42 -0400 Subject: [PATCH 01/12] setup dels --- .../images/babylon-staking-indexer/Dockerfile | 2 +- docker-compose.yml | 6 +-- go.mod | 2 +- internal/db/delegation.go | 32 +++++++++++ internal/db/interface.go | 10 ++++ internal/db/model/delegation.go | 31 +++++++++-- internal/db/model/setup.go | 4 +- internal/db/params.go | 18 ++++++- internal/services/delegation.go | 53 ++++++++++++++++++- internal/services/events.go | 4 ++ internal/services/global-params.go | 16 ++++++ internal/services/service.go | 6 +-- internal/utils/poller/poller.go | 8 ++- tests/mocks/mock_bbn_client.go | 2 +- tests/mocks/mock_btc_client.go | 2 +- tests/mocks/mock_db_client.go | 20 ++++++- 16 files changed, 194 insertions(+), 22 deletions(-) create mode 100644 internal/db/delegation.go diff --git a/contrib/images/babylon-staking-indexer/Dockerfile b/contrib/images/babylon-staking-indexer/Dockerfile index 19a3309..991416d 100644 --- a/contrib/images/babylon-staking-indexer/Dockerfile +++ b/contrib/images/babylon-staking-indexer/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.21-alpine AS builder +FROM golang:1.22.3-alpine AS builder ARG VERSION="HEAD" diff --git a/docker-compose.yml b/docker-compose.yml index b300dfd..d49b182 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,16 +1,16 @@ version: "3.9" services: babylon-staking-indexer: - image: babylonlabs/babylon-staking-indexer:latest + image: babylonlabs-io/babylon-staking-indexer:latest container_name: babylon-staking-indexer environment: - CONFIG=/home/babylon-staking-indexer/config.yml depends_on: - - mongodb + - indexer-mongodb - rabbitmq volumes: - ./config/config-docker.yml:/home/babylon-staking-indexer/config.yml:Z - mongodb: + indexer-mongodb: image: mongo:latest container_name: indexer-mongodb hostname: indexer-mongodb diff --git a/go.mod b/go.mod index 5ac1aad..b15498e 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/babylonlabs-io/babylon-staking-indexer -go 1.23.2 +go 1.22.3 require ( github.com/babylonlabs-io/babylon v0.12.1 diff --git a/internal/db/delegation.go b/internal/db/delegation.go new file mode 100644 index 0000000..4c77f37 --- /dev/null +++ b/internal/db/delegation.go @@ -0,0 +1,32 @@ +package db + +import ( + "context" + "errors" + + "github.com/babylonlabs-io/babylon-staking-indexer/internal/db/model" + "go.mongodb.org/mongo-driver/mongo" +) + +func (db *Database) SaveNewBTCDelegation( + ctx context.Context, delegationDoc *model.BTCDelegationDetails, +) error { + _, err := db.client.Database(db.dbName). + Collection(model.BTCDelegationDetailsCollection). + InsertOne(ctx, delegationDoc) + if err != nil { + var writeErr mongo.WriteException + if errors.As(err, &writeErr) { + for _, e := range writeErr.WriteErrors { + if mongo.IsDuplicateKeyError(e) { + return &DuplicateKeyError{ + Key: delegationDoc.StakingTxHashHex, + Message: "delegation already exists", + } + } + } + } + return err + } + return nil +} diff --git a/internal/db/interface.go b/internal/db/interface.go index d96cc62..63de325 100644 --- a/internal/db/interface.go +++ b/internal/db/interface.go @@ -73,4 +73,14 @@ type DbInterface interface { SaveCheckpointParams( ctx context.Context, params *bbnclient.CheckpointParams, ) error + /** + * SaveNewBTCDelegation saves a new BTC delegation to the database. + * If the BTC delegation already exists, DuplicateKeyError will be returned. + * @param ctx The context + * @param delegationDoc The BTC delegation details + * @return An error if the operation failed + */ + SaveNewBTCDelegation( + ctx context.Context, delegationDoc *model.BTCDelegationDetails, + ) error } diff --git a/internal/db/model/delegation.go b/internal/db/model/delegation.go index 901e93f..026e76a 100644 --- a/internal/db/model/delegation.go +++ b/internal/db/model/delegation.go @@ -2,10 +2,33 @@ package model import ( "github.com/babylonlabs-io/babylon-staking-indexer/internal/types" + bbntypes "github.com/babylonlabs-io/babylon/x/btcstaking/types" ) -type DelegationDocument struct { - StakingTxHashHex string `bson:"_id"` // Primary key - State types.DelegationState `bson:"state"` - // TODO: Placeholder for more fields +type BTCDelegationDetails struct { + StakingTxHashHex string `bson:"_id"` // Primary key + ParamsVersion string `bson:"params_version"` + FinalityProviderBtcPksHex []string `bson:"finality_provider_btc_pks_hex"` + StakerBtcPkHex string `bson:"staker_btc_pk_hex"` + StakingTime string `bson:"staking_time"` + StakingAmount string `bson:"staking_amount"` + UnbondingTime string `bson:"unbonding_time"` + UnbondingTx string `bson:"unbonding_tx"` + State types.DelegationState `bson:"state"` +} + +func FromEventBTCDelegationCreated( + event *bbntypes.EventBTCDelegationCreated, +) *BTCDelegationDetails { + return &BTCDelegationDetails{ + StakingTxHashHex: event.StakingTxHash, // babylon returns a hex string + ParamsVersion: event.ParamsVersion, + FinalityProviderBtcPksHex: event.FinalityProviderBtcPksHex, + StakerBtcPkHex: event.StakerBtcPkHex, + StakingTime: event.StakingTime, + StakingAmount: event.StakingAmount, + UnbondingTime: event.UnbondingTime, + UnbondingTx: event.UnbondingTx, + State: types.DelegationState(event.NewState), + } } diff --git a/internal/db/model/setup.go b/internal/db/model/setup.go index a4edde5..0a96ecf 100644 --- a/internal/db/model/setup.go +++ b/internal/db/model/setup.go @@ -15,7 +15,7 @@ import ( const ( FinalityProviderDetailsCollection = "finality_provider_details" - DelegationCollection = "delegation" + BTCDelegationDetailsCollection = "btc_delegation_details" GlobalParamsCollection = "global_params" ) @@ -26,7 +26,7 @@ type index struct { var collections = map[string][]index{ FinalityProviderDetailsCollection: {{Indexes: map[string]int{}}}, - DelegationCollection: {{Indexes: map[string]int{}}}, + BTCDelegationDetailsCollection: {{Indexes: map[string]int{}}}, GlobalParamsCollection: {{Indexes: map[string]int{}}}, } diff --git a/internal/db/params.go b/internal/db/params.go index c089c5f..2c88884 100644 --- a/internal/db/params.go +++ b/internal/db/params.go @@ -6,6 +6,7 @@ import ( "github.com/babylonlabs-io/babylon-staking-indexer/internal/clients/bbnclient" "github.com/babylonlabs-io/babylon-staking-indexer/internal/db/model" + "github.com/rs/zerolog/log" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo/options" ) @@ -49,6 +50,8 @@ func (db *Database) SaveStakingParams( func (db *Database) SaveCheckpointParams( ctx context.Context, params *bbnclient.CheckpointParams, ) error { + log.Debug().Msg("Saving checkpoint params") + collection := db.client.Database(db.dbName). Collection(model.GlobalParamsCollection) @@ -65,9 +68,22 @@ func (db *Database) SaveCheckpointParams( }, } - _, err := collection.UpdateOne(ctx, filter, update, options.Update().SetUpsert(true)) + log.Debug(). + Interface("filter", filter). + Interface("update", update). + Msg("Attempting to update checkpoint params") + + result, err := collection.UpdateOne(ctx, filter, update, options.Update().SetUpsert(true)) if err != nil { + log.Error().Err(err).Msg("Failed to save checkpoint params") return fmt.Errorf("failed to save checkpoint params: %w", err) } + + log.Info(). + Int64("matched_count", result.MatchedCount). + Int64("modified_count", result.ModifiedCount). + Int64("upserted_count", result.UpsertedCount). + Msg("Successfully saved checkpoint params") + return nil } diff --git a/internal/services/delegation.go b/internal/services/delegation.go index d16a5c5..8ef54d1 100644 --- a/internal/services/delegation.go +++ b/internal/services/delegation.go @@ -1,6 +1,17 @@ package services -import "context" +import ( + "context" + "fmt" + "net/http" + + "github.com/babylonlabs-io/babylon-staking-indexer/internal/db" + "github.com/babylonlabs-io/babylon-staking-indexer/internal/db/model" + "github.com/babylonlabs-io/babylon-staking-indexer/internal/types" + bbntypes "github.com/babylonlabs-io/babylon/x/btcstaking/types" + abcitypes "github.com/cometbft/cometbft/abci/types" + "github.com/rs/zerolog/log" +) const ( EventBTCDelegationCreated EventTypes = "babylon.btcstaking.v1.EventBTCDelegationCreated" @@ -9,5 +20,43 @@ const ( EventBTCDelegationExpired EventTypes = "babylon.btcstaking.v1.EventBTCDelegationExpired" ) -func (s *Service) processBTCDelegationStateUpdateEvent(ctx context.Context) { +func (s *Service) processNewBTCDelegationEvent( + ctx context.Context, event abcitypes.Event, +) *types.Error { + log.Debug().Msg("Processing new BTC delegation event") + newDelegation, err := parseEvent[bbntypes.EventBTCDelegationCreated]( + EventBTCDelegationCreated, event, + ) + if err != nil { + log.Error().Err(err).Msg("Failed to parse BTC delegation event") + return err + } + if err := validateBTCDelegationCreatedEvent(newDelegation); err != nil { + log.Error().Err(err).Msg("Failed to validate BTC delegation event") + return err + } + log.Debug().Interface("newDelegation", newDelegation).Msg("Saving new BTC delegation") + if err := s.db.SaveNewBTCDelegation( + ctx, model.FromEventBTCDelegationCreated(newDelegation), + ); err != nil { + if db.IsDuplicateKeyError(err) { + log.Info().Msg("BTC delegation already exists, ignoring the event") + return nil + } + log.Error().Err(err).Msg("Failed to save new BTC delegation") + return types.NewError( + http.StatusInternalServerError, + types.InternalServiceError, + fmt.Errorf("failed to save new BTC delegation: %w", err), + ) + } + + log.Info().Msg("Successfully processed and saved new BTC delegation") + return nil +} + +// You'll need to implement these functions: +func validateBTCDelegationCreatedEvent(event *bbntypes.EventBTCDelegationCreated) *types.Error { + // Implement validation logic here + return nil } diff --git a/internal/services/events.go b/internal/services/events.go index 0c1ad81..95544ed 100644 --- a/internal/services/events.go +++ b/internal/services/events.go @@ -55,10 +55,14 @@ func (s *Service) processEvent(ctx context.Context, event BbnEvent) { // Note: We no longer need to check for the event category here. We can directly // process the event based on its type. bbnEvent := event.Event + // log.Debug().Str("event_type", bbnEvent.Type).Msg("Processing event") switch EventTypes(bbnEvent.Type) { case EventFinalityProviderCreatedType: log.Debug().Msg("Processing new finality provider event") s.processNewFinalityProviderEvent(ctx, bbnEvent) + case EventBTCDelegationCreated: + log.Debug().Msg("Processing new BTC delegation event") + s.processNewBTCDelegationEvent(ctx, bbnEvent) case EventFinalityProviderEditedType: log.Debug().Msg("Processing finality provider edited event") s.processFinalityProviderEditedEvent(ctx, bbnEvent) diff --git a/internal/services/global-params.go b/internal/services/global-params.go index 22bc0c3..2b2d477 100644 --- a/internal/services/global-params.go +++ b/internal/services/global-params.go @@ -6,6 +6,7 @@ import ( "github.com/babylonlabs-io/babylon-staking-indexer/internal/types" "github.com/babylonlabs-io/babylon-staking-indexer/internal/utils/poller" + "github.com/rs/zerolog/log" ) func (s *Service) SyncGlobalParams(ctx context.Context) { @@ -17,37 +18,52 @@ func (s *Service) SyncGlobalParams(ctx context.Context) { } func (s *Service) fetchAndSaveParams(ctx context.Context) *types.Error { + log.Debug().Msg("Fetching and saving global parameters") + checkpointParams, err := s.bbn.GetCheckpointParams(ctx) if err != nil { // TODO: Add metrics and replace internal service error with a more specific // error code so that the poller can catch and emit the error metrics + log.Error().Err(err).Msg("Failed to get checkpoint params") return types.NewInternalServiceError( fmt.Errorf("failed to get checkpoint params: %w", err), ) } + log.Debug().Interface("checkpointParams", checkpointParams).Msg("Retrieved checkpoint params") + if err := s.db.SaveCheckpointParams(ctx, checkpointParams); err != nil { + log.Error().Err(err).Msg("Failed to save checkpoint params") return types.NewInternalServiceError( fmt.Errorf("failed to save checkpoint params: %w", err), ) } + log.Info().Msg("Successfully saved checkpoint params") allStakingParams, err := s.bbn.GetAllStakingParams(ctx) if err != nil { + log.Error().Err(err).Msg("Failed to get staking params") return types.NewInternalServiceError( fmt.Errorf("failed to get staking params: %w", err), ) } + log.Debug().Interface("allStakingParams", allStakingParams).Msg("Retrieved all staking params") + for version, params := range allStakingParams { if params == nil { + log.Error().Uint32("version", version).Msg("Nil staking params encountered") return types.NewInternalServiceError( fmt.Errorf("nil staking params for version %d", version), ) } if err := s.db.SaveStakingParams(ctx, version, params); err != nil { + log.Error().Err(err).Uint32("version", version).Msg("Failed to save staking params") return types.NewInternalServiceError( fmt.Errorf("failed to save staking params: %w", err), ) } + log.Info().Uint32("version", version).Msg("Successfully saved staking params") } + + log.Info().Msg("Successfully fetched and saved all global parameters") return nil } diff --git a/internal/services/service.go b/internal/services/service.go index ac26459..0700ec4 100644 --- a/internal/services/service.go +++ b/internal/services/service.go @@ -41,9 +41,9 @@ func (s *Service) StartIndexerSync(ctx context.Context) { // Sync global parameters s.SyncGlobalParams(ctx) // Start the bootstrap process - s.BootstrapBbn(ctx) - // Start the websocket event subscription process - s.SubscribeToBbnEvents(ctx) + // s.BootstrapBbn(ctx) + // // Start the websocket event subscription process + // s.SubscribeToBbnEvents(ctx) // Keep processing events in the main thread s.StartBbnEventProcessor(ctx) } diff --git a/internal/utils/poller/poller.go b/internal/utils/poller/poller.go index f17aee6..03f02bf 100644 --- a/internal/utils/poller/poller.go +++ b/internal/utils/poller/poller.go @@ -24,20 +24,24 @@ func NewPoller(interval time.Duration, pollMethod func(ctx context.Context) *typ func (p *Poller) Start(ctx context.Context) { ticker := time.NewTicker(p.interval) + defer ticker.Stop() + + log.Info().Msgf("Starting poller with interval %s", p.interval) for { select { case <-ticker.C: + log.Debug().Msg("Executing poll method") if err := p.pollMethod(ctx); err != nil { log.Error().Err(err).Msg("Error polling") + } else { + log.Debug().Msg("Poll method executed successfully") } case <-ctx.Done(): - // Handle context cancellation. log.Info().Msg("Poller stopped due to context cancellation") return case <-p.quit: log.Info().Msg("Poller stopped") - ticker.Stop() // Stop the ticker return } } diff --git a/tests/mocks/mock_bbn_client.go b/tests/mocks/mock_bbn_client.go index 92fdd8b..d2591c9 100644 --- a/tests/mocks/mock_bbn_client.go +++ b/tests/mocks/mock_bbn_client.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.41.0. DO NOT EDIT. +// Code generated by mockery v2.42.1. DO NOT EDIT. package mocks diff --git a/tests/mocks/mock_btc_client.go b/tests/mocks/mock_btc_client.go index 21a8239..83c0ef2 100644 --- a/tests/mocks/mock_btc_client.go +++ b/tests/mocks/mock_btc_client.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.41.0. DO NOT EDIT. +// Code generated by mockery v2.42.1. DO NOT EDIT. package mocks diff --git a/tests/mocks/mock_db_client.go b/tests/mocks/mock_db_client.go index 1efd07f..94805e4 100644 --- a/tests/mocks/mock_db_client.go +++ b/tests/mocks/mock_db_client.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.41.0. DO NOT EDIT. +// Code generated by mockery v2.42.1. DO NOT EDIT. package mocks @@ -85,6 +85,24 @@ func (_m *DbInterface) SaveCheckpointParams(ctx context.Context, params *types.P return r0 } +// SaveNewBTCDelegation provides a mock function with given fields: ctx, delegationDoc +func (_m *DbInterface) SaveNewBTCDelegation(ctx context.Context, delegationDoc *model.BTCDelegationDetails) error { + ret := _m.Called(ctx, delegationDoc) + + if len(ret) == 0 { + panic("no return value specified for SaveNewBTCDelegation") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *model.BTCDelegationDetails) error); ok { + r0 = rf(ctx, delegationDoc) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // SaveNewFinalityProvider provides a mock function with given fields: ctx, fpDoc func (_m *DbInterface) SaveNewFinalityProvider(ctx context.Context, fpDoc *model.FinalityProviderDetails) error { ret := _m.Called(ctx, fpDoc) From dbde0dd25db75beb1ab4400d3f6290d9f370867e Mon Sep 17 00:00:00 2001 From: Gurjot Date: Thu, 17 Oct 2024 15:16:30 -0400 Subject: [PATCH 02/12] point local to other point --- config/config-local.yml | 2 +- docker-compose.yml | 12 +++++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/config/config-local.yml b/config/config-local.yml index 8e95bd5..4aab66e 100644 --- a/config/config-local.yml +++ b/config/config-local.yml @@ -1,7 +1,7 @@ db: username: root password: example - address: "mongodb://localhost:27017" + address: "mongodb://localhost:27019/?replicaSet=RS&directConnection=true" db-name: babylon-staking-indexer btc: endpoint: localhost:18332 diff --git a/docker-compose.yml b/docker-compose.yml index d49b182..6303b43 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -10,18 +10,22 @@ services: - rabbitmq volumes: - ./config/config-docker.yml:/home/babylon-staking-indexer/config.yml:Z + networks: + - indexer-network indexer-mongodb: image: mongo:latest container_name: indexer-mongodb hostname: indexer-mongodb ports: - - "27017:27017" + - "27019:27017" environment: MONGO_INITDB_ROOT_USERNAME: root MONGO_INITDB_ROOT_PASSWORD: example volumes: - ./bin/init-mongo.sh:/init-mongo.sh entrypoint: [ "/init-mongo.sh" ] + networks: + - indexer-network rabbitmq: image: rabbitmq:3-management container_name: rabbitmq @@ -33,3 +37,9 @@ services: RABBITMQ_DEFAULT_PASS: password volumes: - "./rabbitmq_data:/var/lib/rabbitmq" + networks: + - indexer-network + +networks: + indexer-network: + driver: bridge From 47238705c6aabea1175742f7f2fce9c17aef8290 Mon Sep 17 00:00:00 2001 From: Gurjot Date: Fri, 18 Oct 2024 11:47:12 -0400 Subject: [PATCH 03/12] debug parse event --- internal/services/events.go | 24 ++++++++++++++++++++++-- internal/services/service.go | 6 +++--- 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/internal/services/events.go b/internal/services/events.go index 95544ed..5acbed3 100644 --- a/internal/services/events.go +++ b/internal/services/events.go @@ -99,14 +99,34 @@ func parseEvent[T any]( } // Create a map to store the attributes - attributeMap := make(map[string]string) + attributeMap := make(map[string]interface{}) // Populate the attribute map from the event's attributes for _, attr := range event.Attributes { // Unescape the attribute value - attributeMap[attr.Key] = utils.SafeUnescape(attr.Value) + // attributeMap[attr.Key] = utils.SafeUnescape(attr.Value) + + // Unescape the attribute value + unescapedValue := utils.SafeUnescape(attr.Value) + log.Debug().Str("unescapedValue", unescapedValue).Msg("unescapedValue") + log.Debug().Str("attr.Key", attr.Key).Msg("attr.Key") + log.Debug().Str("attr.Value", attr.Value).Msg("attr.Value") + + // Try to unmarshal the value into a more specific type + var value interface{} + if err := json.Unmarshal([]byte(unescapedValue), &value); err == nil { + log.Debug().Interface("unmarshalled value", value).Msg("unmarshalled value") + attributeMap[attr.Key] = value + } else { + // If unmarshaling fails, use the string as-is + log.Debug().Str("unescapedValue", unescapedValue).Msg("unescapedValue") + attributeMap[attr.Key] = unescapedValue + } + } + log.Debug().Interface("attributeMap", attributeMap).Msg("attributeMap") + // Marshal the attributeMap into JSON attrJSON, err := json.Marshal(attributeMap) if err != nil { diff --git a/internal/services/service.go b/internal/services/service.go index 0700ec4..ac26459 100644 --- a/internal/services/service.go +++ b/internal/services/service.go @@ -41,9 +41,9 @@ func (s *Service) StartIndexerSync(ctx context.Context) { // Sync global parameters s.SyncGlobalParams(ctx) // Start the bootstrap process - // s.BootstrapBbn(ctx) - // // Start the websocket event subscription process - // s.SubscribeToBbnEvents(ctx) + s.BootstrapBbn(ctx) + // Start the websocket event subscription process + s.SubscribeToBbnEvents(ctx) // Keep processing events in the main thread s.StartBbnEventProcessor(ctx) } From dbbff45bed0c23120142070dc7c1a1310d09f690 Mon Sep 17 00:00:00 2001 From: Gurjot Date: Fri, 18 Oct 2024 17:57:59 -0400 Subject: [PATCH 04/12] fix for parse events --- go.mod | 4 +- internal/services/delegation.go | 2 +- internal/services/events.go | 111 +++++++++++++++---------- internal/services/finality-provider.go | 6 +- 4 files changed, 72 insertions(+), 51 deletions(-) diff --git a/go.mod b/go.mod index b15498e..680d157 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,8 @@ require ( github.com/babylonlabs-io/staking-queue-client v0.4.1 github.com/btcsuite/btcd v0.24.2 github.com/cometbft/cometbft v0.38.7 + github.com/cosmos/cosmos-sdk v0.50.6 + github.com/cosmos/gogoproto v1.7.0 github.com/go-chi/chi/v5 v5.1.0 github.com/spf13/viper v1.19.0 ) @@ -62,10 +64,8 @@ require ( github.com/cosmos/btcutil v1.0.5 // indirect github.com/cosmos/cosmos-db v1.0.2 // indirect github.com/cosmos/cosmos-proto v1.0.0-beta.5 // indirect - github.com/cosmos/cosmos-sdk v0.50.6 // indirect github.com/cosmos/go-bip39 v1.0.0 // indirect github.com/cosmos/gogogateway v1.2.0 // indirect - github.com/cosmos/gogoproto v1.7.0 // indirect github.com/cosmos/iavl v1.1.2 // indirect github.com/cosmos/ibc-go/modules/capability v1.0.0 // indirect github.com/cosmos/ibc-go/v8 v8.3.0 // indirect diff --git a/internal/services/delegation.go b/internal/services/delegation.go index 8ef54d1..ee88a98 100644 --- a/internal/services/delegation.go +++ b/internal/services/delegation.go @@ -24,7 +24,7 @@ func (s *Service) processNewBTCDelegationEvent( ctx context.Context, event abcitypes.Event, ) *types.Error { log.Debug().Msg("Processing new BTC delegation event") - newDelegation, err := parseEvent[bbntypes.EventBTCDelegationCreated]( + newDelegation, err := parseEvent[*bbntypes.EventBTCDelegationCreated]( EventBTCDelegationCreated, event, ) if err != nil { diff --git a/internal/services/events.go b/internal/services/events.go index 5acbed3..1e4ebf1 100644 --- a/internal/services/events.go +++ b/internal/services/events.go @@ -2,14 +2,14 @@ package services import ( "context" - "encoding/json" "fmt" "net/http" "time" "github.com/babylonlabs-io/babylon-staking-indexer/internal/types" - "github.com/babylonlabs-io/babylon-staking-indexer/internal/utils" abcitypes "github.com/cometbft/cometbft/abci/types" + sdk "github.com/cosmos/cosmos-sdk/types" + proto "github.com/cosmos/gogoproto/proto" "github.com/rs/zerolog/log" ) @@ -72,12 +72,14 @@ func (s *Service) processEvent(ctx context.Context, event BbnEvent) { } } -func parseEvent[T any]( +func parseEvent[T proto.Message]( expectedType EventTypes, event abcitypes.Event, -) (*T, *types.Error) { +) (T, *types.Error) { + var result T + if EventTypes(event.Type) != expectedType { - return nil, types.NewErrorWithMsg( + return result, types.NewErrorWithMsg( http.StatusInternalServerError, types.InternalServiceError, fmt.Sprintf( @@ -88,7 +90,7 @@ func parseEvent[T any]( ) } if len(event.Attributes) == 0 { - return nil, types.NewErrorWithMsg( + return result, types.NewErrorWithMsg( http.StatusInternalServerError, types.InternalServiceError, fmt.Sprintf( @@ -98,55 +100,74 @@ func parseEvent[T any]( ) } - // Create a map to store the attributes - attributeMap := make(map[string]interface{}) - - // Populate the attribute map from the event's attributes - for _, attr := range event.Attributes { - // Unescape the attribute value - // attributeMap[attr.Key] = utils.SafeUnescape(attr.Value) - - // Unescape the attribute value - unescapedValue := utils.SafeUnescape(attr.Value) - log.Debug().Str("unescapedValue", unescapedValue).Msg("unescapedValue") - log.Debug().Str("attr.Key", attr.Key).Msg("attr.Key") - log.Debug().Str("attr.Value", attr.Value).Msg("attr.Value") - - // Try to unmarshal the value into a more specific type - var value interface{} - if err := json.Unmarshal([]byte(unescapedValue), &value); err == nil { - log.Debug().Interface("unmarshalled value", value).Msg("unmarshalled value") - attributeMap[attr.Key] = value - } else { - // If unmarshaling fails, use the string as-is - log.Debug().Str("unescapedValue", unescapedValue).Msg("unescapedValue") - attributeMap[attr.Key] = unescapedValue - } - - } - - log.Debug().Interface("attributeMap", attributeMap).Msg("attributeMap") - - // Marshal the attributeMap into JSON - attrJSON, err := json.Marshal(attributeMap) + // Use the SDK's ParseTypedEvent function + parsedEvent, err := sdk.ParseTypedEvent(event) if err != nil { - return nil, types.NewError( + return result, types.NewError( http.StatusInternalServerError, types.InternalServiceError, - fmt.Errorf("failed to marshal attributes into JSON: %w", err), + fmt.Errorf("failed to parse typed event: %w", err), ) } + // Log the parsed event + log.Debug(). + Interface("parsed_event", parsedEvent). + Msg("Parsed event details") + + evtType := proto.MessageName(parsedEvent) + log.Debug().Str("event_type", evtType).Msg("parsed event type") + + // Check if the parsed event is of the expected type + // if reflect.TypeOf(parsedEvent) != reflect.TypeOf(result) { + // return nil, types.NewError( + // http.StatusInternalServerError, + // types.InternalServiceError, + // fmt.Errorf("parsed event type %T does not match expected type %T", parsedEvent, result), + // ) + // } + + // Create a map to store the attributes + // attributeMap := make(map[string]string) + + // // Populate the attribute map from the event's attributes + // for _, attr := range event.Attributes { + // // Unescape the attribute value + // attributeMap[attr.Key] = utils.SafeUnescape(attr.Value) + // } + + // log.Debug().Interface("attributeMap", attributeMap).Msg("attributeMap") + + // Marshal the attributeMap into JSON + // attrJSON, err := json.Marshal(attributeMap) + // if err != nil { + // return nil, types.NewError( + // http.StatusInternalServerError, + // types.InternalServiceError, + // fmt.Errorf("failed to marshal attributes into JSON: %w", err), + // ) + // } + // Unmarshal the JSON into the T struct - var evt T - err = json.Unmarshal(attrJSON, &evt) - if err != nil { - return nil, types.NewError( + // var evt T + // err = json.Unmarshal(attrJSON, &evt) + // if err != nil { + // return nil, types.NewError( + // http.StatusInternalServerError, + // types.InternalServiceError, + // fmt.Errorf("failed to unmarshal attributes into %T: %w", evt, err), + // ) + // } + + // Type assert the parsed event to the expected type + result, ok := parsedEvent.(T) + if !ok { + return result, types.NewError( http.StatusInternalServerError, types.InternalServiceError, - fmt.Errorf("failed to unmarshal attributes into %T: %w", evt, err), + fmt.Errorf("failed to assert parsed event to type %T", result), ) } - return &evt, nil + return result, nil } diff --git a/internal/services/finality-provider.go b/internal/services/finality-provider.go index 979148a..474ef15 100644 --- a/internal/services/finality-provider.go +++ b/internal/services/finality-provider.go @@ -21,7 +21,7 @@ const ( func (s *Service) processNewFinalityProviderEvent( ctx context.Context, event abcitypes.Event, ) *types.Error { - newFinalityProvider, err := parseEvent[bbntypes.EventFinalityProviderCreated]( + newFinalityProvider, err := parseEvent[*bbntypes.EventFinalityProviderCreated]( EventFinalityProviderCreatedType, event, ) if err != nil { @@ -50,7 +50,7 @@ func (s *Service) processNewFinalityProviderEvent( func (s *Service) processFinalityProviderEditedEvent( ctx context.Context, event abcitypes.Event, ) *types.Error { - finalityProviderEdited, err := parseEvent[bbntypes.EventFinalityProviderEdited]( + finalityProviderEdited, err := parseEvent[*bbntypes.EventFinalityProviderEdited]( EventFinalityProviderEditedType, event, ) if err != nil { @@ -75,7 +75,7 @@ func (s *Service) processFinalityProviderEditedEvent( func (s *Service) processFinalityProviderStateChangeEvent( ctx context.Context, event abcitypes.Event, ) *types.Error { - finalityProviderStateChange, err := parseEvent[bbntypes.EventFinalityProviderStatusChange]( + finalityProviderStateChange, err := parseEvent[*bbntypes.EventFinalityProviderStatusChange]( EventFinalityProviderStatusChange, event, ) if err != nil { From 6872fff3d2fb3bc2c0e2837a19fcaf7fdb90ce65 Mon Sep 17 00:00:00 2001 From: Gurjot Date: Sat, 19 Oct 2024 22:33:25 -0400 Subject: [PATCH 05/12] add more del events --- internal/db/delegation.go | 30 +++++ internal/db/interface.go | 20 +++ internal/services/delegation.go | 220 ++++++++++++++++++++++++++++++-- internal/services/events.go | 70 ++++------ 4 files changed, 284 insertions(+), 56 deletions(-) diff --git a/internal/db/delegation.go b/internal/db/delegation.go index 4c77f37..62923d9 100644 --- a/internal/db/delegation.go +++ b/internal/db/delegation.go @@ -5,6 +5,7 @@ import ( "errors" "github.com/babylonlabs-io/babylon-staking-indexer/internal/db/model" + "github.com/babylonlabs-io/babylon-staking-indexer/internal/types" "go.mongodb.org/mongo-driver/mongo" ) @@ -30,3 +31,32 @@ func (db *Database) SaveNewBTCDelegation( } return nil } + +func (db *Database) UpdateBTCDelegationState( + ctx context.Context, stakingTxHash string, newState types.DelegationState, +) error { + return nil +} + +func (db *Database) GetBTCDelegationByStakingTxHash( + ctx context.Context, stakingTxHash string, +) (*model.BTCDelegationDetails, error) { + filter := map[string]interface{}{"_id": stakingTxHash} + res := db.client.Database(db.dbName). + Collection(model.BTCDelegationDetailsCollection). + FindOne(ctx, filter) + + var delegationDoc model.BTCDelegationDetails + err := res.Decode(&delegationDoc) + if err != nil { + if errors.Is(err, mongo.ErrNoDocuments) { + return nil, &NotFoundError{ + Key: stakingTxHash, + Message: "BTC delegation not found", + } + } + return nil, err + } + + return &delegationDoc, nil +} diff --git a/internal/db/interface.go b/internal/db/interface.go index 63de325..4781ea7 100644 --- a/internal/db/interface.go +++ b/internal/db/interface.go @@ -5,6 +5,7 @@ import ( "github.com/babylonlabs-io/babylon-staking-indexer/internal/clients/bbnclient" "github.com/babylonlabs-io/babylon-staking-indexer/internal/db/model" + "github.com/babylonlabs-io/babylon-staking-indexer/internal/types" ) type DbInterface interface { @@ -83,4 +84,23 @@ type DbInterface interface { SaveNewBTCDelegation( ctx context.Context, delegationDoc *model.BTCDelegationDetails, ) error + /** + * SaveBTCDelegationStateUpdate saves a BTC delegation state update to the database. + * @param ctx The context + * @param delegationDoc The BTC delegation details + * @return An error if the operation failed + */ + UpdateBTCDelegationState( + ctx context.Context, stakingTxHash string, newState types.DelegationState, + ) error + /** + * GetBTCDelegationByStakingTxHash retrieves the BTC delegation details by the staking tx hash. + * If the BTC delegation does not exist, a NotFoundError will be returned. + * @param ctx The context + * @param stakingTxHash The staking tx hash + * @return The BTC delegation details or an error + */ + GetBTCDelegationByStakingTxHash( + ctx context.Context, stakingTxHash string, + ) (*model.BTCDelegationDetails, error) } diff --git a/internal/services/delegation.go b/internal/services/delegation.go index ee88a98..0234f23 100644 --- a/internal/services/delegation.go +++ b/internal/services/delegation.go @@ -10,40 +10,72 @@ import ( "github.com/babylonlabs-io/babylon-staking-indexer/internal/types" bbntypes "github.com/babylonlabs-io/babylon/x/btcstaking/types" abcitypes "github.com/cometbft/cometbft/abci/types" - "github.com/rs/zerolog/log" ) const ( + EventBTCDelegationStateUpdate EventTypes = "babylon.btcstaking.v1.EventBTCDelegationStateUpdate" EventBTCDelegationCreated EventTypes = "babylon.btcstaking.v1.EventBTCDelegationCreated" + EventCovenantQuorumReached EventTypes = "babylon.btcstaking.v1.EventCovenantQuorumReached" EventBTCDelegationInclusionProofReceived EventTypes = "babylon.btcstaking.v1.EventBTCDelegationInclusionProofReceived" EventBTCDelgationUnbondedEarly EventTypes = "babylon.btcstaking.v1.EventBTCDelgationUnbondedEarly" EventBTCDelegationExpired EventTypes = "babylon.btcstaking.v1.EventBTCDelegationExpired" ) +func (s *Service) processBTCDelegationStateUpdateEvent(ctx context.Context, event abcitypes.Event) *types.Error { + stateUpdate, err := parseEvent[*bbntypes.EventBTCDelegationStateUpdate]( + EventBTCDelegationStateUpdate, event, + ) + if err != nil { + return err + } + + if err := validateBTCDelegationStateUpdateEvent(stateUpdate); err != nil { + return err + } + + // Check if BTC delegation exists + _, dbErr := s.db.GetBTCDelegationByStakingTxHash(ctx, stateUpdate.StakingTxHash) + if dbErr != nil { + return types.NewError( + http.StatusInternalServerError, + types.InternalServiceError, + fmt.Errorf("failed to get BTC delegation by staking tx hash: %w", dbErr), + ) + } + + if err := s.db.UpdateBTCDelegationState( + ctx, stateUpdate.StakingTxHash, types.DelegationState(stateUpdate.NewState), + ); err != nil { + return types.NewError( + http.StatusInternalServerError, + types.InternalServiceError, + fmt.Errorf("failed to update BTC delegation state: %w", err), + ) + } + + return nil +} + func (s *Service) processNewBTCDelegationEvent( ctx context.Context, event abcitypes.Event, ) *types.Error { - log.Debug().Msg("Processing new BTC delegation event") newDelegation, err := parseEvent[*bbntypes.EventBTCDelegationCreated]( EventBTCDelegationCreated, event, ) if err != nil { - log.Error().Err(err).Msg("Failed to parse BTC delegation event") return err } + if err := validateBTCDelegationCreatedEvent(newDelegation); err != nil { - log.Error().Err(err).Msg("Failed to validate BTC delegation event") return err } - log.Debug().Interface("newDelegation", newDelegation).Msg("Saving new BTC delegation") if err := s.db.SaveNewBTCDelegation( ctx, model.FromEventBTCDelegationCreated(newDelegation), ); err != nil { if db.IsDuplicateKeyError(err) { - log.Info().Msg("BTC delegation already exists, ignoring the event") + // BTC delegation already exists, ignore the event return nil } - log.Error().Err(err).Msg("Failed to save new BTC delegation") return types.NewError( http.StatusInternalServerError, types.InternalServiceError, @@ -51,7 +83,154 @@ func (s *Service) processNewBTCDelegationEvent( ) } - log.Info().Msg("Successfully processed and saved new BTC delegation") + return nil +} + +func (s *Service) processCovenantQuorumReachedEvent( + ctx context.Context, event abcitypes.Event, +) *types.Error { + covenantQuorumReachedEvent, err := parseEvent[*bbntypes.EventCovenantQuorumReached]( + EventCovenantQuorumReached, event, + ) + if err != nil { + return err + } + + if err := validateCovenantQuorumReachedEvent(covenantQuorumReachedEvent); err != nil { + return err + } + + // Check if BTC delegation exists + _, dbErr := s.db.GetBTCDelegationByStakingTxHash(ctx, covenantQuorumReachedEvent.StakingTxHash) + if dbErr != nil { + return types.NewError( + http.StatusInternalServerError, + types.InternalServiceError, + fmt.Errorf("failed to get BTC delegation by staking tx hash: %w", dbErr), + ) + } + + if err := s.db.UpdateBTCDelegationState( + ctx, covenantQuorumReachedEvent.StakingTxHash, types.DelegationState(covenantQuorumReachedEvent.NewState), + ); err != nil { + return types.NewError( + http.StatusInternalServerError, + types.InternalServiceError, + fmt.Errorf("failed to update BTC delegation state: %w", err), + ) + } + + return nil +} + +func (s *Service) processBTCDelegationInclusionProofReceivedEvent( + ctx context.Context, event abcitypes.Event, +) *types.Error { + inclusionProofEvent, err := parseEvent[*bbntypes.EventBTCDelegationInclusionProofReceived]( + EventBTCDelegationInclusionProofReceived, event, + ) + if err != nil { + return err + } + + if err := validateBTCDelegationInclusionProofReceivedEvent(inclusionProofEvent); err != nil { + return err + } + + // Check if BTC delegation exists + _, dbErr := s.db.GetBTCDelegationByStakingTxHash(ctx, inclusionProofEvent.StakingTxHash) + if dbErr != nil { + return types.NewError( + http.StatusInternalServerError, + types.InternalServiceError, + fmt.Errorf("failed to get BTC delegation by staking tx hash: %w", dbErr), + ) + } + + if err := s.db.UpdateBTCDelegationState( + ctx, inclusionProofEvent.StakingTxHash, types.DelegationState(inclusionProofEvent.NewState), + ); err != nil { + return types.NewError( + http.StatusInternalServerError, + types.InternalServiceError, + fmt.Errorf("failed to update BTC delegation state: %w", err), + ) + } + + return nil +} + +func (s *Service) processBTCDelegationUnbondedEarlyEvent( + ctx context.Context, event abcitypes.Event, +) *types.Error { + unbondedEarlyEvent, err := parseEvent[*bbntypes.EventBTCDelgationUnbondedEarly]( + EventBTCDelgationUnbondedEarly, event, + ) + if err != nil { + return err + } + + if err := validateBTCDelegationUnbondedEarlyEvent(unbondedEarlyEvent); err != nil { + return err + } + + // Check if BTC delegation exists + _, dbErr := s.db.GetBTCDelegationByStakingTxHash(ctx, unbondedEarlyEvent.StakingTxHash) + if dbErr != nil { + return types.NewError( + http.StatusInternalServerError, + types.InternalServiceError, + fmt.Errorf("failed to get BTC delegation by staking tx hash: %w", dbErr), + ) + } + + if err := s.db.UpdateBTCDelegationState( + ctx, unbondedEarlyEvent.StakingTxHash, types.DelegationState(unbondedEarlyEvent.NewState), + ); err != nil { + return types.NewError( + http.StatusInternalServerError, + types.InternalServiceError, + fmt.Errorf("failed to update BTC delegation state: %w", err), + ) + } + + return nil +} + +func (s *Service) processBTCDelegationExpiredEvent( + ctx context.Context, event abcitypes.Event, +) *types.Error { + expiredEvent, err := parseEvent[*bbntypes.EventBTCDelegationExpired]( + EventBTCDelegationExpired, event, + ) + if err != nil { + return err + } + + if err := validateBTCDelegationExpiredEvent(expiredEvent); err != nil { + return err + } + + // Check if BTC delegation exists + _, dbErr := s.db.GetBTCDelegationByStakingTxHash(ctx, expiredEvent.StakingTxHash) + if dbErr != nil { + return types.NewError( + http.StatusInternalServerError, + types.InternalServiceError, + fmt.Errorf("failed to get BTC delegation by staking tx hash: %w", dbErr), + ) + } + + if err := s.db.UpdateBTCDelegationState( + ctx, expiredEvent.StakingTxHash, types.DelegationState(expiredEvent.NewState), + ); err != nil { + return types.NewError( + http.StatusInternalServerError, + types.InternalServiceError, + fmt.Errorf("failed to update BTC delegation state: %w", err), + ) + } + return nil } @@ -60,3 +239,28 @@ func validateBTCDelegationCreatedEvent(event *bbntypes.EventBTCDelegationCreated // Implement validation logic here return nil } + +func validateBTCDelegationStateUpdateEvent(event *bbntypes.EventBTCDelegationStateUpdate) *types.Error { + // Implement validation logic here + return nil +} + +func validateCovenantQuorumReachedEvent(event *bbntypes.EventCovenantQuorumReached) *types.Error { + // Implement validation logic here + return nil +} + +func validateBTCDelegationInclusionProofReceivedEvent(event *bbntypes.EventBTCDelegationInclusionProofReceived) *types.Error { + // Implement validation logic here + return nil +} + +func validateBTCDelegationUnbondedEarlyEvent(event *bbntypes.EventBTCDelgationUnbondedEarly) *types.Error { + // Implement validation logic here + return nil +} + +func validateBTCDelegationExpiredEvent(event *bbntypes.EventBTCDelegationExpired) *types.Error { + // Implement validation logic here + return nil +} diff --git a/internal/services/events.go b/internal/services/events.go index 1e4ebf1..965e236 100644 --- a/internal/services/events.go +++ b/internal/services/events.go @@ -60,15 +60,30 @@ func (s *Service) processEvent(ctx context.Context, event BbnEvent) { case EventFinalityProviderCreatedType: log.Debug().Msg("Processing new finality provider event") s.processNewFinalityProviderEvent(ctx, bbnEvent) - case EventBTCDelegationCreated: - log.Debug().Msg("Processing new BTC delegation event") - s.processNewBTCDelegationEvent(ctx, bbnEvent) case EventFinalityProviderEditedType: log.Debug().Msg("Processing finality provider edited event") s.processFinalityProviderEditedEvent(ctx, bbnEvent) case EventFinalityProviderStatusChange: log.Debug().Msg("Processing finality provider status change event") s.processFinalityProviderStateChangeEvent(ctx, bbnEvent) + case EventBTCDelegationCreated: + log.Debug().Msg("Processing new BTC delegation event") + s.processNewBTCDelegationEvent(ctx, bbnEvent) + case EventBTCDelegationStateUpdate: + log.Debug().Msg("Processing BTC delegation state update event") + s.processBTCDelegationStateUpdateEvent(ctx, bbnEvent) + case EventCovenantQuorumReached: + log.Debug().Msg("Processing covenant quorum reached event") + s.processCovenantQuorumReachedEvent(ctx, bbnEvent) + case EventBTCDelegationInclusionProofReceived: + log.Debug().Msg("Processing BTC delegation inclusion proof received event") + s.processBTCDelegationInclusionProofReceivedEvent(ctx, bbnEvent) + case EventBTCDelgationUnbondedEarly: + log.Debug().Msg("Processing BTC delegation unbonded early event") + s.processBTCDelegationUnbondedEarlyEvent(ctx, bbnEvent) + case EventBTCDelegationExpired: + log.Debug().Msg("Processing BTC delegation expired event") + s.processBTCDelegationExpiredEvent(ctx, bbnEvent) } } @@ -118,56 +133,15 @@ func parseEvent[T proto.Message]( evtType := proto.MessageName(parsedEvent) log.Debug().Str("event_type", evtType).Msg("parsed event type") - // Check if the parsed event is of the expected type - // if reflect.TypeOf(parsedEvent) != reflect.TypeOf(result) { - // return nil, types.NewError( - // http.StatusInternalServerError, - // types.InternalServiceError, - // fmt.Errorf("parsed event type %T does not match expected type %T", parsedEvent, result), - // ) - // } - - // Create a map to store the attributes - // attributeMap := make(map[string]string) - - // // Populate the attribute map from the event's attributes - // for _, attr := range event.Attributes { - // // Unescape the attribute value - // attributeMap[attr.Key] = utils.SafeUnescape(attr.Value) - // } - - // log.Debug().Interface("attributeMap", attributeMap).Msg("attributeMap") - - // Marshal the attributeMap into JSON - // attrJSON, err := json.Marshal(attributeMap) - // if err != nil { - // return nil, types.NewError( - // http.StatusInternalServerError, - // types.InternalServiceError, - // fmt.Errorf("failed to marshal attributes into JSON: %w", err), - // ) - // } - - // Unmarshal the JSON into the T struct - // var evt T - // err = json.Unmarshal(attrJSON, &evt) - // if err != nil { - // return nil, types.NewError( - // http.StatusInternalServerError, - // types.InternalServiceError, - // fmt.Errorf("failed to unmarshal attributes into %T: %w", evt, err), - // ) - // } - - // Type assert the parsed event to the expected type - result, ok := parsedEvent.(T) + // Type assertion to ensure we have the correct concrete type + concreteEvent, ok := parsedEvent.(T) if !ok { return result, types.NewError( http.StatusInternalServerError, types.InternalServiceError, - fmt.Errorf("failed to assert parsed event to type %T", result), + fmt.Errorf("parsed event type %T does not match expected type %T", parsedEvent, result), ) } - return result, nil + return concreteEvent, nil } From 27a54a6f7d3a9d82d2908dbf1c1aeee5043bd60b Mon Sep 17 00:00:00 2001 From: Gurjot Date: Sun, 20 Oct 2024 12:13:30 -0400 Subject: [PATCH 06/12] update logs --- internal/db/params.go | 17 +---------------- internal/services/global-params.go | 10 ---------- internal/utils/poller/poller.go | 8 ++------ 3 files changed, 3 insertions(+), 32 deletions(-) diff --git a/internal/db/params.go b/internal/db/params.go index 2c88884..c66ac98 100644 --- a/internal/db/params.go +++ b/internal/db/params.go @@ -6,7 +6,6 @@ import ( "github.com/babylonlabs-io/babylon-staking-indexer/internal/clients/bbnclient" "github.com/babylonlabs-io/babylon-staking-indexer/internal/db/model" - "github.com/rs/zerolog/log" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo/options" ) @@ -50,8 +49,6 @@ func (db *Database) SaveStakingParams( func (db *Database) SaveCheckpointParams( ctx context.Context, params *bbnclient.CheckpointParams, ) error { - log.Debug().Msg("Saving checkpoint params") - collection := db.client.Database(db.dbName). Collection(model.GlobalParamsCollection) @@ -68,22 +65,10 @@ func (db *Database) SaveCheckpointParams( }, } - log.Debug(). - Interface("filter", filter). - Interface("update", update). - Msg("Attempting to update checkpoint params") - - result, err := collection.UpdateOne(ctx, filter, update, options.Update().SetUpsert(true)) + _, err := collection.UpdateOne(ctx, filter, update, options.Update().SetUpsert(true)) if err != nil { - log.Error().Err(err).Msg("Failed to save checkpoint params") return fmt.Errorf("failed to save checkpoint params: %w", err) } - log.Info(). - Int64("matched_count", result.MatchedCount). - Int64("modified_count", result.ModifiedCount). - Int64("upserted_count", result.UpsertedCount). - Msg("Successfully saved checkpoint params") - return nil } diff --git a/internal/services/global-params.go b/internal/services/global-params.go index 2b2d477..4fb0157 100644 --- a/internal/services/global-params.go +++ b/internal/services/global-params.go @@ -18,8 +18,6 @@ func (s *Service) SyncGlobalParams(ctx context.Context) { } func (s *Service) fetchAndSaveParams(ctx context.Context) *types.Error { - log.Debug().Msg("Fetching and saving global parameters") - checkpointParams, err := s.bbn.GetCheckpointParams(ctx) if err != nil { // TODO: Add metrics and replace internal service error with a more specific @@ -29,15 +27,12 @@ func (s *Service) fetchAndSaveParams(ctx context.Context) *types.Error { fmt.Errorf("failed to get checkpoint params: %w", err), ) } - log.Debug().Interface("checkpointParams", checkpointParams).Msg("Retrieved checkpoint params") - if err := s.db.SaveCheckpointParams(ctx, checkpointParams); err != nil { log.Error().Err(err).Msg("Failed to save checkpoint params") return types.NewInternalServiceError( fmt.Errorf("failed to save checkpoint params: %w", err), ) } - log.Info().Msg("Successfully saved checkpoint params") allStakingParams, err := s.bbn.GetAllStakingParams(ctx) if err != nil { @@ -46,24 +41,19 @@ func (s *Service) fetchAndSaveParams(ctx context.Context) *types.Error { fmt.Errorf("failed to get staking params: %w", err), ) } - log.Debug().Interface("allStakingParams", allStakingParams).Msg("Retrieved all staking params") for version, params := range allStakingParams { if params == nil { - log.Error().Uint32("version", version).Msg("Nil staking params encountered") return types.NewInternalServiceError( fmt.Errorf("nil staking params for version %d", version), ) } if err := s.db.SaveStakingParams(ctx, version, params); err != nil { - log.Error().Err(err).Uint32("version", version).Msg("Failed to save staking params") return types.NewInternalServiceError( fmt.Errorf("failed to save staking params: %w", err), ) } - log.Info().Uint32("version", version).Msg("Successfully saved staking params") } - log.Info().Msg("Successfully fetched and saved all global parameters") return nil } diff --git a/internal/utils/poller/poller.go b/internal/utils/poller/poller.go index 03f02bf..f17aee6 100644 --- a/internal/utils/poller/poller.go +++ b/internal/utils/poller/poller.go @@ -24,24 +24,20 @@ func NewPoller(interval time.Duration, pollMethod func(ctx context.Context) *typ func (p *Poller) Start(ctx context.Context) { ticker := time.NewTicker(p.interval) - defer ticker.Stop() - - log.Info().Msgf("Starting poller with interval %s", p.interval) for { select { case <-ticker.C: - log.Debug().Msg("Executing poll method") if err := p.pollMethod(ctx); err != nil { log.Error().Err(err).Msg("Error polling") - } else { - log.Debug().Msg("Poll method executed successfully") } case <-ctx.Done(): + // Handle context cancellation. log.Info().Msg("Poller stopped due to context cancellation") return case <-p.quit: log.Info().Msg("Poller stopped") + ticker.Stop() // Stop the ticker return } } From a939eaee1b31870c8a1abbb70ac607e1746303d0 Mon Sep 17 00:00:00 2001 From: Gurjot Date: Sun, 20 Oct 2024 12:16:57 -0400 Subject: [PATCH 07/12] fix logs --- internal/services/global-params.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/internal/services/global-params.go b/internal/services/global-params.go index 4fb0157..717c7d6 100644 --- a/internal/services/global-params.go +++ b/internal/services/global-params.go @@ -6,7 +6,6 @@ import ( "github.com/babylonlabs-io/babylon-staking-indexer/internal/types" "github.com/babylonlabs-io/babylon-staking-indexer/internal/utils/poller" - "github.com/rs/zerolog/log" ) func (s *Service) SyncGlobalParams(ctx context.Context) { @@ -22,13 +21,11 @@ func (s *Service) fetchAndSaveParams(ctx context.Context) *types.Error { if err != nil { // TODO: Add metrics and replace internal service error with a more specific // error code so that the poller can catch and emit the error metrics - log.Error().Err(err).Msg("Failed to get checkpoint params") return types.NewInternalServiceError( fmt.Errorf("failed to get checkpoint params: %w", err), ) } if err := s.db.SaveCheckpointParams(ctx, checkpointParams); err != nil { - log.Error().Err(err).Msg("Failed to save checkpoint params") return types.NewInternalServiceError( fmt.Errorf("failed to save checkpoint params: %w", err), ) @@ -36,7 +33,6 @@ func (s *Service) fetchAndSaveParams(ctx context.Context) *types.Error { allStakingParams, err := s.bbn.GetAllStakingParams(ctx) if err != nil { - log.Error().Err(err).Msg("Failed to get staking params") return types.NewInternalServiceError( fmt.Errorf("failed to get staking params: %w", err), ) From 78708edb484741ce5ab75b59e5f5f11298c8946a Mon Sep 17 00:00:00 2001 From: Gurjot Date: Sun, 20 Oct 2024 12:18:03 -0400 Subject: [PATCH 08/12] fix mocks --- tests/mocks/mock_db_client.go | 50 +++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/tests/mocks/mock_db_client.go b/tests/mocks/mock_db_client.go index 94805e4..8b54bf7 100644 --- a/tests/mocks/mock_db_client.go +++ b/tests/mocks/mock_db_client.go @@ -7,6 +7,8 @@ import ( bbnclient "github.com/babylonlabs-io/babylon-staking-indexer/internal/clients/bbnclient" + internaltypes "github.com/babylonlabs-io/babylon-staking-indexer/internal/types" + mock "github.com/stretchr/testify/mock" model "github.com/babylonlabs-io/babylon-staking-indexer/internal/db/model" @@ -19,6 +21,36 @@ type DbInterface struct { mock.Mock } +// GetBTCDelegationByStakingTxHash provides a mock function with given fields: ctx, stakingTxHash +func (_m *DbInterface) GetBTCDelegationByStakingTxHash(ctx context.Context, stakingTxHash string) (*model.BTCDelegationDetails, error) { + ret := _m.Called(ctx, stakingTxHash) + + if len(ret) == 0 { + panic("no return value specified for GetBTCDelegationByStakingTxHash") + } + + var r0 *model.BTCDelegationDetails + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) (*model.BTCDelegationDetails, error)); ok { + return rf(ctx, stakingTxHash) + } + if rf, ok := ret.Get(0).(func(context.Context, string) *model.BTCDelegationDetails); ok { + r0 = rf(ctx, stakingTxHash) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*model.BTCDelegationDetails) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, stakingTxHash) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetFinalityProviderByBtcPk provides a mock function with given fields: ctx, btcPk func (_m *DbInterface) GetFinalityProviderByBtcPk(ctx context.Context, btcPk string) (*model.FinalityProviderDetails, error) { ret := _m.Called(ctx, btcPk) @@ -139,6 +171,24 @@ func (_m *DbInterface) SaveStakingParams(ctx context.Context, version uint32, pa return r0 } +// UpdateBTCDelegationState provides a mock function with given fields: ctx, stakingTxHash, newState +func (_m *DbInterface) UpdateBTCDelegationState(ctx context.Context, stakingTxHash string, newState internaltypes.DelegationState) error { + ret := _m.Called(ctx, stakingTxHash, newState) + + if len(ret) == 0 { + panic("no return value specified for UpdateBTCDelegationState") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, internaltypes.DelegationState) error); ok { + r0 = rf(ctx, stakingTxHash, newState) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // UpdateFinalityProviderDetailsFromEvent provides a mock function with given fields: ctx, detailsToUpdate func (_m *DbInterface) UpdateFinalityProviderDetailsFromEvent(ctx context.Context, detailsToUpdate *model.FinalityProviderDetails) error { ret := _m.Called(ctx, detailsToUpdate) From f190bdb4e9daaf1bd155bbe8d8714f9cee87645e Mon Sep 17 00:00:00 2001 From: Gurjot Date: Sun, 20 Oct 2024 12:22:25 -0400 Subject: [PATCH 09/12] fix db funcs --- internal/db/delegation.go | 19 ++++++++++++++++++- internal/types/state.go | 4 ++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/internal/db/delegation.go b/internal/db/delegation.go index 62923d9..d5e5fbe 100644 --- a/internal/db/delegation.go +++ b/internal/db/delegation.go @@ -35,6 +35,23 @@ func (db *Database) SaveNewBTCDelegation( func (db *Database) UpdateBTCDelegationState( ctx context.Context, stakingTxHash string, newState types.DelegationState, ) error { + filter := map[string]interface{}{"_id": stakingTxHash} + update := map[string]interface{}{"$set": map[string]string{"state": newState.String()}} + + res := db.client.Database(db.dbName). + Collection(model.BTCDelegationDetailsCollection). + FindOneAndUpdate(ctx, filter, update) + + if res.Err() != nil { + if errors.Is(res.Err(), mongo.ErrNoDocuments) { + return &NotFoundError{ + Key: stakingTxHash, + Message: "BTC delegation not found when updating state", + } + } + return res.Err() + } + return nil } @@ -52,7 +69,7 @@ func (db *Database) GetBTCDelegationByStakingTxHash( if errors.Is(err, mongo.ErrNoDocuments) { return nil, &NotFoundError{ Key: stakingTxHash, - Message: "BTC delegation not found", + Message: "BTC delegation not found when getting by staking tx hash", } } return nil, err diff --git a/internal/types/state.go b/internal/types/state.go index f9c756d..be6d35a 100644 --- a/internal/types/state.go +++ b/internal/types/state.go @@ -11,3 +11,7 @@ const ( StateSlashed DelegationState = "SLASHED" StateUnbonded DelegationState = "UNBONDED" ) + +func (s DelegationState) String() string { + return string(s) +} From a3d8a78a0937ab1568dadc7fb2fb8d5ac376a455 Mon Sep 17 00:00:00 2001 From: Gurjot Date: Sun, 20 Oct 2024 13:28:31 -0400 Subject: [PATCH 10/12] fix msgs --- internal/services/events.go | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/internal/services/events.go b/internal/services/events.go index 965e236..ff69b1a 100644 --- a/internal/services/events.go +++ b/internal/services/events.go @@ -93,6 +93,7 @@ func parseEvent[T proto.Message]( ) (T, *types.Error) { var result T + // Check if the event type matches the expected type if EventTypes(event.Type) != expectedType { return result, types.NewErrorWithMsg( http.StatusInternalServerError, @@ -104,6 +105,8 @@ func parseEvent[T proto.Message]( ), ) } + + // Check if the event has attributes if len(event.Attributes) == 0 { return result, types.NewErrorWithMsg( http.StatusInternalServerError, @@ -116,7 +119,7 @@ func parseEvent[T proto.Message]( } // Use the SDK's ParseTypedEvent function - parsedEvent, err := sdk.ParseTypedEvent(event) + protoMsg, err := sdk.ParseTypedEvent(event) if err != nil { return result, types.NewError( http.StatusInternalServerError, @@ -125,23 +128,15 @@ func parseEvent[T proto.Message]( ) } - // Log the parsed event - log.Debug(). - Interface("parsed_event", parsedEvent). - Msg("Parsed event details") - - evtType := proto.MessageName(parsedEvent) - log.Debug().Str("event_type", evtType).Msg("parsed event type") - // Type assertion to ensure we have the correct concrete type - concreteEvent, ok := parsedEvent.(T) + concreteMsg, ok := protoMsg.(T) if !ok { return result, types.NewError( http.StatusInternalServerError, types.InternalServiceError, - fmt.Errorf("parsed event type %T does not match expected type %T", parsedEvent, result), + fmt.Errorf("parsed event type %T does not match expected type %T", protoMsg, result), ) } - return concreteEvent, nil + return concreteMsg, nil } From afa1528fa58d14335dd17da8596206f782c55e31 Mon Sep 17 00:00:00 2001 From: Gurjot Date: Sun, 20 Oct 2024 14:23:24 -0400 Subject: [PATCH 11/12] add verified type --- internal/types/state.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/types/state.go b/internal/types/state.go index be6d35a..cf5b645 100644 --- a/internal/types/state.go +++ b/internal/types/state.go @@ -5,6 +5,7 @@ type DelegationState string const ( StatePending DelegationState = "PENDING" + StateVerified DelegationState = "VERIFIED" StateActive DelegationState = "ACTIVE" StateUnbonding DelegationState = "UNBONDING" StateWithdrawn DelegationState = "WITHDRAWN" From 2c164bfed2650d7ba76fb4389bab96dd4fd50e0a Mon Sep 17 00:00:00 2001 From: Gurjot Date: Mon, 21 Oct 2024 10:51:34 -0400 Subject: [PATCH 12/12] pr comment: remove networks from compose --- docker-compose.yml | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 6303b43..a41174e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -10,8 +10,6 @@ services: - rabbitmq volumes: - ./config/config-docker.yml:/home/babylon-staking-indexer/config.yml:Z - networks: - - indexer-network indexer-mongodb: image: mongo:latest container_name: indexer-mongodb @@ -24,8 +22,6 @@ services: volumes: - ./bin/init-mongo.sh:/init-mongo.sh entrypoint: [ "/init-mongo.sh" ] - networks: - - indexer-network rabbitmq: image: rabbitmq:3-management container_name: rabbitmq @@ -37,9 +33,3 @@ services: RABBITMQ_DEFAULT_PASS: password volumes: - "./rabbitmq_data:/var/lib/rabbitmq" - networks: - - indexer-network - -networks: - indexer-network: - driver: bridge