diff --git a/cmd/main.go b/cmd/main.go index 7321fee..450289a 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -35,15 +35,9 @@ func waitForInitialConfig(storageAdapter *storage.Storage) { if err != nil || len(operatorIds) == 0 { logger.Info("Waiting for operator IDs to be set...") } else { - // Check for Telegram config - telegramConfig, err := storageAdapter.GetTelegramConfig() - if err != nil || telegramConfig.Token == "" || telegramConfig.UserID == 0 { - logger.Info("Waiting for Telegram config to be set...") - } else { - // Both operator IDs and Telegram config are set - logger.Info("Operator IDs and Telegram config are set. Proceeding with initialization.") - return - } + // Operator IDs are set + logger.Info("Operator IDs are set. Proceeding with initialization.") + return } time.Sleep(2 * time.Second) // Poll every 2 seconds } @@ -68,9 +62,14 @@ func main() { // Initialize adapters storageAdapter := storage.NewStorageAdapter() + // Initialize the notifier adapter (Telegram configuration optional) + notifierAdapter, err := notifier.NewNotifierAdapter(ctx, storageAdapter) + if err != nil { + logger.Warn("Telegram notifier not initialized: %v", err) + } // Start HTTP server - apiAdapter := api.NewAPIAdapter(storageAdapter, networkConfig.CORS) + apiAdapter := api.NewAPIAdapter(storageAdapter, notifierAdapter, networkConfig.CORS) server := &http.Server{ Addr: ":" + strconv.FormatUint(networkConfig.ApiPort, 10), Handler: apiAdapter.Router, @@ -106,10 +105,7 @@ func main() { beaconchainAdapter := beaconchain.NewBeaconchainAdapter(networkConfig.BeaconchainURL) executionAdapter := execution.NewExecutionAdapter(networkConfig.RpcUrl) exitValidatorAdapter := exitvalidator.NewExitValidatorAdapter(beaconchainAdapter, networkConfig.SignerUrl) - notifierAdapter, err := notifier.NewNotifierAdapter(ctx, storageAdapter) - if err != nil { - logger.Fatal("Failed to initialize Telegram notifier: %v", err) - } + csFeeDistributorImplAdapter, err := csfeedistributorimpl.NewCsFeeDistributorImplAdapter(networkConfig.WsURL, networkConfig.CSFeeDistributorAddress) if err != nil { logger.Fatal("Failed to initialize CsFeeDistributorImpl adapter: %v", err) diff --git a/internal/adapters/api/api_adapter.go b/internal/adapters/api/api_adapter.go index aea8eac..83075bd 100644 --- a/internal/adapters/api/api_adapter.go +++ b/internal/adapters/api/api_adapter.go @@ -16,14 +16,16 @@ import ( // APIHandler holds the necessary dependencies for API endpoints type APIHandler struct { StoragePort ports.StoragePort + NotifierPort ports.NotifierPort Router *mux.Router adapterPrefix string } // NewAPIAdapter initializes the APIHandler and sets up routes with CORS enabled -func NewAPIAdapter(storagePort ports.StoragePort, allowedOrigins []string) *APIHandler { +func NewAPIAdapter(storagePort ports.StoragePort, notifierPort ports.NotifierPort, allowedOrigins []string) *APIHandler { h := &APIHandler{ StoragePort: storagePort, + NotifierPort: notifierPort, Router: mux.NewRouter(), adapterPrefix: "API", } @@ -115,6 +117,13 @@ func (h *APIHandler) UpdateTelegramConfig(w http.ResponseWriter, r *http.Request return } + // test the telegram connection + if err := h.NotifierPort.SendNotification("🔑 Updated telegram configuration successfully"); err != nil { + logger.ErrorWithPrefix("API", "Failed to send test notification: %v", err) + writeErrorResponse(w, "Failed to send test notification", http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusOK) } @@ -191,6 +200,22 @@ func (h *APIHandler) AddOperator(w http.ResponseWriter, r *http.Request) { return } + // check if operator id already exists and if so return ok + operatorIds, err := h.StoragePort.GetOperatorIds() + if err != nil { + logger.ErrorWithPrefix("API", "Failed to fetch operator IDs: %v", err) + writeErrorResponse(w, "Failed to fetch operator IDs", http.StatusInternalServerError) + return + } + + for _, id := range operatorIds { + if id.String() == req.OperatorID { + logger.DebugWithPrefix("API", "Operator ID %s already exists", req.OperatorID) + w.WriteHeader(http.StatusOK) + return + } + } + if err := h.StoragePort.SaveOperatorId(req.OperatorID); err != nil { logger.ErrorWithPrefix("API", "Failed to update Operator ID: %v", err) writeErrorResponse(w, "Failed to update Operator ID", http.StatusInternalServerError) diff --git a/internal/adapters/notifier/notifier_adapter.go b/internal/adapters/notifier/notifier_adapter.go index f7f1a9a..526defc 100644 --- a/internal/adapters/notifier/notifier_adapter.go +++ b/internal/adapters/notifier/notifier_adapter.go @@ -11,27 +11,41 @@ import ( ) type TelegramBot struct { - Bot *tgbotapi.BotAPI - UserID int64 - mu sync.RWMutex + Bot *tgbotapi.BotAPI + UserID int64 + servicePrefix string + mu sync.RWMutex } func NewNotifierAdapter(ctx context.Context, storageAdapter ports.StoragePort) (*TelegramBot, error) { + const servicePrefix = "TelegramNotifier" + // Load the initial configuration for Telegram initialConfig, err := storageAdapter.GetTelegramConfig() if err != nil { - return nil, err + logger.WarnWithPrefix(servicePrefix, "Failed to load initial Telegram configuration: %v", err) + // Return a TelegramBot instance without initializing the bot + return &TelegramBot{servicePrefix: servicePrefix}, nil + } + + if initialConfig.Token == "" || initialConfig.UserID == 0 { + logger.WarnWithPrefix(servicePrefix, "Telegram configuration is incomplete. Notifications will be disabled.") + // Return a TelegramBot instance without initializing the bot + return &TelegramBot{servicePrefix: servicePrefix}, nil } // Initialize the bot with the initial token bot, err := tgbotapi.NewBotAPI(initialConfig.Token) if err != nil { - return nil, err + logger.WarnWithPrefix(servicePrefix, "Failed to initialize Telegram bot: %v", err) + // Return a TelegramBot instance without initializing the bot + return &TelegramBot{servicePrefix: servicePrefix}, nil } adapter := &TelegramBot{ - Bot: bot, - UserID: initialConfig.UserID, + Bot: bot, + UserID: initialConfig.UserID, + servicePrefix: servicePrefix, } // Listen for configuration updates in a separate goroutine @@ -41,11 +55,17 @@ func NewNotifierAdapter(ctx context.Context, storageAdapter ports.StoragePort) ( adapter.mu.Lock() adapter.UserID = newConfig.UserID // Update the bot API instance with the new token - updatedBot, err := tgbotapi.NewBotAPI(newConfig.Token) - if err == nil { - adapter.Bot = updatedBot + if newConfig.Token != "" { + updatedBot, err := tgbotapi.NewBotAPI(newConfig.Token) + if err == nil { + adapter.Bot = updatedBot + logger.InfoWithPrefix(servicePrefix, "Telegram bot configuration updated successfully.") + } else { + logger.ErrorWithPrefix(servicePrefix, "Failed to update Telegram bot: %v", err) + } } else { - logger.ErrorWithPrefix("Notifier", "Failed to update Telegram bot: %v", err) + logger.WarnWithPrefix(servicePrefix, "Received incomplete Telegram configuration. Notifications will be disabled.") + adapter.Bot = nil // Disable notifications if the new config is invalid } adapter.mu.Unlock() } @@ -59,10 +79,17 @@ func (tb *TelegramBot) SendNotification(message string) error { tb.mu.RLock() defer tb.mu.RUnlock() + // Check if the bot is initialized + if tb.Bot == nil { + logger.WarnWithPrefix(tb.servicePrefix, "Telegram bot is not initialized. Skipping notification.") + return nil + } + msg := tgbotapi.NewMessage(tb.UserID, message) _, err := tb.Bot.Send(msg) if err != nil { return fmt.Errorf("failed to send Telegram message: %w", err) } + logger.DebugWithPrefix(tb.servicePrefix, "Notification sent successfully.") return nil }