Skip to content

Commit

Permalink
Implement telegram as opt in (#52)
Browse files Browse the repository at this point in the history
* Implement telegram as opt in

* return already exists if operator in db

* send update telegram config

* add enoji
  • Loading branch information
pablomendezroyo authored Dec 2, 2024
1 parent c3a7c97 commit f70b9b7
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 26 deletions.
24 changes: 10 additions & 14 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,9 @@ func waitForInitialConfig(storageAdapter *storage.Storage) {
if err != nil || len(operatorIds) == 0 {
logger.Info("Waiting for operator IDs to be set...")
} else {
// Check for Telegram config
telegramConfig, err := storageAdapter.GetTelegramConfig()
if err != nil || telegramConfig.Token == "" || telegramConfig.UserID == 0 {
logger.Info("Waiting for Telegram config to be set...")
} else {
// Both operator IDs and Telegram config are set
logger.Info("Operator IDs and Telegram config are set. Proceeding with initialization.")
return
}
// Operator IDs are set
logger.Info("Operator IDs are set. Proceeding with initialization.")
return
}
time.Sleep(2 * time.Second) // Poll every 2 seconds
}
Expand All @@ -68,9 +62,14 @@ func main() {

// Initialize adapters
storageAdapter := storage.NewStorageAdapter()
// Initialize the notifier adapter (Telegram configuration optional)
notifierAdapter, err := notifier.NewNotifierAdapter(ctx, storageAdapter)
if err != nil {
logger.Warn("Telegram notifier not initialized: %v", err)
}

// Start HTTP server
apiAdapter := api.NewAPIAdapter(storageAdapter, networkConfig.CORS)
apiAdapter := api.NewAPIAdapter(storageAdapter, notifierAdapter, networkConfig.CORS)
server := &http.Server{
Addr: ":" + strconv.FormatUint(networkConfig.ApiPort, 10),
Handler: apiAdapter.Router,
Expand Down Expand Up @@ -106,10 +105,7 @@ func main() {
beaconchainAdapter := beaconchain.NewBeaconchainAdapter(networkConfig.BeaconchainURL)
executionAdapter := execution.NewExecutionAdapter(networkConfig.RpcUrl)
exitValidatorAdapter := exitvalidator.NewExitValidatorAdapter(beaconchainAdapter, networkConfig.SignerUrl)
notifierAdapter, err := notifier.NewNotifierAdapter(ctx, storageAdapter)
if err != nil {
logger.Fatal("Failed to initialize Telegram notifier: %v", err)
}

csFeeDistributorImplAdapter, err := csfeedistributorimpl.NewCsFeeDistributorImplAdapter(networkConfig.WsURL, networkConfig.CSFeeDistributorAddress)
if err != nil {
logger.Fatal("Failed to initialize CsFeeDistributorImpl adapter: %v", err)
Expand Down
27 changes: 26 additions & 1 deletion internal/adapters/api/api_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@ import (
// APIHandler holds the necessary dependencies for API endpoints
type APIHandler struct {
StoragePort ports.StoragePort
NotifierPort ports.NotifierPort
Router *mux.Router
adapterPrefix string
}

// NewAPIAdapter initializes the APIHandler and sets up routes with CORS enabled
func NewAPIAdapter(storagePort ports.StoragePort, allowedOrigins []string) *APIHandler {
func NewAPIAdapter(storagePort ports.StoragePort, notifierPort ports.NotifierPort, allowedOrigins []string) *APIHandler {
h := &APIHandler{
StoragePort: storagePort,
NotifierPort: notifierPort,
Router: mux.NewRouter(),
adapterPrefix: "API",
}
Expand Down Expand Up @@ -115,6 +117,13 @@ func (h *APIHandler) UpdateTelegramConfig(w http.ResponseWriter, r *http.Request
return
}

// test the telegram connection
if err := h.NotifierPort.SendNotification("🔑 Updated telegram configuration successfully"); err != nil {
logger.ErrorWithPrefix("API", "Failed to send test notification: %v", err)
writeErrorResponse(w, "Failed to send test notification", http.StatusInternalServerError)
return
}

w.WriteHeader(http.StatusOK)
}

Expand Down Expand Up @@ -191,6 +200,22 @@ func (h *APIHandler) AddOperator(w http.ResponseWriter, r *http.Request) {
return
}

// check if operator id already exists and if so return ok
operatorIds, err := h.StoragePort.GetOperatorIds()
if err != nil {
logger.ErrorWithPrefix("API", "Failed to fetch operator IDs: %v", err)
writeErrorResponse(w, "Failed to fetch operator IDs", http.StatusInternalServerError)
return
}

for _, id := range operatorIds {
if id.String() == req.OperatorID {
logger.DebugWithPrefix("API", "Operator ID %s already exists", req.OperatorID)
w.WriteHeader(http.StatusOK)
return
}
}

if err := h.StoragePort.SaveOperatorId(req.OperatorID); err != nil {
logger.ErrorWithPrefix("API", "Failed to update Operator ID: %v", err)
writeErrorResponse(w, "Failed to update Operator ID", http.StatusInternalServerError)
Expand Down
49 changes: 38 additions & 11 deletions internal/adapters/notifier/notifier_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,41 @@ import (
)

type TelegramBot struct {
Bot *tgbotapi.BotAPI
UserID int64
mu sync.RWMutex
Bot *tgbotapi.BotAPI
UserID int64
servicePrefix string
mu sync.RWMutex
}

func NewNotifierAdapter(ctx context.Context, storageAdapter ports.StoragePort) (*TelegramBot, error) {
const servicePrefix = "TelegramNotifier"

// Load the initial configuration for Telegram
initialConfig, err := storageAdapter.GetTelegramConfig()
if err != nil {
return nil, err
logger.WarnWithPrefix(servicePrefix, "Failed to load initial Telegram configuration: %v", err)
// Return a TelegramBot instance without initializing the bot
return &TelegramBot{servicePrefix: servicePrefix}, nil
}

if initialConfig.Token == "" || initialConfig.UserID == 0 {
logger.WarnWithPrefix(servicePrefix, "Telegram configuration is incomplete. Notifications will be disabled.")
// Return a TelegramBot instance without initializing the bot
return &TelegramBot{servicePrefix: servicePrefix}, nil
}

// Initialize the bot with the initial token
bot, err := tgbotapi.NewBotAPI(initialConfig.Token)
if err != nil {
return nil, err
logger.WarnWithPrefix(servicePrefix, "Failed to initialize Telegram bot: %v", err)
// Return a TelegramBot instance without initializing the bot
return &TelegramBot{servicePrefix: servicePrefix}, nil
}

adapter := &TelegramBot{
Bot: bot,
UserID: initialConfig.UserID,
Bot: bot,
UserID: initialConfig.UserID,
servicePrefix: servicePrefix,
}

// Listen for configuration updates in a separate goroutine
Expand All @@ -41,11 +55,17 @@ func NewNotifierAdapter(ctx context.Context, storageAdapter ports.StoragePort) (
adapter.mu.Lock()
adapter.UserID = newConfig.UserID
// Update the bot API instance with the new token
updatedBot, err := tgbotapi.NewBotAPI(newConfig.Token)
if err == nil {
adapter.Bot = updatedBot
if newConfig.Token != "" {
updatedBot, err := tgbotapi.NewBotAPI(newConfig.Token)
if err == nil {
adapter.Bot = updatedBot
logger.InfoWithPrefix(servicePrefix, "Telegram bot configuration updated successfully.")
} else {
logger.ErrorWithPrefix(servicePrefix, "Failed to update Telegram bot: %v", err)
}
} else {
logger.ErrorWithPrefix("Notifier", "Failed to update Telegram bot: %v", err)
logger.WarnWithPrefix(servicePrefix, "Received incomplete Telegram configuration. Notifications will be disabled.")
adapter.Bot = nil // Disable notifications if the new config is invalid
}
adapter.mu.Unlock()
}
Expand All @@ -59,10 +79,17 @@ func (tb *TelegramBot) SendNotification(message string) error {
tb.mu.RLock()
defer tb.mu.RUnlock()

// Check if the bot is initialized
if tb.Bot == nil {
logger.WarnWithPrefix(tb.servicePrefix, "Telegram bot is not initialized. Skipping notification.")
return nil
}

msg := tgbotapi.NewMessage(tb.UserID, message)
_, err := tb.Bot.Send(msg)
if err != nil {
return fmt.Errorf("failed to send Telegram message: %w", err)
}
logger.DebugWithPrefix(tb.servicePrefix, "Notification sent successfully.")
return nil
}

0 comments on commit f70b9b7

Please sign in to comment.