Skip to content

Commit

Permalink
fix: race condition in last processed height (#38)
Browse files Browse the repository at this point in the history
* fix: event processor race condition on last processed block
  • Loading branch information
gusin13 authored Nov 11, 2024
1 parent 6d1ed53 commit db9071b
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 31 deletions.
7 changes: 5 additions & 2 deletions internal/db/error.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package db

import "errors"
import (
"errors"
"fmt"
)

// DuplicateKeyError is an error type for duplicate key errors
type DuplicateKeyError struct {
Expand Down Expand Up @@ -41,7 +44,7 @@ type NotFoundError struct {
}

func (e *NotFoundError) Error() string {
return e.Message
return fmt.Sprintf("%s: %s", e.Message, e.Key)
}

func IsNotFoundError(err error) bool {
Expand Down
16 changes: 8 additions & 8 deletions internal/services/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@ const (
// If an error occurs, it logs the error and terminates the program.
// The method runs asynchronously to allow non-blocking operation.
func (s *Service) StartBbnBlockProcessor(ctx context.Context) {
go func() {
if err := s.processBlocksSequentially(ctx); err != nil {
log.Fatal().Msgf("BBN block processor exited with error: %v", err)
}
}()
if err := s.processBlocksSequentially(ctx); err != nil {
log.Fatal().Msgf("BBN block processor exited with error: %v", err)
}
}

// processBlocksSequentially processes BBN blockchain blocks in sequential order,
Expand Down Expand Up @@ -63,7 +61,7 @@ func (s *Service) processBlocksSequentially(ctx context.Context) *types.Error {
}

// Process blocks from lastProcessedHeight + 1 to latestHeight
for i := lastProcessedHeight + 1; i <= uint64(latestHeight); i++ {
for i := lastProcessedHeight; i <= uint64(latestHeight); i++ {
select {
case <-ctx.Done():
return types.NewError(
Expand All @@ -76,11 +74,13 @@ func (s *Service) processBlocksSequentially(ctx context.Context) *types.Error {
if err != nil {
return err
}

for _, event := range events {
s.bbnEventProcessor <- event
if err := s.processEvent(ctx, event); err != nil {
return err
}
}

// Update lastProcessedHeight after successful processing
if dbErr := s.db.UpdateLastProcessedBbnHeight(ctx, uint64(i)); dbErr != nil {
return types.NewError(
http.StatusInternalServerError,
Expand Down
21 changes: 4 additions & 17 deletions internal/services/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,8 @@ func NewBbnEvent(category EventCategory, event abcitypes.Event) BbnEvent {
}
}

// startBbnEventProcessor continuously listens for events from the channel and
// processes them in the main thread
func (s *Service) StartBbnEventProcessor(ctx context.Context) {
for event := range s.bbnEventProcessor {
if event.Event.Type == "" {
log.Warn().Msg("Empty event received, skipping")
continue
}
// Create a new context with a timeout for each event
ctx, cancel := context.WithTimeout(context.Background(), eventProcessingTimeout)
defer cancel()
s.processEvent(ctx, event)
}
}

// Entry point for processing events
func (s *Service) processEvent(ctx context.Context, event BbnEvent) {
func (s *Service) processEvent(ctx context.Context, event BbnEvent) *types.Error {
// Note: We no longer need to check for the event category here. We can directly
// process the event based on its type.
bbnEvent := event.Event
Expand Down Expand Up @@ -89,8 +74,10 @@ func (s *Service) processEvent(ctx context.Context, event BbnEvent) {

if err != nil {
log.Error().Err(err).Msg("Failed to process event")
panic(err)
return err
}

return nil
}

func parseEvent[T proto.Message](
Expand Down
6 changes: 2 additions & 4 deletions internal/services/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,10 @@ func (s *Service) StartIndexerSync(ctx context.Context) {
s.SyncGlobalParams(ctx)
// Start the expiry checker
s.StartExpiryChecker(ctx)
// Start the BBN block processor
s.StartBbnBlockProcessor(ctx)
// Start the websocket event subscription process
s.SubscribeToBbnEvents(ctx)
// Keep processing events in the main thread
s.StartBbnEventProcessor(ctx)
// Keep processing BBN blocks in the main thread
s.StartBbnBlockProcessor(ctx)
}

func (s *Service) quitContext() (context.Context, func()) {
Expand Down

0 comments on commit db9071b

Please sign in to comment.