Skip to content

Commit

Permalink
Send update notification from adapter (#57)
Browse files Browse the repository at this point in the history
* Send update notification from adapter

* remove previous return

* Add signal catch in the wait for initial config

* Add return err

* return err in second scanner
  • Loading branch information
pablomendezroyo authored Dec 4, 2024
1 parent d904511 commit 23a5b2c
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 61 deletions.
54 changes: 41 additions & 13 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,23 @@ import (
)

// Helper function to check if operator IDs and Telegram config are available
func waitForInitialConfig(storageAdapter *storage.Storage) {
func waitForInitialConfig(ctx context.Context, storageAdapter *storage.Storage) error {
for {
// Check for operator IDs
operatorIds, err := storageAdapter.GetOperatorIds()
if err != nil || len(operatorIds) == 0 {
logger.Info("Waiting for operator IDs to be set...")
} else {
// Operator IDs are set
logger.Info("Operator IDs are set. Proceeding with initialization.")
return
select {
case <-ctx.Done(): // Exit if the context is canceled
return ctx.Err()
default:
// Check for operator IDs
operatorIds, err := storageAdapter.GetOperatorIds()
if err != nil || len(operatorIds) == 0 {
logger.Info("Waiting for operator IDs to be set...")
} else {
// Operator IDs are set
logger.Info("Operator IDs are set. Proceeding with initialization.")
return nil
}
time.Sleep(2 * time.Second) // Poll every 2 seconds
}
time.Sleep(2 * time.Second) // Poll every 2 seconds
}
}

Expand Down Expand Up @@ -69,7 +74,7 @@ func main() {
}

// Start HTTP server
apiAdapter := api.NewAPIAdapter(storageAdapter, notifierAdapter, networkConfig.CORS)
apiAdapter := api.NewAPIAdapter(storageAdapter, networkConfig.CORS)
server := &http.Server{
Addr: ":" + strconv.FormatUint(networkConfig.ApiPort, 10),
Handler: apiAdapter.Router,
Expand Down Expand Up @@ -98,8 +103,31 @@ func main() {
}
}()

// Wait for initial configuration to be set
waitForInitialConfig(storageAdapter)
// Wait for initial configuration in a separate goroutine
configReady := make(chan error, 1)
go func() {
configReady <- waitForInitialConfig(ctx, storageAdapter)
}()

// Start listening for signals in a separate goroutine
go func() {
<-signalChan
logger.Info("Received shutdown signal. Initiating graceful shutdown...")
cancel() // Cancel context to stop all services
}()

// Wait for either the config to be ready or the context to be canceled
select {
case err := <-configReady:
if err != nil {
logger.Warn("Shutting down due to: %v", err)
return
}
logger.Info("Configuration is ready. Proceeding with initialization.")
case <-ctx.Done():
logger.Info("Context canceled before configuration was ready.")
return
}

ipfsAdapter := ipfs.NewIPFSAdapter(networkConfig.IpfsUrl)
beaconchainAdapter := beaconchain.NewBeaconchainAdapter(networkConfig.BeaconchainURL)
Expand Down
11 changes: 1 addition & 10 deletions internal/adapters/api/api_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,14 @@ import (
// APIHandler holds the necessary dependencies for API endpoints
type APIHandler struct {
StoragePort ports.StoragePort
NotifierPort ports.NotifierPort
Router *mux.Router
adapterPrefix string
}

// NewAPIAdapter initializes the APIHandler and sets up routes with CORS enabled
func NewAPIAdapter(storagePort ports.StoragePort, notifierPort ports.NotifierPort, allowedOrigins []string) *APIHandler {
func NewAPIAdapter(storagePort ports.StoragePort, allowedOrigins []string) *APIHandler {
h := &APIHandler{
StoragePort: storagePort,
NotifierPort: notifierPort,
Router: mux.NewRouter(),
adapterPrefix: "API",
}
Expand Down Expand Up @@ -117,13 +115,6 @@ func (h *APIHandler) UpdateTelegramConfig(w http.ResponseWriter, r *http.Request
return
}

// test the telegram connection
if err := h.NotifierPort.SendNotification("🔑 Updated telegram configuration successfully"); err != nil {
logger.ErrorWithPrefix("API", "Failed to send test notification: %v", err)
writeErrorResponse(w, "Failed to send test notification", http.StatusInternalServerError)
return
}

w.WriteHeader(http.StatusOK)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,11 @@ func (cs *CsFeeDistributorImplAdapter) ScanDistributionLogUpdatedEvents(ctx cont

for distributionLogUpdated.Next() {
if err := distributionLogUpdated.Error(); err != nil {
// Skip this event if there is an error retrieving it
continue
return fmt.Errorf("error reading DistributionLogUpdated event: %w", err)
}

if err := handleDistributionLogUpdated(distributionLogUpdated.Event); err != nil {
// Continue to the next event if handling fails
continue
return fmt.Errorf("failed to handle DistributionLogUpdated event: %w", err)
}
}

Expand Down
57 changes: 27 additions & 30 deletions internal/adapters/notifier/notifier_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"lido-events/internal/application/ports"
"lido-events/internal/logger"
"sync"

tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api/v5"
)
Expand All @@ -14,60 +13,61 @@ type TelegramBot struct {
Bot *tgbotapi.BotAPI
UserID int64
servicePrefix string
mu sync.RWMutex
}

func NewNotifierAdapter(ctx context.Context, storageAdapter ports.StoragePort) (*TelegramBot, error) {
const servicePrefix = "TelegramNotifier"

// Load the initial configuration for Telegram
initialConfig, err := storageAdapter.GetTelegramConfig()
if err != nil {
logger.WarnWithPrefix(servicePrefix, "Failed to load initial Telegram configuration: %v", err)
// Return a TelegramBot instance without initializing the bot
return &TelegramBot{servicePrefix: servicePrefix}, nil
}

if initialConfig.Token == "" || initialConfig.UserID == 0 {
logger.WarnWithPrefix(servicePrefix, "Telegram configuration is incomplete. Notifications will be disabled.")
// Return a TelegramBot instance without initializing the bot
return &TelegramBot{servicePrefix: servicePrefix}, nil
adapter := &TelegramBot{
servicePrefix: servicePrefix,
}

// Initialize the bot with the initial token
bot, err := tgbotapi.NewBotAPI(initialConfig.Token)
// Attempt to load the initial configuration for Telegram
initialConfig, err := storageAdapter.GetTelegramConfig()
if err != nil {
logger.WarnWithPrefix(servicePrefix, "Failed to initialize Telegram bot: %v", err)
// Return a TelegramBot instance without initializing the bot
return &TelegramBot{servicePrefix: servicePrefix}, nil
}

adapter := &TelegramBot{
Bot: bot,
UserID: initialConfig.UserID,
servicePrefix: servicePrefix,
logger.WarnWithPrefix(servicePrefix, "Failed to load initial Telegram configuration: %v", err)
} else if initialConfig.Token != "" && initialConfig.UserID != 0 {
// Initialize the bot if the configuration is valid
bot, err := tgbotapi.NewBotAPI(initialConfig.Token)
if err != nil {
logger.WarnWithPrefix(servicePrefix, "Failed to initialize Telegram bot: %v", err)
} else {
adapter.Bot = bot
adapter.UserID = initialConfig.UserID
logger.InfoWithPrefix(servicePrefix, "Telegram bot initialized successfully.")
}
} else {
logger.WarnWithPrefix(servicePrefix, "Initial Telegram configuration is incomplete. Notifications will be disabled.")
}

// Listen for configuration updates in a separate goroutine
telegramConfigUpdates := storageAdapter.RegisterTelegramConfigListener()
go func() {
for newConfig := range telegramConfigUpdates {
adapter.mu.Lock()
adapter.UserID = newConfig.UserID
// Update the bot API instance with the new token
if newConfig.Token != "" {
updatedBot, err := tgbotapi.NewBotAPI(newConfig.Token)
if err == nil {
adapter.Bot = updatedBot
logger.InfoWithPrefix(servicePrefix, "Telegram bot configuration updated successfully.")

// Send a test notification to confirm the new configuration works
testMessage := "🔑 Updated telegram configuration successfully"
testErr := adapter.SendNotification(testMessage)
if testErr != nil {
logger.ErrorWithPrefix(servicePrefix, "Failed to send test notification after configuration update: %v", testErr)
} else {
logger.InfoWithPrefix(servicePrefix, "Test notification sent successfully.")
}
} else {
logger.ErrorWithPrefix(servicePrefix, "Failed to update Telegram bot: %v", err)
adapter.Bot = nil // Disable notifications on failure
}
} else {
logger.WarnWithPrefix(servicePrefix, "Received incomplete Telegram configuration. Notifications will be disabled.")
adapter.Bot = nil // Disable notifications if the new config is invalid
}
adapter.mu.Unlock()
}
}()

Expand All @@ -76,9 +76,6 @@ func NewNotifierAdapter(ctx context.Context, storageAdapter ports.StoragePort) (

// SendNotification sends a message via Telegram
func (tb *TelegramBot) SendNotification(message string) error {
tb.mu.RLock()
defer tb.mu.RUnlock()

// Check if the bot is initialized
if tb.Bot == nil {
logger.WarnWithPrefix(tb.servicePrefix, "Telegram bot is not initialized. Skipping notification.")
Expand Down
6 changes: 2 additions & 4 deletions internal/adapters/vebo/vebo_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,11 @@ func (va *VeboAdapter) ScanVeboValidatorExitRequestEvent(ctx context.Context, st

for validatorExitRequestEvents.Next() {
if err := validatorExitRequestEvents.Error(); err != nil {
// Skip this event if there is an error retrieving it
continue
return fmt.Errorf("error reading ValidatorExitRequest event: %w", err)
}

if err := handleValidatorExitRequestEvent(validatorExitRequestEvents.Event); err != nil {
// Continue to the next event if handling fails
continue
return fmt.Errorf("failed to handle ValidatorExitRequest event: %w", err)
}
}

Expand Down

0 comments on commit 23a5b2c

Please sign in to comment.