Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dont use listeners in telegram config update #83

Merged
merged 2 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions internal/adapters/api/api_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,16 +170,17 @@ func (h *APIHandler) UpdateTelegramConfig(w http.ResponseWriter, r *http.Request
return
}

// Update storage
if err := h.StoragePort.SaveTelegramConfig(domain.TelegramConfig(req)); err != nil {
logger.ErrorWithPrefix("API", "Failed to update Telegram configuration: %v", err)
writeErrorResponse(w, "Failed to update Telegram configuration", http.StatusInternalServerError)
return
}

// send update notification to verify the chat id exists
if err := h.NotifierPort.SendNotification("🔑 Updated telegram configuration successfully"); err != nil {
logger.ErrorWithPrefix("API", "Failed to send test notification: %v", err)
writeErrorResponse(w, "Failed to send test notification", http.StatusInternalServerError)
// Synchronously update the Telegram bot configuration
if err := h.NotifierPort.UpdateBotConfig(); err != nil {
logger.ErrorWithPrefix("API", "Failed to update Telegram bot configuration: %v", err)
writeErrorResponse(w, "Failed to update Telegram bot configuration", http.StatusInternalServerError)
return
}

Expand Down
73 changes: 40 additions & 33 deletions internal/adapters/notifier/notifier_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,69 +13,76 @@ type TelegramBot struct {
Bot *tgbotapi.BotAPI
UserID int64
servicePrefix string
storagePort ports.StoragePort
}

func NewNotifierAdapter(ctx context.Context, storageAdapter ports.StoragePort) (*TelegramBot, error) {
const servicePrefix = "TelegramNotifier"

adapter := &TelegramBot{
servicePrefix: servicePrefix,
storagePort: storageAdapter,
}

// Attempt to load the initial configuration for Telegram
initialConfig, err := storageAdapter.GetTelegramConfig()
// Initialize if config exists.
config, err := storageAdapter.GetTelegramConfig()
if err != nil {
logger.WarnWithPrefix(servicePrefix, "Failed to load initial Telegram configuration: %v", err)
} else if initialConfig.Token != "" && initialConfig.UserID != 0 {
// Initialize the bot if the configuration is valid
bot, err := tgbotapi.NewBotAPI(initialConfig.Token)
} else if config.Token != "" && config.UserID != 0 {
bot, err := tgbotapi.NewBotAPI(config.Token)
if err != nil {
logger.WarnWithPrefix(servicePrefix, "Failed to initialize Telegram bot: %v", err)
} else {
adapter.Bot = bot
adapter.UserID = initialConfig.UserID
adapter.UserID = config.UserID
logger.InfoWithPrefix(servicePrefix, "Telegram bot initialized successfully.")
}
} else {
logger.WarnWithPrefix(servicePrefix, "Initial Telegram configuration is incomplete. Notifications will be disabled.")
logger.WarnWithPrefix(servicePrefix, "Initial Telegram configuration is incomplete. Notifications disabled.")
}

// Listen for configuration updates in a separate goroutine
telegramConfigUpdates := storageAdapter.RegisterTelegramConfigListener()
go func() {
for newConfig := range telegramConfigUpdates {
adapter.UserID = newConfig.UserID
// Update the bot API instance with the new token
if newConfig.Token != "" {
updatedBot, err := tgbotapi.NewBotAPI(newConfig.Token)
if err == nil {
adapter.Bot = updatedBot
logger.InfoWithPrefix(servicePrefix, "Telegram bot configuration updated successfully.")
} else {
logger.ErrorWithPrefix(servicePrefix, "Failed to update Telegram bot: %v", err)
adapter.Bot = nil // Disable notifications on failure
}
} else {
logger.WarnWithPrefix(servicePrefix, "Received incomplete Telegram configuration. Notifications will be disabled.")
adapter.Bot = nil // Disable notifications if the new config is invalid
}
}
}()

return adapter, nil
}

// SendNotification sends a message via Telegram
func (tb *TelegramBot) UpdateBotConfig() error {
config, err := tb.storagePort.GetTelegramConfig()
if err != nil {
return fmt.Errorf("failed to get Telegram config: %v", err)
}

if config.Token == "" || config.UserID == 0 {
tb.Bot = nil
tb.UserID = 0
logger.WarnWithPrefix(tb.servicePrefix, "Incomplete Telegram configuration. Notifications disabled.")
return nil
}

bot, err := tgbotapi.NewBotAPI(config.Token)
if err != nil {
tb.Bot = nil
return fmt.Errorf("failed to update Telegram bot: %v", err)
}

tb.Bot = bot
tb.UserID = config.UserID

// Send a test notification after the bot has been updated
if err := tb.SendNotification("🔑 Updated telegram configuration successfully"); err != nil {
return fmt.Errorf("failed to send test notification: %w", err)
}

logger.InfoWithPrefix(tb.servicePrefix, "Telegram bot configuration updated successfully.")

return nil
}

func (tb *TelegramBot) SendNotification(message string) error {
// Check if the bot is initialized
if tb.Bot == nil {
logger.WarnWithPrefix(tb.servicePrefix, "Telegram bot is not initialized. Skipping notification.")
return nil
}

// print user id
logger.DebugWithPrefix(tb.servicePrefix, "Sending notification to user ID: %d", tb.UserID)

msg := tgbotapi.NewMessage(tb.UserID, message)
_, err := tb.Bot.Send(msg)
if err != nil {
Expand Down
40 changes: 2 additions & 38 deletions internal/adapters/storage/telegram.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,10 @@ package storage

import (
"lido-events/internal/application/domain"
"sync"
)

// TODO: determine if token should be stored hashed
// Telegram Config Methods

// Telegram Configuration Methods

// SaveTelegramConfig saves the Telegram configuration to storage and notifies listeners of the update.
func (fs *Storage) SaveTelegramConfig(config domain.TelegramConfig) error {
db, err := fs.LoadDatabase()
if err != nil {
Expand All @@ -21,46 +17,14 @@ func (fs *Storage) SaveTelegramConfig(config domain.TelegramConfig) error {
return err
}

fs.notifyTelegramConfigListenersSync() // Notify listeners of the change
// No notification to listeners needed anymore since we are doing synchronous updates.
return nil
}

// GetTelegramConfig retrieves the Telegram configuration from storage.
func (fs *Storage) GetTelegramConfig() (domain.TelegramConfig, error) {
db, err := fs.LoadDatabase()
if err != nil {
return domain.TelegramConfig{}, err
}
return db.Telegram, nil
}

// RegisterTelegramConfigListener registers a channel to receive updates when the Telegram config changes.
func (fs *Storage) RegisterTelegramConfigListener() chan domain.TelegramConfig {
updateChan := make(chan domain.TelegramConfig, 1)
fs.telegramConfigListeners = append(fs.telegramConfigListeners, updateChan)
return updateChan
}

// notifyTelegramConfigListenersSync sends updates to all registered listeners of Telegram config changes.
func (fs *Storage) notifyTelegramConfigListenersSync() {
config, err := fs.GetTelegramConfig()
if err != nil {
return
}

var wg sync.WaitGroup
for _, listener := range fs.telegramConfigListeners {
wg.Add(1)
go func(listener chan domain.TelegramConfig) {
defer wg.Done()
select {
case listener <- config:
// Config sent successfully
default:
// Ignore if channel is full to prevent blocking
}
}(listener)
}

wg.Wait() // Wait for all listeners to process the update
}
1 change: 1 addition & 0 deletions internal/application/ports/notifier_port.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ package ports

type NotifierPort interface {
SendNotification(message string) error
UpdateBotConfig() error
}
1 change: 0 additions & 1 deletion internal/application/ports/storage_port.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,4 @@ type StoragePort interface {
// telegram
GetTelegramConfig() (domain.TelegramConfig, error)
SaveTelegramConfig(config domain.TelegramConfig) error
RegisterTelegramConfigListener() chan domain.TelegramConfig
}
Loading