From fe6a3250ca7ebcfe3fb9d37ed09b310754702eee Mon Sep 17 00:00:00 2001 From: Gurjot Date: Fri, 6 Dec 2024 14:48:00 +0530 Subject: [PATCH] fix --- internal/services/consumer_events.go | 16 ++-- internal/services/delegation.go | 109 ++++++++++++++++-------- internal/services/delegation_helpers.go | 23 +++-- internal/services/events.go | 4 + internal/services/subscription.go | 8 +- internal/services/watch_btc_events.go | 15 ++-- 6 files changed, 116 insertions(+), 59 deletions(-) diff --git a/internal/services/consumer_events.go b/internal/services/consumer_events.go index 1fa67b3..84790b1 100644 --- a/internal/services/consumer_events.go +++ b/internal/services/consumer_events.go @@ -9,12 +9,18 @@ import ( queuecli "github.com/babylonlabs-io/staking-queue-client/client" ) -func (s *Service) emitActiveDelegationEvent(ctx context.Context, delegation *model.BTCDelegationDetails) *types.Error { +func (s *Service) emitActiveDelegationEvent( + ctx context.Context, + stakingTxHashHex string, + stakerBtcPkHex string, + finalityProviderBtcPksHex []string, + stakingAmount uint64, +) *types.Error { stakingEvent := queuecli.NewActiveStakingEvent( - delegation.StakingTxHashHex, - delegation.StakerBtcPkHex, - delegation.FinalityProviderBtcPksHex, - delegation.StakingAmount, + stakingTxHashHex, + stakerBtcPkHex, + finalityProviderBtcPksHex, + stakingAmount, ) if err := s.queueManager.PushActiveStakingEvent(&stakingEvent); err != nil { diff --git a/internal/services/delegation.go b/internal/services/delegation.go index e1e9dd0..ce61238 100644 --- a/internal/services/delegation.go +++ b/internal/services/delegation.go @@ -140,22 +140,6 @@ func (s *Service) processCovenantQuorumReachedEvent( return nil } - // Update delegation state - newState := types.DelegationState(covenantQuorumReachedEvent.NewState) - if dbErr := s.db.UpdateBTCDelegationState( - ctx, - covenantQuorumReachedEvent.StakingTxHash, - types.QualifiedStatesForCovenantQuorumReached(covenantQuorumReachedEvent.NewState), - newState, - nil, - ); dbErr != nil { - return types.NewError( - http.StatusInternalServerError, - types.InternalServiceError, - fmt.Errorf("failed to update BTC delegation state: %w", dbErr), - ) - } - // Emit event and register spend notification delegation, dbErr := s.db.GetBTCDelegationByStakingTxHash(ctx, covenantQuorumReachedEvent.StakingTxHash) if dbErr != nil { @@ -165,18 +149,52 @@ func (s *Service) processCovenantQuorumReachedEvent( fmt.Errorf("failed to get BTC delegation by staking tx hash: %w", dbErr), ) } + + newState := types.DelegationState(covenantQuorumReachedEvent.NewState) if newState == types.StateActive { - err = s.emitActiveDelegationEvent(ctx, delegation) + log.Debug(). + Str("staking_tx", covenantQuorumReachedEvent.StakingTxHash). + Str("staking_start_height", strconv.FormatUint(uint64(delegation.StartHeight), 10)). + Str("event_type", EventCovenantQuorumReached.String()). + Msg("handling active state") + + err = s.emitActiveDelegationEvent( + ctx, + delegation.StakingTxHashHex, + delegation.StakerBtcPkHex, + delegation.FinalityProviderBtcPksHex, + delegation.StakingAmount, + ) if err != nil { return err } - // Register spend notification - if err := s.registerStakingSpendNotification(ctx, delegation); err != nil { + if err := s.registerStakingSpendNotification( + ctx, + delegation.StakingTxHashHex, + delegation.StakingTxHex, + delegation.StakingOutputIdx, + delegation.StartHeight, + ); err != nil { return err } } + // Update delegation state + if dbErr := s.db.UpdateBTCDelegationState( + ctx, + covenantQuorumReachedEvent.StakingTxHash, + types.QualifiedStatesForCovenantQuorumReached(covenantQuorumReachedEvent.NewState), + newState, + nil, + ); dbErr != nil { + return types.NewError( + http.StatusInternalServerError, + types.InternalServiceError, + fmt.Errorf("failed to update BTC delegation state: %w", dbErr), + ) + } + return nil } @@ -199,19 +217,6 @@ func (s *Service) processBTCDelegationInclusionProofReceivedEvent( return nil } - // Update delegation details - if dbErr := s.db.UpdateBTCDelegationDetails( - ctx, - inclusionProofEvent.StakingTxHash, - model.FromEventBTCDelegationInclusionProofReceived(inclusionProofEvent), - ); dbErr != nil { - return types.NewError( - http.StatusInternalServerError, - types.InternalServiceError, - fmt.Errorf("failed to update BTC delegation details: %w", dbErr), - ) - } - // Emit event and register spend notification delegation, dbErr := s.db.GetBTCDelegationByStakingTxHash(ctx, inclusionProofEvent.StakingTxHash) if dbErr != nil { @@ -223,17 +228,48 @@ func (s *Service) processBTCDelegationInclusionProofReceivedEvent( } newState := types.DelegationState(inclusionProofEvent.NewState) if newState == types.StateActive { - err = s.emitActiveDelegationEvent(ctx, delegation) + stakingStartHeight, _ := strconv.ParseUint(inclusionProofEvent.StartHeight, 10, 32) + + log.Debug(). + Str("staking_tx", inclusionProofEvent.StakingTxHash). + Str("staking_start_height", inclusionProofEvent.StartHeight). + Str("event_type", EventBTCDelegationInclusionProofReceived.String()). + Msg("handling active state") + + err = s.emitActiveDelegationEvent( + ctx, + inclusionProofEvent.StakingTxHash, + delegation.StakerBtcPkHex, + delegation.FinalityProviderBtcPksHex, + delegation.StakingAmount, + ) if err != nil { return err } - // Register spend notification - if err := s.registerStakingSpendNotification(ctx, delegation); err != nil { + if err := s.registerStakingSpendNotification(ctx, + delegation.StakingTxHashHex, + delegation.StakingTxHex, + delegation.StakingOutputIdx, + uint32(stakingStartHeight), + ); err != nil { return err } } + // Update delegation details + if dbErr := s.db.UpdateBTCDelegationDetails( + ctx, + inclusionProofEvent.StakingTxHash, + model.FromEventBTCDelegationInclusionProofReceived(inclusionProofEvent), + ); dbErr != nil { + return types.NewError( + http.StatusInternalServerError, + types.InternalServiceError, + fmt.Errorf("failed to update BTC delegation details: %w", dbErr), + ) + } + return nil } @@ -304,7 +340,8 @@ func (s *Service) processBTCDelegationUnbondedEarlyEvent( Str("unbonding_time", strconv.FormatUint(uint64(delegation.UnbondingTime), 10)). Str("unbonding_expire_height", strconv.FormatUint(uint64(unbondingExpireHeight), 10)). Str("sub_state", subState.String()). - Msg("updating delegation state to early unbonding") + Str("event_type", EventBTCDelgationUnbondedEarly.String()). + Msg("updating delegation state") // Update delegation state if err := s.db.UpdateBTCDelegationState( diff --git a/internal/services/delegation_helpers.go b/internal/services/delegation_helpers.go index 1619197..f6a7e62 100644 --- a/internal/services/delegation_helpers.go +++ b/internal/services/delegation_helpers.go @@ -68,9 +68,12 @@ func (s *Service) registerUnbondingSpendNotification( func (s *Service) registerStakingSpendNotification( ctx context.Context, - delegation *model.BTCDelegationDetails, + stakingTxHashHex string, + stakingTxHex string, + stakingOutputIdx uint32, + stakingStartHeight uint32, ) *types.Error { - stakingTxHash, err := chainhash.NewHashFromStr(delegation.StakingTxHashHex) + stakingTxHash, err := chainhash.NewHashFromStr(stakingTxHashHex) if err != nil { return types.NewError( http.StatusInternalServerError, @@ -79,7 +82,7 @@ func (s *Service) registerStakingSpendNotification( ) } - stakingTx, err := utils.DeserializeBtcTransactionFromHex(delegation.StakingTxHex) + stakingTx, err := utils.DeserializeBtcTransactionFromHex(stakingTxHex) if err != nil { return types.NewError( http.StatusInternalServerError, @@ -88,30 +91,26 @@ func (s *Service) registerStakingSpendNotification( ) } - log.Debug(). - Str("staking_tx", delegation.StakingTxHashHex). - Msg("registering staking spend notification") - stakingOutpoint := wire.OutPoint{ Hash: *stakingTxHash, - Index: delegation.StakingOutputIdx, + Index: stakingOutputIdx, } spendEv, err := s.btcNotifier.RegisterSpendNtfn( &stakingOutpoint, - stakingTx.TxOut[delegation.StakingOutputIdx].PkScript, - delegation.StartHeight, + stakingTx.TxOut[stakingOutputIdx].PkScript, + stakingStartHeight, ) if err != nil { return types.NewError( http.StatusInternalServerError, types.InternalServiceError, - fmt.Errorf("failed to register spend ntfn for staking tx %s: %w", delegation.StakingTxHashHex, err), + fmt.Errorf("failed to register spend ntfn for staking tx %s: %w", stakingTxHashHex, err), ) } s.wg.Add(1) - go s.watchForSpendStakingTx(spendEv, delegation) + go s.watchForSpendStakingTx(spendEv, stakingTxHashHex) return nil } diff --git a/internal/services/events.go b/internal/services/events.go index e1733ed..d291ae3 100644 --- a/internal/services/events.go +++ b/internal/services/events.go @@ -21,6 +21,10 @@ type EventTypes string type EventCategory string +func (e EventTypes) String() string { + return string(e) +} + const ( BlockCategory EventCategory = "block" TxCategory EventCategory = "tx" diff --git a/internal/services/subscription.go b/internal/services/subscription.go index e601683..749f8e7 100644 --- a/internal/services/subscription.go +++ b/internal/services/subscription.go @@ -62,7 +62,13 @@ func (s *Service) ResubscribeToMissedBtcNotifications(ctx context.Context) { for _, delegation := range delegations { // Register spend notification - if err := s.registerStakingSpendNotification(ctx, delegation); err != nil { + if err := s.registerStakingSpendNotification( + ctx, + delegation.StakingTxHashHex, + delegation.StakingTxHex, + delegation.StakingOutputIdx, + delegation.StartHeight, + ); err != nil { log.Fatal().Msgf("Failed to register spend notification: %v", err) } } diff --git a/internal/services/watch_btc_events.go b/internal/services/watch_btc_events.go index 010a194..0e1e2e5 100644 --- a/internal/services/watch_btc_events.go +++ b/internal/services/watch_btc_events.go @@ -22,7 +22,7 @@ import ( func (s *Service) watchForSpendStakingTx( spendEvent *notifier.SpendEvent, - delegation *model.BTCDelegationDetails, + stakingTxHashHex string, ) { defer s.wg.Done() quitCtx, cancel := s.quitContext() @@ -32,7 +32,7 @@ func (s *Service) watchForSpendStakingTx( select { case spendDetail := <-spendEvent.Spend: log.Debug(). - Str("staking_tx", delegation.StakingTxHashHex). + Str("staking_tx", stakingTxHashHex). Str("spending_tx", spendDetail.SpendingTx.TxHash().String()). Msg("staking tx has been spent") if err := s.handleSpendingStakingTransaction( @@ -40,11 +40,11 @@ func (s *Service) watchForSpendStakingTx( spendDetail.SpendingTx, spendDetail.SpenderInputIndex, uint32(spendDetail.SpendingHeight), - delegation, + stakingTxHashHex, ); err != nil { log.Error(). Err(err). - Str("staking_tx", delegation.StakingTxHashHex). + Str("staking_tx", stakingTxHashHex). Str("spending_tx", spendDetail.SpendingTx.TxHash().String()). Msg("failed to handle spending staking transaction") return @@ -158,8 +158,13 @@ func (s *Service) handleSpendingStakingTransaction( spendingTx *wire.MsgTx, spendingInputIdx uint32, spendingHeight uint32, - delegation *model.BTCDelegationDetails, + stakingTxHashHex string, ) error { + delegation, err := s.db.GetBTCDelegationByStakingTxHash(ctx, stakingTxHashHex) + if err != nil { + return fmt.Errorf("failed to get BTC delegation by staking tx hash: %w", err) + } + params, err := s.db.GetStakingParams(ctx, delegation.ParamsVersion) if err != nil { return fmt.Errorf("failed to get staking params: %w", err)