Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send update notification from adapter #57

Merged
merged 5 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 41 additions & 13 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,23 @@ import (
)

// Helper function to check if operator IDs and Telegram config are available
func waitForInitialConfig(storageAdapter *storage.Storage) {
func waitForInitialConfig(ctx context.Context, storageAdapter *storage.Storage) error {
for {
// Check for operator IDs
operatorIds, err := storageAdapter.GetOperatorIds()
if err != nil || len(operatorIds) == 0 {
logger.Info("Waiting for operator IDs to be set...")
} else {
// Operator IDs are set
logger.Info("Operator IDs are set. Proceeding with initialization.")
return
select {
case <-ctx.Done(): // Exit if the context is canceled
return ctx.Err()
default:
// Check for operator IDs
operatorIds, err := storageAdapter.GetOperatorIds()
if err != nil || len(operatorIds) == 0 {
logger.Info("Waiting for operator IDs to be set...")
} else {
// Operator IDs are set
logger.Info("Operator IDs are set. Proceeding with initialization.")
return nil
}
time.Sleep(2 * time.Second) // Poll every 2 seconds
}
time.Sleep(2 * time.Second) // Poll every 2 seconds
}
}

Expand Down Expand Up @@ -69,7 +74,7 @@ func main() {
}

// Start HTTP server
apiAdapter := api.NewAPIAdapter(storageAdapter, notifierAdapter, networkConfig.CORS)
apiAdapter := api.NewAPIAdapter(storageAdapter, networkConfig.CORS)
server := &http.Server{
Addr: ":" + strconv.FormatUint(networkConfig.ApiPort, 10),
Handler: apiAdapter.Router,
Expand Down Expand Up @@ -98,8 +103,31 @@ func main() {
}
}()

// Wait for initial configuration to be set
waitForInitialConfig(storageAdapter)
// Wait for initial configuration in a separate goroutine
configReady := make(chan error, 1)
go func() {
configReady <- waitForInitialConfig(ctx, storageAdapter)
}()

// Start listening for signals in a separate goroutine
go func() {
<-signalChan
logger.Info("Received shutdown signal. Initiating graceful shutdown...")
cancel() // Cancel context to stop all services
}()

// Wait for either the config to be ready or the context to be canceled
select {
case err := <-configReady:
if err != nil {
logger.Warn("Shutting down due to: %v", err)
return
}
logger.Info("Configuration is ready. Proceeding with initialization.")
case <-ctx.Done():
logger.Info("Context canceled before configuration was ready.")
return
}

ipfsAdapter := ipfs.NewIPFSAdapter(networkConfig.IpfsUrl)
beaconchainAdapter := beaconchain.NewBeaconchainAdapter(networkConfig.BeaconchainURL)
Expand Down
11 changes: 1 addition & 10 deletions internal/adapters/api/api_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,14 @@ import (
// APIHandler holds the necessary dependencies for API endpoints
type APIHandler struct {
StoragePort ports.StoragePort
NotifierPort ports.NotifierPort
Router *mux.Router
adapterPrefix string
}

// NewAPIAdapter initializes the APIHandler and sets up routes with CORS enabled
func NewAPIAdapter(storagePort ports.StoragePort, notifierPort ports.NotifierPort, allowedOrigins []string) *APIHandler {
func NewAPIAdapter(storagePort ports.StoragePort, allowedOrigins []string) *APIHandler {
h := &APIHandler{
StoragePort: storagePort,
NotifierPort: notifierPort,
Router: mux.NewRouter(),
adapterPrefix: "API",
}
Expand Down Expand Up @@ -117,13 +115,6 @@ func (h *APIHandler) UpdateTelegramConfig(w http.ResponseWriter, r *http.Request
return
}

// test the telegram connection
if err := h.NotifierPort.SendNotification("🔑 Updated telegram configuration successfully"); err != nil {
logger.ErrorWithPrefix("API", "Failed to send test notification: %v", err)
writeErrorResponse(w, "Failed to send test notification", http.StatusInternalServerError)
return
}

w.WriteHeader(http.StatusOK)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,11 @@ func (cs *CsFeeDistributorImplAdapter) ScanDistributionLogUpdatedEvents(ctx cont

for distributionLogUpdated.Next() {
if err := distributionLogUpdated.Error(); err != nil {
// Skip this event if there is an error retrieving it
continue
return fmt.Errorf("error reading DistributionLogUpdated event: %w", err)
}

if err := handleDistributionLogUpdated(distributionLogUpdated.Event); err != nil {
// Continue to the next event if handling fails
continue
return fmt.Errorf("failed to handle DistributionLogUpdated event: %w", err)
}
}

Expand Down
57 changes: 27 additions & 30 deletions internal/adapters/notifier/notifier_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"lido-events/internal/application/ports"
"lido-events/internal/logger"
"sync"

tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api/v5"
)
Expand All @@ -14,60 +13,61 @@ type TelegramBot struct {
Bot *tgbotapi.BotAPI
UserID int64
servicePrefix string
mu sync.RWMutex
}

func NewNotifierAdapter(ctx context.Context, storageAdapter ports.StoragePort) (*TelegramBot, error) {
const servicePrefix = "TelegramNotifier"

// Load the initial configuration for Telegram
initialConfig, err := storageAdapter.GetTelegramConfig()
if err != nil {
logger.WarnWithPrefix(servicePrefix, "Failed to load initial Telegram configuration: %v", err)
// Return a TelegramBot instance without initializing the bot
return &TelegramBot{servicePrefix: servicePrefix}, nil
}

if initialConfig.Token == "" || initialConfig.UserID == 0 {
logger.WarnWithPrefix(servicePrefix, "Telegram configuration is incomplete. Notifications will be disabled.")
// Return a TelegramBot instance without initializing the bot
return &TelegramBot{servicePrefix: servicePrefix}, nil
adapter := &TelegramBot{
servicePrefix: servicePrefix,
}

// Initialize the bot with the initial token
bot, err := tgbotapi.NewBotAPI(initialConfig.Token)
// Attempt to load the initial configuration for Telegram
initialConfig, err := storageAdapter.GetTelegramConfig()
if err != nil {
logger.WarnWithPrefix(servicePrefix, "Failed to initialize Telegram bot: %v", err)
// Return a TelegramBot instance without initializing the bot
return &TelegramBot{servicePrefix: servicePrefix}, nil
}

adapter := &TelegramBot{
Bot: bot,
UserID: initialConfig.UserID,
servicePrefix: servicePrefix,
logger.WarnWithPrefix(servicePrefix, "Failed to load initial Telegram configuration: %v", err)
} else if initialConfig.Token != "" && initialConfig.UserID != 0 {
// Initialize the bot if the configuration is valid
bot, err := tgbotapi.NewBotAPI(initialConfig.Token)
if err != nil {
logger.WarnWithPrefix(servicePrefix, "Failed to initialize Telegram bot: %v", err)
} else {
adapter.Bot = bot
adapter.UserID = initialConfig.UserID
logger.InfoWithPrefix(servicePrefix, "Telegram bot initialized successfully.")
}
} else {
logger.WarnWithPrefix(servicePrefix, "Initial Telegram configuration is incomplete. Notifications will be disabled.")
}

// Listen for configuration updates in a separate goroutine
telegramConfigUpdates := storageAdapter.RegisterTelegramConfigListener()
go func() {
for newConfig := range telegramConfigUpdates {
adapter.mu.Lock()
adapter.UserID = newConfig.UserID
// Update the bot API instance with the new token
if newConfig.Token != "" {
updatedBot, err := tgbotapi.NewBotAPI(newConfig.Token)
if err == nil {
adapter.Bot = updatedBot
logger.InfoWithPrefix(servicePrefix, "Telegram bot configuration updated successfully.")

// Send a test notification to confirm the new configuration works
testMessage := "🔑 Updated telegram configuration successfully"
testErr := adapter.SendNotification(testMessage)
if testErr != nil {
logger.ErrorWithPrefix(servicePrefix, "Failed to send test notification after configuration update: %v", testErr)
} else {
logger.InfoWithPrefix(servicePrefix, "Test notification sent successfully.")
}
} else {
logger.ErrorWithPrefix(servicePrefix, "Failed to update Telegram bot: %v", err)
adapter.Bot = nil // Disable notifications on failure
}
} else {
logger.WarnWithPrefix(servicePrefix, "Received incomplete Telegram configuration. Notifications will be disabled.")
adapter.Bot = nil // Disable notifications if the new config is invalid
}
adapter.mu.Unlock()
}
}()

Expand All @@ -76,9 +76,6 @@ func NewNotifierAdapter(ctx context.Context, storageAdapter ports.StoragePort) (

// SendNotification sends a message via Telegram
func (tb *TelegramBot) SendNotification(message string) error {
tb.mu.RLock()
defer tb.mu.RUnlock()

// Check if the bot is initialized
if tb.Bot == nil {
logger.WarnWithPrefix(tb.servicePrefix, "Telegram bot is not initialized. Skipping notification.")
Expand Down
6 changes: 2 additions & 4 deletions internal/adapters/vebo/vebo_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,11 @@ func (va *VeboAdapter) ScanVeboValidatorExitRequestEvent(ctx context.Context, st

for validatorExitRequestEvents.Next() {
if err := validatorExitRequestEvents.Error(); err != nil {
// Skip this event if there is an error retrieving it
continue
return fmt.Errorf("error reading ValidatorExitRequest event: %w", err)
}

if err := handleValidatorExitRequestEvent(validatorExitRequestEvents.Event); err != nil {
// Continue to the next event if handling fails
continue
return fmt.Errorf("failed to handle ValidatorExitRequest event: %w", err)
}
}

Expand Down
Loading