Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: update db after processing event #89

Merged
merged 1 commit into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions internal/services/consumer_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,18 @@ import (
queuecli "github.com/babylonlabs-io/staking-queue-client/client"
)

func (s *Service) emitActiveDelegationEvent(ctx context.Context, delegation *model.BTCDelegationDetails) *types.Error {
func (s *Service) emitActiveDelegationEvent(
ctx context.Context,
stakingTxHashHex string,
stakerBtcPkHex string,
finalityProviderBtcPksHex []string,
stakingAmount uint64,
) *types.Error {
stakingEvent := queuecli.NewActiveStakingEvent(
delegation.StakingTxHashHex,
delegation.StakerBtcPkHex,
delegation.FinalityProviderBtcPksHex,
delegation.StakingAmount,
stakingTxHashHex,
stakerBtcPkHex,
finalityProviderBtcPksHex,
stakingAmount,
)

if err := s.queueManager.PushActiveStakingEvent(&stakingEvent); err != nil {
Expand Down
109 changes: 73 additions & 36 deletions internal/services/delegation.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,22 +140,6 @@ func (s *Service) processCovenantQuorumReachedEvent(
return nil
}

// Update delegation state
newState := types.DelegationState(covenantQuorumReachedEvent.NewState)
if dbErr := s.db.UpdateBTCDelegationState(
ctx,
covenantQuorumReachedEvent.StakingTxHash,
types.QualifiedStatesForCovenantQuorumReached(covenantQuorumReachedEvent.NewState),
newState,
nil,
); dbErr != nil {
return types.NewError(
http.StatusInternalServerError,
types.InternalServiceError,
fmt.Errorf("failed to update BTC delegation state: %w", dbErr),
)
}

// Emit event and register spend notification
delegation, dbErr := s.db.GetBTCDelegationByStakingTxHash(ctx, covenantQuorumReachedEvent.StakingTxHash)
if dbErr != nil {
Expand All @@ -165,18 +149,52 @@ func (s *Service) processCovenantQuorumReachedEvent(
fmt.Errorf("failed to get BTC delegation by staking tx hash: %w", dbErr),
)
}

newState := types.DelegationState(covenantQuorumReachedEvent.NewState)
if newState == types.StateActive {
err = s.emitActiveDelegationEvent(ctx, delegation)
log.Debug().
Str("staking_tx", covenantQuorumReachedEvent.StakingTxHash).
Str("staking_start_height", strconv.FormatUint(uint64(delegation.StartHeight), 10)).
Str("event_type", EventCovenantQuorumReached.String()).
Msg("handling active state")

err = s.emitActiveDelegationEvent(
ctx,
delegation.StakingTxHashHex,
delegation.StakerBtcPkHex,
delegation.FinalityProviderBtcPksHex,
delegation.StakingAmount,
)
if err != nil {
return err
}

// Register spend notification
if err := s.registerStakingSpendNotification(ctx, delegation); err != nil {
if err := s.registerStakingSpendNotification(
ctx,
delegation.StakingTxHashHex,
delegation.StakingTxHex,
delegation.StakingOutputIdx,
delegation.StartHeight,
); err != nil {
return err
}
}

// Update delegation state
if dbErr := s.db.UpdateBTCDelegationState(
ctx,
covenantQuorumReachedEvent.StakingTxHash,
types.QualifiedStatesForCovenantQuorumReached(covenantQuorumReachedEvent.NewState),
newState,
nil,
); dbErr != nil {
return types.NewError(
http.StatusInternalServerError,
types.InternalServiceError,
fmt.Errorf("failed to update BTC delegation state: %w", dbErr),
)
}

return nil
}

Expand All @@ -199,19 +217,6 @@ func (s *Service) processBTCDelegationInclusionProofReceivedEvent(
return nil
}

// Update delegation details
if dbErr := s.db.UpdateBTCDelegationDetails(
ctx,
inclusionProofEvent.StakingTxHash,
model.FromEventBTCDelegationInclusionProofReceived(inclusionProofEvent),
); dbErr != nil {
return types.NewError(
http.StatusInternalServerError,
types.InternalServiceError,
fmt.Errorf("failed to update BTC delegation details: %w", dbErr),
)
}

// Emit event and register spend notification
delegation, dbErr := s.db.GetBTCDelegationByStakingTxHash(ctx, inclusionProofEvent.StakingTxHash)
if dbErr != nil {
Expand All @@ -223,17 +228,48 @@ func (s *Service) processBTCDelegationInclusionProofReceivedEvent(
}
newState := types.DelegationState(inclusionProofEvent.NewState)
if newState == types.StateActive {
err = s.emitActiveDelegationEvent(ctx, delegation)
stakingStartHeight, _ := strconv.ParseUint(inclusionProofEvent.StartHeight, 10, 32)

log.Debug().
Str("staking_tx", inclusionProofEvent.StakingTxHash).
Str("staking_start_height", inclusionProofEvent.StartHeight).
Str("event_type", EventBTCDelegationInclusionProofReceived.String()).
Msg("handling active state")

err = s.emitActiveDelegationEvent(
ctx,
inclusionProofEvent.StakingTxHash,
delegation.StakerBtcPkHex,
delegation.FinalityProviderBtcPksHex,
delegation.StakingAmount,
)
if err != nil {
return err
}

// Register spend notification
if err := s.registerStakingSpendNotification(ctx, delegation); err != nil {
if err := s.registerStakingSpendNotification(ctx,
delegation.StakingTxHashHex,
delegation.StakingTxHex,
delegation.StakingOutputIdx,
uint32(stakingStartHeight),
); err != nil {
return err
}
}

// Update delegation details
if dbErr := s.db.UpdateBTCDelegationDetails(
ctx,
inclusionProofEvent.StakingTxHash,
model.FromEventBTCDelegationInclusionProofReceived(inclusionProofEvent),
); dbErr != nil {
return types.NewError(
http.StatusInternalServerError,
types.InternalServiceError,
fmt.Errorf("failed to update BTC delegation details: %w", dbErr),
)
}

return nil
}

Expand Down Expand Up @@ -304,7 +340,8 @@ func (s *Service) processBTCDelegationUnbondedEarlyEvent(
Str("unbonding_time", strconv.FormatUint(uint64(delegation.UnbondingTime), 10)).
Str("unbonding_expire_height", strconv.FormatUint(uint64(unbondingExpireHeight), 10)).
Str("sub_state", subState.String()).
Msg("updating delegation state to early unbonding")
Str("event_type", EventBTCDelgationUnbondedEarly.String()).
Msg("updating delegation state")

// Update delegation state
if err := s.db.UpdateBTCDelegationState(
Expand Down
23 changes: 11 additions & 12 deletions internal/services/delegation_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,12 @@ func (s *Service) registerUnbondingSpendNotification(

func (s *Service) registerStakingSpendNotification(
ctx context.Context,
delegation *model.BTCDelegationDetails,
stakingTxHashHex string,
stakingTxHex string,
stakingOutputIdx uint32,
stakingStartHeight uint32,
) *types.Error {
stakingTxHash, err := chainhash.NewHashFromStr(delegation.StakingTxHashHex)
stakingTxHash, err := chainhash.NewHashFromStr(stakingTxHashHex)
if err != nil {
return types.NewError(
http.StatusInternalServerError,
Expand All @@ -79,7 +82,7 @@ func (s *Service) registerStakingSpendNotification(
)
}

stakingTx, err := utils.DeserializeBtcTransactionFromHex(delegation.StakingTxHex)
stakingTx, err := utils.DeserializeBtcTransactionFromHex(stakingTxHex)
if err != nil {
return types.NewError(
http.StatusInternalServerError,
Expand All @@ -88,30 +91,26 @@ func (s *Service) registerStakingSpendNotification(
)
}

log.Debug().
Str("staking_tx", delegation.StakingTxHashHex).
Msg("registering staking spend notification")

stakingOutpoint := wire.OutPoint{
Hash: *stakingTxHash,
Index: delegation.StakingOutputIdx,
Index: stakingOutputIdx,
}

spendEv, err := s.btcNotifier.RegisterSpendNtfn(
&stakingOutpoint,
stakingTx.TxOut[delegation.StakingOutputIdx].PkScript,
delegation.StartHeight,
stakingTx.TxOut[stakingOutputIdx].PkScript,
stakingStartHeight,
)
if err != nil {
return types.NewError(
http.StatusInternalServerError,
types.InternalServiceError,
fmt.Errorf("failed to register spend ntfn for staking tx %s: %w", delegation.StakingTxHashHex, err),
fmt.Errorf("failed to register spend ntfn for staking tx %s: %w", stakingTxHashHex, err),
)
}

s.wg.Add(1)
go s.watchForSpendStakingTx(spendEv, delegation)
go s.watchForSpendStakingTx(spendEv, stakingTxHashHex)

return nil
}
4 changes: 4 additions & 0 deletions internal/services/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ type EventTypes string

type EventCategory string

func (e EventTypes) String() string {
return string(e)
}

const (
BlockCategory EventCategory = "block"
TxCategory EventCategory = "tx"
Expand Down
8 changes: 7 additions & 1 deletion internal/services/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,13 @@ func (s *Service) ResubscribeToMissedBtcNotifications(ctx context.Context) {

for _, delegation := range delegations {
// Register spend notification
if err := s.registerStakingSpendNotification(ctx, delegation); err != nil {
if err := s.registerStakingSpendNotification(
ctx,
delegation.StakingTxHashHex,
delegation.StakingTxHex,
delegation.StakingOutputIdx,
delegation.StartHeight,
); err != nil {
log.Fatal().Msgf("Failed to register spend notification: %v", err)
}
}
Expand Down
15 changes: 10 additions & 5 deletions internal/services/watch_btc_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

func (s *Service) watchForSpendStakingTx(
spendEvent *notifier.SpendEvent,
delegation *model.BTCDelegationDetails,
stakingTxHashHex string,
) {
defer s.wg.Done()
quitCtx, cancel := s.quitContext()
Expand All @@ -32,19 +32,19 @@ func (s *Service) watchForSpendStakingTx(
select {
case spendDetail := <-spendEvent.Spend:
log.Debug().
Str("staking_tx", delegation.StakingTxHashHex).
Str("staking_tx", stakingTxHashHex).
Str("spending_tx", spendDetail.SpendingTx.TxHash().String()).
Msg("staking tx has been spent")
if err := s.handleSpendingStakingTransaction(
quitCtx,
spendDetail.SpendingTx,
spendDetail.SpenderInputIndex,
uint32(spendDetail.SpendingHeight),
delegation,
stakingTxHashHex,
); err != nil {
log.Error().
Err(err).
Str("staking_tx", delegation.StakingTxHashHex).
Str("staking_tx", stakingTxHashHex).
Str("spending_tx", spendDetail.SpendingTx.TxHash().String()).
Msg("failed to handle spending staking transaction")
return
Expand Down Expand Up @@ -158,8 +158,13 @@ func (s *Service) handleSpendingStakingTransaction(
spendingTx *wire.MsgTx,
spendingInputIdx uint32,
spendingHeight uint32,
delegation *model.BTCDelegationDetails,
stakingTxHashHex string,
) error {
delegation, err := s.db.GetBTCDelegationByStakingTxHash(ctx, stakingTxHashHex)
if err != nil {
return fmt.Errorf("failed to get BTC delegation by staking tx hash: %w", err)
}

params, err := s.db.GetStakingParams(ctx, delegation.ParamsVersion)
if err != nil {
return fmt.Errorf("failed to get staking params: %w", err)
Expand Down
Loading