diff --git a/internal/db/error.go b/internal/db/error.go index 0fee9a0..23f24f6 100644 --- a/internal/db/error.go +++ b/internal/db/error.go @@ -1,6 +1,9 @@ package db -import "errors" +import ( + "errors" + "fmt" +) // DuplicateKeyError is an error type for duplicate key errors type DuplicateKeyError struct { @@ -41,7 +44,7 @@ type NotFoundError struct { } func (e *NotFoundError) Error() string { - return e.Message + return fmt.Sprintf("%s: %s", e.Message, e.Key) } func IsNotFoundError(err error) bool { diff --git a/internal/services/bootstrap.go b/internal/services/bootstrap.go index bcea1d2..bd5b926 100644 --- a/internal/services/bootstrap.go +++ b/internal/services/bootstrap.go @@ -19,11 +19,9 @@ const ( // If an error occurs, it logs the error and terminates the program. // The method runs asynchronously to allow non-blocking operation. func (s *Service) StartBbnBlockProcessor(ctx context.Context) { - go func() { - if err := s.processBlocksSequentially(ctx); err != nil { - log.Fatal().Msgf("BBN block processor exited with error: %v", err) - } - }() + if err := s.processBlocksSequentially(ctx); err != nil { + log.Fatal().Msgf("BBN block processor exited with error: %v", err) + } } // processBlocksSequentially processes BBN blockchain blocks in sequential order, @@ -63,7 +61,7 @@ func (s *Service) processBlocksSequentially(ctx context.Context) *types.Error { } // Process blocks from lastProcessedHeight + 1 to latestHeight - for i := lastProcessedHeight + 1; i <= uint64(latestHeight); i++ { + for i := lastProcessedHeight; i <= uint64(latestHeight); i++ { select { case <-ctx.Done(): return types.NewError( @@ -76,11 +74,13 @@ func (s *Service) processBlocksSequentially(ctx context.Context) *types.Error { if err != nil { return err } + for _, event := range events { - s.bbnEventProcessor <- event + if err := s.processEvent(ctx, event); err != nil { + return err + } } - // Update lastProcessedHeight after successful processing if dbErr := s.db.UpdateLastProcessedBbnHeight(ctx, uint64(i)); dbErr != nil { return types.NewError( http.StatusInternalServerError, diff --git a/internal/services/events.go b/internal/services/events.go index 80443ab..dd5a4e2 100644 --- a/internal/services/events.go +++ b/internal/services/events.go @@ -35,23 +35,8 @@ func NewBbnEvent(category EventCategory, event abcitypes.Event) BbnEvent { } } -// startBbnEventProcessor continuously listens for events from the channel and -// processes them in the main thread -func (s *Service) StartBbnEventProcessor(ctx context.Context) { - for event := range s.bbnEventProcessor { - if event.Event.Type == "" { - log.Warn().Msg("Empty event received, skipping") - continue - } - // Create a new context with a timeout for each event - ctx, cancel := context.WithTimeout(context.Background(), eventProcessingTimeout) - defer cancel() - s.processEvent(ctx, event) - } -} - // Entry point for processing events -func (s *Service) processEvent(ctx context.Context, event BbnEvent) { +func (s *Service) processEvent(ctx context.Context, event BbnEvent) *types.Error { // Note: We no longer need to check for the event category here. We can directly // process the event based on its type. bbnEvent := event.Event @@ -89,8 +74,10 @@ func (s *Service) processEvent(ctx context.Context, event BbnEvent) { if err != nil { log.Error().Err(err).Msg("Failed to process event") - panic(err) + return err } + + return nil } func parseEvent[T proto.Message]( diff --git a/internal/services/service.go b/internal/services/service.go index bbe8120..75b56a0 100644 --- a/internal/services/service.go +++ b/internal/services/service.go @@ -64,12 +64,10 @@ func (s *Service) StartIndexerSync(ctx context.Context) { s.SyncGlobalParams(ctx) // Start the expiry checker s.StartExpiryChecker(ctx) - // Start the BBN block processor - s.StartBbnBlockProcessor(ctx) // Start the websocket event subscription process s.SubscribeToBbnEvents(ctx) - // Keep processing events in the main thread - s.StartBbnEventProcessor(ctx) + // Keep processing BBN blocks in the main thread + s.StartBbnBlockProcessor(ctx) } func (s *Service) quitContext() (context.Context, func()) {