diff --git a/cmd/main.go b/cmd/main.go index 450289a..32535a9 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -28,18 +28,23 @@ import ( ) // Helper function to check if operator IDs and Telegram config are available -func waitForInitialConfig(storageAdapter *storage.Storage) { +func waitForInitialConfig(ctx context.Context, storageAdapter *storage.Storage) error { for { - // Check for operator IDs - operatorIds, err := storageAdapter.GetOperatorIds() - if err != nil || len(operatorIds) == 0 { - logger.Info("Waiting for operator IDs to be set...") - } else { - // Operator IDs are set - logger.Info("Operator IDs are set. Proceeding with initialization.") - return + select { + case <-ctx.Done(): // Exit if the context is canceled + return ctx.Err() + default: + // Check for operator IDs + operatorIds, err := storageAdapter.GetOperatorIds() + if err != nil || len(operatorIds) == 0 { + logger.Info("Waiting for operator IDs to be set...") + } else { + // Operator IDs are set + logger.Info("Operator IDs are set. Proceeding with initialization.") + return nil + } + time.Sleep(2 * time.Second) // Poll every 2 seconds } - time.Sleep(2 * time.Second) // Poll every 2 seconds } } @@ -69,7 +74,7 @@ func main() { } // Start HTTP server - apiAdapter := api.NewAPIAdapter(storageAdapter, notifierAdapter, networkConfig.CORS) + apiAdapter := api.NewAPIAdapter(storageAdapter, networkConfig.CORS) server := &http.Server{ Addr: ":" + strconv.FormatUint(networkConfig.ApiPort, 10), Handler: apiAdapter.Router, @@ -98,8 +103,31 @@ func main() { } }() - // Wait for initial configuration to be set - waitForInitialConfig(storageAdapter) + // Wait for initial configuration in a separate goroutine + configReady := make(chan error, 1) + go func() { + configReady <- waitForInitialConfig(ctx, storageAdapter) + }() + + // Start listening for signals in a separate goroutine + go func() { + <-signalChan + logger.Info("Received shutdown signal. Initiating graceful shutdown...") + cancel() // Cancel context to stop all services + }() + + // Wait for either the config to be ready or the context to be canceled + select { + case err := <-configReady: + if err != nil { + logger.Warn("Shutting down due to: %v", err) + return + } + logger.Info("Configuration is ready. Proceeding with initialization.") + case <-ctx.Done(): + logger.Info("Context canceled before configuration was ready.") + return + } ipfsAdapter := ipfs.NewIPFSAdapter(networkConfig.IpfsUrl) beaconchainAdapter := beaconchain.NewBeaconchainAdapter(networkConfig.BeaconchainURL) diff --git a/internal/adapters/api/api_adapter.go b/internal/adapters/api/api_adapter.go index 83075bd..9d2dfd6 100644 --- a/internal/adapters/api/api_adapter.go +++ b/internal/adapters/api/api_adapter.go @@ -16,16 +16,14 @@ import ( // APIHandler holds the necessary dependencies for API endpoints type APIHandler struct { StoragePort ports.StoragePort - NotifierPort ports.NotifierPort Router *mux.Router adapterPrefix string } // NewAPIAdapter initializes the APIHandler and sets up routes with CORS enabled -func NewAPIAdapter(storagePort ports.StoragePort, notifierPort ports.NotifierPort, allowedOrigins []string) *APIHandler { +func NewAPIAdapter(storagePort ports.StoragePort, allowedOrigins []string) *APIHandler { h := &APIHandler{ StoragePort: storagePort, - NotifierPort: notifierPort, Router: mux.NewRouter(), adapterPrefix: "API", } @@ -117,13 +115,6 @@ func (h *APIHandler) UpdateTelegramConfig(w http.ResponseWriter, r *http.Request return } - // test the telegram connection - if err := h.NotifierPort.SendNotification("🔑 Updated telegram configuration successfully"); err != nil { - logger.ErrorWithPrefix("API", "Failed to send test notification: %v", err) - writeErrorResponse(w, "Failed to send test notification", http.StatusInternalServerError) - return - } - w.WriteHeader(http.StatusOK) } diff --git a/internal/adapters/csFeeDistributorImpl/csFeeDistributorImpl_adapter.go b/internal/adapters/csFeeDistributorImpl/csFeeDistributorImpl_adapter.go index 309aea5..21ce12c 100644 --- a/internal/adapters/csFeeDistributorImpl/csFeeDistributorImpl_adapter.go +++ b/internal/adapters/csFeeDistributorImpl/csFeeDistributorImpl_adapter.go @@ -46,13 +46,11 @@ func (cs *CsFeeDistributorImplAdapter) ScanDistributionLogUpdatedEvents(ctx cont for distributionLogUpdated.Next() { if err := distributionLogUpdated.Error(); err != nil { - // Skip this event if there is an error retrieving it - continue + return fmt.Errorf("error reading DistributionLogUpdated event: %w", err) } if err := handleDistributionLogUpdated(distributionLogUpdated.Event); err != nil { - // Continue to the next event if handling fails - continue + return fmt.Errorf("failed to handle DistributionLogUpdated event: %w", err) } } diff --git a/internal/adapters/notifier/notifier_adapter.go b/internal/adapters/notifier/notifier_adapter.go index 526defc..ccd1a1c 100644 --- a/internal/adapters/notifier/notifier_adapter.go +++ b/internal/adapters/notifier/notifier_adapter.go @@ -5,7 +5,6 @@ import ( "fmt" "lido-events/internal/application/ports" "lido-events/internal/logger" - "sync" tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api/v5" ) @@ -14,45 +13,37 @@ type TelegramBot struct { Bot *tgbotapi.BotAPI UserID int64 servicePrefix string - mu sync.RWMutex } func NewNotifierAdapter(ctx context.Context, storageAdapter ports.StoragePort) (*TelegramBot, error) { const servicePrefix = "TelegramNotifier" - // Load the initial configuration for Telegram - initialConfig, err := storageAdapter.GetTelegramConfig() - if err != nil { - logger.WarnWithPrefix(servicePrefix, "Failed to load initial Telegram configuration: %v", err) - // Return a TelegramBot instance without initializing the bot - return &TelegramBot{servicePrefix: servicePrefix}, nil - } - - if initialConfig.Token == "" || initialConfig.UserID == 0 { - logger.WarnWithPrefix(servicePrefix, "Telegram configuration is incomplete. Notifications will be disabled.") - // Return a TelegramBot instance without initializing the bot - return &TelegramBot{servicePrefix: servicePrefix}, nil + adapter := &TelegramBot{ + servicePrefix: servicePrefix, } - // Initialize the bot with the initial token - bot, err := tgbotapi.NewBotAPI(initialConfig.Token) + // Attempt to load the initial configuration for Telegram + initialConfig, err := storageAdapter.GetTelegramConfig() if err != nil { - logger.WarnWithPrefix(servicePrefix, "Failed to initialize Telegram bot: %v", err) - // Return a TelegramBot instance without initializing the bot - return &TelegramBot{servicePrefix: servicePrefix}, nil - } - - adapter := &TelegramBot{ - Bot: bot, - UserID: initialConfig.UserID, - servicePrefix: servicePrefix, + logger.WarnWithPrefix(servicePrefix, "Failed to load initial Telegram configuration: %v", err) + } else if initialConfig.Token != "" && initialConfig.UserID != 0 { + // Initialize the bot if the configuration is valid + bot, err := tgbotapi.NewBotAPI(initialConfig.Token) + if err != nil { + logger.WarnWithPrefix(servicePrefix, "Failed to initialize Telegram bot: %v", err) + } else { + adapter.Bot = bot + adapter.UserID = initialConfig.UserID + logger.InfoWithPrefix(servicePrefix, "Telegram bot initialized successfully.") + } + } else { + logger.WarnWithPrefix(servicePrefix, "Initial Telegram configuration is incomplete. Notifications will be disabled.") } // Listen for configuration updates in a separate goroutine telegramConfigUpdates := storageAdapter.RegisterTelegramConfigListener() go func() { for newConfig := range telegramConfigUpdates { - adapter.mu.Lock() adapter.UserID = newConfig.UserID // Update the bot API instance with the new token if newConfig.Token != "" { @@ -60,14 +51,23 @@ func NewNotifierAdapter(ctx context.Context, storageAdapter ports.StoragePort) ( if err == nil { adapter.Bot = updatedBot logger.InfoWithPrefix(servicePrefix, "Telegram bot configuration updated successfully.") + + // Send a test notification to confirm the new configuration works + testMessage := "🔑 Updated telegram configuration successfully" + testErr := adapter.SendNotification(testMessage) + if testErr != nil { + logger.ErrorWithPrefix(servicePrefix, "Failed to send test notification after configuration update: %v", testErr) + } else { + logger.InfoWithPrefix(servicePrefix, "Test notification sent successfully.") + } } else { logger.ErrorWithPrefix(servicePrefix, "Failed to update Telegram bot: %v", err) + adapter.Bot = nil // Disable notifications on failure } } else { logger.WarnWithPrefix(servicePrefix, "Received incomplete Telegram configuration. Notifications will be disabled.") adapter.Bot = nil // Disable notifications if the new config is invalid } - adapter.mu.Unlock() } }() @@ -76,9 +76,6 @@ func NewNotifierAdapter(ctx context.Context, storageAdapter ports.StoragePort) ( // SendNotification sends a message via Telegram func (tb *TelegramBot) SendNotification(message string) error { - tb.mu.RLock() - defer tb.mu.RUnlock() - // Check if the bot is initialized if tb.Bot == nil { logger.WarnWithPrefix(tb.servicePrefix, "Telegram bot is not initialized. Skipping notification.") diff --git a/internal/adapters/vebo/vebo_adapter.go b/internal/adapters/vebo/vebo_adapter.go index f19041a..c6e07ab 100644 --- a/internal/adapters/vebo/vebo_adapter.go +++ b/internal/adapters/vebo/vebo_adapter.go @@ -57,13 +57,11 @@ func (va *VeboAdapter) ScanVeboValidatorExitRequestEvent(ctx context.Context, st for validatorExitRequestEvents.Next() { if err := validatorExitRequestEvents.Error(); err != nil { - // Skip this event if there is an error retrieving it - continue + return fmt.Errorf("error reading ValidatorExitRequest event: %w", err) } if err := handleValidatorExitRequestEvent(validatorExitRequestEvents.Event); err != nil { - // Continue to the next event if handling fails - continue + return fmt.Errorf("failed to handle ValidatorExitRequest event: %w", err) } }