Skip to content

Commit

Permalink
fix e2e
Browse files Browse the repository at this point in the history
  • Loading branch information
gitferry committed Dec 19, 2024
1 parent 757156e commit edb1eb0
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 88 deletions.
4 changes: 1 addition & 3 deletions eotsmanager/service/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,7 @@ func (s *Server) RunUntilShutdown() error {
return fmt.Errorf("failed to listen on %s: %w", listenAddr, err)
}
defer func() {
if err := lis.Close(); err != nil {
s.logger.Error(fmt.Sprintf("Failed to close network listener: %v", err))
}
_ = lis.Close()
}()

grpcServer := grpc.NewServer()
Expand Down
10 changes: 4 additions & 6 deletions finality-provider/service/event_loops.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,14 +199,12 @@ func (app *FinalityProviderApp) metricsUpdateLoop() {
defer updateTicker.Stop()

for {
if app.fpIns != nil {
app.metrics.UpdateFpMetrics(app.fpIns.GetStoreFinalityProvider())
}
select {
case <-updateTicker.C:
fps, err := app.fps.GetAllStoredFinalityProviders()
if err != nil {
app.logger.Error("failed to get finality-providers from the store", zap.Error(err))
continue
}
app.metrics.UpdateFpMetrics(fps)
continue
case <-app.quit:
app.logger.Info("exiting metrics update loop")
return
Expand Down
133 changes: 68 additions & 65 deletions finality-provider/service/fp_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,49 +171,52 @@ func (fp *FinalityProviderInstance) finalitySigSubmissionLoop() {
defer fp.wg.Done()

for {
// start submission in the first iteration
pollerBlocks := fp.getBatchBlocksFromChan()
if len(pollerBlocks) == 0 {
continue
}
targetHeight := pollerBlocks[len(pollerBlocks)-1].Height
fp.logger.Debug("the finality-provider received new block(s), start processing",
zap.String("pk", fp.GetBtcPkHex()),
zap.Uint64("start_height", pollerBlocks[0].Height),
zap.Uint64("end_height", targetHeight),
)

processedBlocks, err := fp.processBlocksToVote(pollerBlocks)
if err != nil {
fp.reportCriticalErr(err)
continue
}
select {
case <-time.After(fp.cfg.SignatureSubmissionInterval):
// start submission in the first iteration
pollerBlocks := fp.getBatchBlocksFromChan()
if len(pollerBlocks) == 0 {
continue
}
targetHeight := pollerBlocks[len(pollerBlocks)-1].Height
fp.logger.Debug("the finality-provider received new block(s), start processing",
zap.String("pk", fp.GetBtcPkHex()),
zap.Uint64("start_height", pollerBlocks[0].Height),
zap.Uint64("end_height", targetHeight),
)

res, err := fp.retrySubmitSigsUntilFinalized(processedBlocks)
if err != nil {
fp.metrics.IncrementFpTotalFailedVotes(fp.GetBtcPkHex())
if !errors.Is(err, ErrFinalityProviderShutDown) {
processedBlocks, err := fp.processBlocksToVote(pollerBlocks)
if err != nil {
fp.reportCriticalErr(err)
continue
}
continue
}
if res == nil {
// this can happen when a finality signature is not needed
// either if the block is already submitted or the signature
// is already submitted
continue
}
fp.logger.Info(
"successfully submitted the finality signature to the consumer chain",
zap.String("consumer_id", string(fp.GetChainID())),
zap.String("pk", fp.GetBtcPkHex()),
zap.Uint64("start_height", pollerBlocks[0].Height),
zap.Uint64("end_height", targetHeight),
zap.String("tx_hash", res.TxHash),
)
select {
case <-time.After(fp.cfg.SignatureSubmissionInterval):
continue

if len(processedBlocks) == 0 {
continue
}

res, err := fp.retrySubmitSigsUntilFinalized(processedBlocks)
if err != nil {
fp.metrics.IncrementFpTotalFailedVotes(fp.GetBtcPkHex())
if !errors.Is(err, ErrFinalityProviderShutDown) {
fp.reportCriticalErr(err)
}
continue
}
if res == nil {
// this can happen when a finality signature is not needed
// either if the block is already submitted or the signature
// is already submitted
continue
}
fp.logger.Info(
"successfully submitted the finality signature to the consumer chain",
zap.String("consumer_id", string(fp.GetChainID())),
zap.String("pk", fp.GetBtcPkHex()),
zap.Uint64("start_height", pollerBlocks[0].Height),
zap.Uint64("end_height", targetHeight),
zap.String("tx_hash", res.TxHash),
)
case <-fp.quit:
fp.logger.Info("the finality signature submission loop is closing")
return
Expand Down Expand Up @@ -295,33 +298,32 @@ func (fp *FinalityProviderInstance) randomnessCommitmentLoop() {
defer fp.wg.Done()

for {
// start randomness commit in the first iteration
should, startHeight, err := fp.ShouldCommitRandomness()
if err != nil {
fp.reportCriticalErr(err)
continue
}
if !should {
continue
}

txRes, err := fp.CommitPubRand(startHeight)
if err != nil {
fp.metrics.IncrementFpTotalFailedRandomness(fp.GetBtcPkHex())
fp.reportCriticalErr(err)
continue
}
// txRes could be nil if no need to commit more randomness
if txRes != nil {
fp.logger.Info(
"successfully committed public randomness to the consumer chain",
zap.String("pk", fp.GetBtcPkHex()),
zap.String("tx_hash", txRes.TxHash),
)
}
select {
case <-time.After(fp.cfg.RandomnessCommitInterval):
continue
// start randomness commit in the first iteration
should, startHeight, err := fp.ShouldCommitRandomness()
if err != nil {
fp.reportCriticalErr(err)
continue
}
if !should {
continue
}

txRes, err := fp.CommitPubRand(startHeight)
if err != nil {
fp.metrics.IncrementFpTotalFailedRandomness(fp.GetBtcPkHex())
fp.reportCriticalErr(err)
continue
}
// txRes could be nil if no need to commit more randomness
if txRes != nil {
fp.logger.Info(
"successfully committed public randomness to the consumer chain",
zap.String("pk", fp.GetBtcPkHex()),
zap.String("tx_hash", txRes.TxHash),
)
}
case <-fp.quit:
fp.logger.Info("the randomness commitment loop is closing")
return
Expand Down Expand Up @@ -457,6 +459,7 @@ func (fp *FinalityProviderInstance) retrySubmitSigsUntilFinalized(targetBlocks [
select {
case <-time.After(fp.cfg.SubmissionRetryInterval):
// Continue to next retry iteration
continue
case <-fp.quit:
fp.logger.Debug("the finality-provider instance is closing", zap.String("pk", fp.GetBtcPkHex()))
return nil, ErrFinalityProviderShutDown
Expand Down
4 changes: 1 addition & 3 deletions finality-provider/service/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,7 @@ func (s *Server) RunUntilShutdown() error {
return fmt.Errorf("failed to listen on %s: %w", listenAddr, err)
}
defer func() {
if err := lis.Close(); err != nil {
s.logger.Error(fmt.Sprintf("Failed to close network listener: %v", err))
}
_ = lis.Close()
}()

grpcServer := grpc.NewServer()
Expand Down
4 changes: 2 additions & 2 deletions itest/test_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,8 @@ func StartManagerWithFinalityProvider(t *testing.T, n int) (*TestManager, []*ser
}

func (tm *TestManager) Stop(t *testing.T) {
for _, fp := range tm.Fps {
err := fp.Stop()
for _, fpApp := range tm.Fps {
err := fpApp.Stop()
require.NoError(t, err)
}
err := tm.manager.ClearResources()
Expand Down
16 changes: 7 additions & 9 deletions metrics/fp_collectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,19 +269,17 @@ func (fm *FpMetrics) RecordFpRandomnessTime(fpBtcPkHex string) {
fm.previousRandomnessByFp[fpBtcPkHex] = &now
}

func (fm *FpMetrics) UpdateFpMetrics(fps []*store.StoredFinalityProvider) {
func (fm *FpMetrics) UpdateFpMetrics(fp *store.StoredFinalityProvider) {
fm.mu.Lock()
defer fm.mu.Unlock()

for _, fp := range fps {
fm.RecordFpStatus(fp.GetBIP340BTCPK().MarshalHex(), fp.Status)
fm.RecordFpStatus(fp.GetBIP340BTCPK().MarshalHex(), fp.Status)

if lastVoteTime, ok := fm.previousVoteByFp[fp.GetBIP340BTCPK().MarshalHex()]; ok {
fm.RecordFpSecondsSinceLastVote(fp.GetBIP340BTCPK().MarshalHex(), time.Since(*lastVoteTime).Seconds())
}
if lastVoteTime, ok := fm.previousVoteByFp[fp.GetBIP340BTCPK().MarshalHex()]; ok {
fm.RecordFpSecondsSinceLastVote(fp.GetBIP340BTCPK().MarshalHex(), time.Since(*lastVoteTime).Seconds())
}

if lastRandomnessTime, ok := fm.previousRandomnessByFp[fp.GetBIP340BTCPK().MarshalHex()]; ok {
fm.RecordFpSecondsSinceLastRandomness(fp.GetBIP340BTCPK().MarshalHex(), time.Since(*lastRandomnessTime).Seconds())
}
if lastRandomnessTime, ok := fm.previousRandomnessByFp[fp.GetBIP340BTCPK().MarshalHex()]; ok {
fm.RecordFpSecondsSinceLastRandomness(fp.GetBIP340BTCPK().MarshalHex(), time.Since(*lastRandomnessTime).Seconds())
}
}

0 comments on commit edb1eb0

Please sign in to comment.