Skip to content

Commit

Permalink
Merge branch 'main' into pablo/check-mev-relays
Browse files Browse the repository at this point in the history
  • Loading branch information
pablomendezroyo authored Dec 5, 2024
2 parents 2d8bd1c + 23a5b2c commit 2d99e7c
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 63 deletions.
13 changes: 10 additions & 3 deletions .github/workflows/release-docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
# Checkout code
- name: Checkout code
uses: actions/checkout@v4

# Run integration tests
- name: Run Integration Tests
run: |
Expand All @@ -32,7 +32,7 @@ jobs:
release:
name: Release and Publish Docker Image
needs: test # Ensure the test job runs successfully first
needs: test
runs-on: ubuntu-latest

steps:
Expand All @@ -52,7 +52,7 @@ jobs:
INPUT_VERSION="${{ github.event.inputs.version }}"
# Find the latest tag
LATEST_TAG=$(git describe --tags --abbrev=0 2>/dev/null || echo "0.0.0")
LATEST_TAG=$(git tag --sort=-v:refname | head -n 1)
# If an input version is provided, use it
if [[ -n "$INPUT_VERSION" ]]; then
Expand All @@ -71,6 +71,7 @@ jobs:
echo "New version: $NEW_VERSION"
echo "::set-output name=version::$NEW_VERSION"
# Create a GitHub release
- name: Create GitHub Release
uses: softprops/action-gh-release@v2
Expand All @@ -84,6 +85,12 @@ jobs:
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

# Push the new tag to the repository
- name: Push new tag
run: |
git tag ${{ steps.determine_version.outputs.version }}
git push origin ${{ steps.determine_version.outputs.version }}
# Log in to GitHub Docker Registry
- name: Log in to GitHub Docker Registry
uses: docker/login-action@v3
Expand Down
52 changes: 40 additions & 12 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,23 @@ import (
)

