Skip to content

Commit

Permalink
rm in channel
Browse files Browse the repository at this point in the history
  • Loading branch information
Lazar955 committed Dec 18, 2024
1 parent 2746650 commit 43402c6
Showing 1 changed file with 25 additions and 3 deletions.
28 changes: 25 additions & 3 deletions btcstaking-tracker/stakingeventwatcher/stakingeventwatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ type StakingEventWatcher struct {

unbondingDelegationChan chan *newDelegation
unbondingRemovalChan chan *delegationInactive
pendingRemovalChan chan *delegationInactive
currentBestBlockHeight atomic.Uint32
activationLimiter *semaphore.Weighted
}
Expand All @@ -109,6 +110,7 @@ func NewStakingEventWatcher(
inProgressTracker: NewTrackedDelegations(),
unbondingDelegationChan: make(chan *newDelegation),
unbondingRemovalChan: make(chan *delegationInactive),
pendingRemovalChan: make(chan *delegationInactive, 1000),
activationLimiter: semaphore.NewWeighted(maxConcurrentActivations), // todo(lazar): this should be in config
}
}
Expand Down Expand Up @@ -140,11 +142,12 @@ func (sew *StakingEventWatcher) Start() error {

sew.logger.Infof("Initial btc best block height is: %d", sew.currentBestBlockHeight.Load())

sew.wg.Add(4)
sew.wg.Add(5)
go sew.handleNewBlocks(blockEventNotifier)
go sew.handleUnbondedDelegations()
go sew.fetchDelegations()
go sew.handlerVerifiedDelegations()
go sew.handlerRemovalFromTracker()

sew.logger.Info("staking event watcher started")
})
Expand Down Expand Up @@ -705,9 +708,12 @@ func (sew *StakingEventWatcher) activateBtcDelegation(

sew.metrics.ReportedActivateDelegationsCounter.Inc()

sew.pendingTracker.RemoveDelegation(stakingTxHash)
utils.PushOrQuit[*delegationInactive](
sew.pendingRemovalChan,
&delegationInactive{stakingTxHash: stakingTxHash},
sew.quit,
)

sew.metrics.NumberOfVerifiedDelegations.Dec()
sew.logger.Debugf("staking tx activated %s", stakingTxHash.String())

return nil
Expand Down Expand Up @@ -765,6 +771,22 @@ func (sew *StakingEventWatcher) waitForRequiredDepth(
)
}

func (sew *StakingEventWatcher) handlerRemovalFromTracker() {
defer sew.wg.Done()
for {
select {
case in := <-sew.pendingRemovalChan:
sew.logger.Debugf("Delegation with hash (%s) is verified, removing from tracker", in.stakingTxHash)
sew.pendingTracker.RemoveDelegation(in.stakingTxHash)
sew.metrics.NumberOfVerifiedDelegations.Dec()
case <-sew.quit:
sew.logger.Debug("handle delegations loop quit")

return
}
}
}

func (sew *StakingEventWatcher) latency(method string) func() {
startTime := time.Now()

Expand Down

0 comments on commit 43402c6

Please sign in to comment.