// Helper function to check if operator IDs and Telegram config are available
func waitForInitialConfig(storageAdapter *storage.Storage) {
func waitForInitialConfig(ctx context.Context, storageAdapter *storage.Storage) error {
for {
// Check for operator IDs
operatorIds, err := storageAdapter.GetOperatorIds()
if err != nil || len(operatorIds) == 0 {
logger.Info("Waiting for operator IDs to be set...")
} else {
// Operator IDs are set
logger.Info("Operator IDs are set. Proceeding with initialization.")
return
select {
case <-ctx.Done(): // Exit if the context is canceled
return ctx.Err()
default:
// Check for operator IDs
operatorIds, err := storageAdapter.GetOperatorIds()
if err != nil || len(operatorIds) == 0 {
logger.Info("Waiting for operator IDs to be set...")
} else {
// Operator IDs are set
logger.Info("Operator IDs are set. Proceeding with initialization.")
return nil
}
time.Sleep(2 * time.Second) // Poll every 2 seconds
}
time.Sleep(2 * time.Second) // Poll every 2 seconds
}
}

Expand Down Expand Up @@ -105,8 +110,31 @@ func main() {
}
}()

// Wait for initial configuration to be set
waitForInitialConfig(storageAdapter)
// Wait for initial configuration in a separate goroutine
configReady := make(chan error, 1)
go func() {
configReady <- waitForInitialConfig(ctx, storageAdapter)
}()

// Start listening for signals in a separate goroutine
go func() {
<-signalChan
logger.Info("Received shutdown signal. Initiating graceful shutdown...")
cancel() // Cancel context to stop all services
}()

// Wait for either the config to be ready or the context to be canceled
select {
case err := <-configReady:
if err != nil {
logger.Warn("Shutting down due to: %v", err)
return
}
logger.Info("Configuration is ready. Proceeding with initialization.")
case <-ctx.Done():
logger.Info("Context canceled before configuration was ready.")
return
}

ipfsAdapter := ipfs.NewIPFSAdapter(networkConfig.IpfsUrl)
beaconchainAdapter := beaconchain.NewBeaconchainAdapter(networkConfig.BeaconchainURL)
Expand Down
7 changes: 0 additions & 7 deletions internal/adapters/api/api_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,6 @@ func (h *APIHandler) UpdateTelegramConfig(w http.ResponseWriter, r *http.Request
return
}

// test the telegram connection
if err := h.NotifierPort.SendNotification("🔑 Updated telegram configuration successfully"); err != nil {
logger.ErrorWithPrefix("API", "Failed to send test notification: %v", err)
writeErrorResponse(w, "Failed to send test notification", http.StatusInternalServerError)
return
}

w.WriteHeader(http.StatusOK)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,11 @@ func (cs *CsFeeDistributorImplAdapter) ScanDistributionLogUpdatedEvents(ctx cont

for distributionLogUpdated.Next() {
if err := distributionLogUpdated.Error(); err != nil {
// Skip this event if there is an error retrieving it
continue
return fmt.Errorf("error reading DistributionLogUpdated event: %w", err)
}

if err := handleDistributionLogUpdated(distributionLogUpdated.Event); err != nil {
// Continue to the next event if handling fails
continue
return fmt.Errorf("failed to handle DistributionLogUpdated event: %w", err)
}
}

Expand Down
57 changes: 27 additions & 30 deletions internal/adapters/notifier/notifier_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"lido-events/internal/application/ports"
"lido-events/internal/logger"
"sync"

tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api/v5"
)
Expand All @@ -14,60 +13,61 @@ type TelegramBot struct {
Bot *tgbotapi.BotAPI
UserID int64
servicePrefix string
mu sync.RWMutex
}

func NewNotifierAdapter(ctx context.Context, storageAdapter ports.StoragePort) (*TelegramBot, error) {
const servicePrefix = "TelegramNotifier"

// Load the initial configuration for Telegram
initialConfig, err := storageAdapter.GetTelegramConfig()
if err != nil {
logger.WarnWithPrefix(servicePrefix, "Failed to load initial Telegram configuration: %v", err)
// Return a TelegramBot instance without initializing the bot
return &TelegramBot{servicePrefix: servicePrefix}, nil
}

if initialConfig.Token == "" || initialConfig.UserID == 0 {
logger.WarnWithPrefix(servicePrefix, "Telegram configuration is incomplete. Notifications will be disabled.")
// Return a TelegramBot instance without initializing the bot
return &TelegramBot{servicePrefix: servicePrefix}, nil
adapter := &TelegramBot{
servicePrefix: servicePrefix,
}

// Initialize the bot with the initial token
bot, err := tgbotapi.NewBotAPI(initialConfig.Token)
// Attempt to load the initial configuration for Telegram
initialConfig, err := storageAdapter.GetTelegramConfig()
if err != nil {
logger.WarnWithPrefix(servicePrefix, "Failed to initialize Telegram bot: %v", err)
// Return a TelegramBot instance without initializing the bot
return &TelegramBot{servicePrefix: servicePrefix}, nil
}

adapter := &TelegramBot{
Bot: bot,
UserID: initialConfig.UserID,
servicePrefix: servicePrefix,
logger.WarnWithPrefix(servicePrefix, "Failed to load initial Telegram configuration: %v", err)
} else if initialConfig.Token != "" && initialConfig.UserID != 0 {
// Initialize the bot if the configuration is valid
bot, err := tgbotapi.NewBotAPI(initialConfig.Token)
if err != nil {
logger.WarnWithPrefix(servicePrefix, "Failed to initialize Telegram bot: %v", err)
} else {
adapter.Bot = bot
adapter.UserID = initialConfig.UserID
logger.InfoWithPrefix(servicePrefix, "Telegram bot initialized successfully.")
}
} else {
logger.WarnWithPrefix(servicePrefix, "Initial Telegram configuration is incomplete. Notifications will be disabled.")
}

// Listen for configuration updates in a separate goroutine
telegramConfigUpdates := storageAdapter.RegisterTelegramConfigListener()
go func() {
for newConfig := range telegramConfigUpdates {
adapter.mu.Lock()
adapter.UserID = newConfig.UserID
// Update the bot API instance with the new token
if newConfig.Token != "" {
updatedBot, err := tgbotapi.NewBotAPI(newConfig.Token)
if err == nil {
adapter.Bot = updatedBot
logger.InfoWithPrefix(servicePrefix, "Telegram bot configuration updated successfully.")

// Send a test notification to confirm the new configuration works
testMessage := "🔑 Updated telegram configuration successfully"
testErr := adapter.SendNotification(testMessage)
if testErr != nil {
logger.ErrorWithPrefix(servicePrefix, "Failed to send test notification after configuration update: %v", testErr)
} else {
logger.InfoWithPrefix(servicePrefix, "Test notification sent successfully.")
}
} else {
logger.ErrorWithPrefix(servicePrefix, "Failed to update Telegram bot: %v", err)
adapter.Bot = nil // Disable notifications on failure
}
} else {
logger.WarnWithPrefix(servicePrefix, "Received incomplete Telegram configuration. Notifications will be disabled.")
adapter.Bot = nil // Disable notifications if the new config is invalid
}
adapter.mu.Unlock()
}
}()

Expand All @@ -76,9 +76,6 @@ func NewNotifierAdapter(ctx context.Context, storageAdapter ports.StoragePort) (

// SendNotification sends a message via Telegram
func (tb *TelegramBot) SendNotification(message string) error {
tb.mu.RLock()
defer tb.mu.RUnlock()

// Check if the bot is initialized
if tb.Bot == nil {
logger.WarnWithPrefix(tb.servicePrefix, "Telegram bot is not initialized. Skipping notification.")
Expand Down
7 changes: 5 additions & 2 deletions internal/adapters/proxyApi/proxy_api_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,11 @@ func (h *APIHandler) proxyHandler(w http.ResponseWriter, r *http.Request) {
return
}

// Copy headers from the original request
// Copy headers from the original request, excluding the Origin header
for key, values := range r.Header {
if key == "Origin" {
continue // Skip the Origin header
}
for _, value := range values {
proxyReq.Header.Add(key, value)
}
Expand All @@ -84,7 +87,7 @@ func (h *APIHandler) proxyHandler(w http.ResponseWriter, r *http.Request) {

// Add CORS headers to the response
w.Header().Set("Access-Control-Allow-Origin", r.Header.Get("Origin")) // Reflect the origin for allowed requests
w.Header().Set("Access-Control-Allow-Credentials", "true") // Allow credentials if needed
w.Header().Set("Access-Control-Allow-Credentials", "true") // Allow credentials if needed
w.Header().Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization")

Expand Down
6 changes: 2 additions & 4 deletions internal/adapters/vebo/vebo_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,11 @@ func (va *VeboAdapter) ScanVeboValidatorExitRequestEvent(ctx context.Context, st

for validatorExitRequestEvents.Next() {
if err := validatorExitRequestEvents.Error(); err != nil {
// Skip this event if there is an error retrieving it
continue
return fmt.Errorf("error reading ValidatorExitRequest event: %w", err)
}

if err := handleValidatorExitRequestEvent(validatorExitRequestEvents.Event); err != nil {
// Continue to the next event if handling fails
continue
return fmt.Errorf("failed to handle ValidatorExitRequest event: %w", err)
}
}

Expand Down
6 changes: 5 additions & 1 deletion internal/application/services/validatorEjector.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,12 @@ func (ve *ValidatorEjector) EjectValidator() error {
continue
}

// TODO: simplify this logic
// if the validator status is not active ongoing or active slashed we skip the exit request because it is already exiting.
if onchainStatus != domain.StatusActiveOngoing && onchainStatus != domain.StatusActiveSlashed {
if onchainStatus != domain.StatusPendingInitialized && onchainStatus != domain.StatusPendingQueued {
logger.InfoWithPrefix(ve.servicePrefix, "Validator %s is %s so no exit request is required, deleting the exit request from db", exitRequest.Event.ValidatorIndex, exitRequest.Status)
// TODO: send notiifcation validator exited if timestamp of the event is within an hour
//Since the validator is already exiting, we remove the exit request from the db
if err := ve.storagePort.DeleteExitRequest(operatorID.String(), exitRequest.Event.ValidatorIndex.String()); err != nil {
// An error here is no big deal, we will retry to delete this in the next iteration of the cron
Expand All @@ -102,7 +104,7 @@ func (ve *ValidatorEjector) EjectValidator() error {
}

// send notification and skip on error
message := fmt.Sprintf("- 🚨 Your validator %s is requested to exit.", exitRequest.Event.ValidatorIndex)
message := fmt.Sprintf("- 🚨 Your validator %s is requested to exit. Executing automatic exit.", exitRequest.Event.ValidatorIndex)
if err := ve.notifierPort.SendNotification(message); err != nil {
logger.ErrorWithPrefix(ve.servicePrefix, "Error sending exit notification", err)
}
Expand All @@ -120,6 +122,8 @@ func (ve *ValidatorEjector) EjectValidator() error {
continue
}

// TODO: send notification "exited submitted. Your validator will exit within X minutes. wait for confirmatio, If not confirmation received, please check manually"

// wait for the transaction to be included
// call ve.beaconchainPort.GetValidatorStatus(string(validator.Event.ValidatorPubkey)) in a loop until the status is domain.StatusActiveExiting
// a maximum of 64 times with a 30 second sleep between each call (check for 32 minutes, two times x minute)
Expand Down

0 comments on commit 2d99e7c

Please sign in to comment.