Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify main logic #62

Merged
merged 2 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 73 additions & 118 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@ package main

import (
"context"
"os"
"os/signal"
"sync"
"syscall"
"time"

"lido-events/internal/adapters/api"
"lido-events/internal/adapters/beaconchain"
csfeedistributor "lido-events/internal/adapters/csFeeDistributor"
Expand All @@ -14,185 +20,134 @@ import (
proxyapi "lido-events/internal/adapters/proxyApi"
"lido-events/internal/adapters/storage"
"lido-events/internal/adapters/vebo"
"lido-events/internal/logger"
"os"
"os/signal"
"strconv"
"sync"
"syscall"
"time"

"lido-events/internal/application/services"
"lido-events/internal/config"
"net/http"
"lido-events/internal/logger"
)

// Helper function to check if operator IDs and Telegram config are available
func waitForInitialConfig(ctx context.Context, storageAdapter *storage.Storage) error {
for {
select {
case <-ctx.Done(): // Exit if the context is canceled
return ctx.Err()
default:
// Check for operator IDs
operatorIds, err := storageAdapter.GetOperatorIds()
if err != nil || len(operatorIds) == 0 {
logger.Info("Waiting for operator IDs to be set...")
} else {
// Operator IDs are set
logger.Info("Operator IDs are set. Proceeding with initialization.")
return nil
}
time.Sleep(2 * time.Second) // Poll every 2 seconds
}
}
}
var logPrefix = "MAIN"

func main() {
// Set up context with cancellation and a WaitGroup for graceful shutdown
// Set up context with cancellation and WaitGroup for graceful shutdown
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var wg sync.WaitGroup

// Set up signal channel to handle OS interrupts
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)

// Load configurations
networkConfig, err := config.LoadNetworkConfig()
if err != nil {
logger.Fatal("Failed to load network configuration: %v", err)
logger.FatalWithPrefix(logPrefix, "Failed to load network configuration: %v", err)
}
logger.Debug("Network config: %+v", networkConfig)
logger.DebugWithPrefix(logPrefix, "Network config: %+v", networkConfig)

// Initialize adapters
storageAdapter := storage.NewStorageAdapter()
// Initialize the notifier adapter (Telegram configuration optional)
notifierAdapter, err := notifier.NewNotifierAdapter(ctx, storageAdapter)
if err != nil {
logger.Warn("Telegram notifier not initialized: %v", err)
logger.WarnWithPrefix(logPrefix, "Telegram notifier not initialized: %v", err)
}

// Start HTTP server
apiAdapter := api.NewAPIAdapter(storageAdapter, networkConfig.CORS)
server := &http.Server{
Addr: ":" + strconv.FormatUint(networkConfig.ApiPort, 10),
Handler: apiAdapter.Router,
}
wg.Add(1)
go func() {
defer wg.Done()
logger.Info("Server started on :%d", networkConfig.ApiPort)
if err := server.ListenAndServe(); err != http.ErrServerClosed {
logger.Fatal("HTTP server ListenAndServe: %v", err)
}
}()

// Start Proxy API server
proxyApiAdapter := proxyapi.NewProxyAPIAdapter(networkConfig.CORS, networkConfig.LidoKeysApiUrl)
proxyServer := &http.Server{
Addr: ":" + strconv.FormatUint(networkConfig.ProxyApiPort, 10),
Handler: proxyApiAdapter.Router,
}
wg.Add(1)
go func() {
defer wg.Done()
logger.Info("Proxy API server started on :%d", networkConfig.ProxyApiPort)
if err := proxyServer.ListenAndServe(); err != http.ErrServerClosed {
logger.Fatal("Proxy API server ListenAndServe: %v", err)
}
}()

// Wait for initial configuration in a separate goroutine
configReady := make(chan error, 1)
go func() {
configReady <- waitForInitialConfig(ctx, storageAdapter)
}()
// Initialize API services
apiService := services.NewAPIServerService(apiAdapter, networkConfig.ApiPort)
proxyService := services.NewProxyAPIServerService(proxyApiAdapter, networkConfig.ProxyApiPort)

// Start listening for signals in a separate goroutine
go func() {
<-signalChan
logger.Info("Received shutdown signal. Initiating graceful shutdown...")
cancel() // Cancel context to stop all services
}()
// Start API services
apiService.Start(&wg)
proxyService.Start(&wg)

// Wait for either the config to be ready or the context to be canceled
select {
case err := <-configReady:
if err != nil {
logger.Warn("Shutting down due to: %v", err)
return
}
logger.Info("Configuration is ready. Proceeding with initialization.")
case <-ctx.Done():
logger.Info("Context canceled before configuration was ready.")
return
// Wait for and validate initial configuration
if err := waitForConfig(ctx, storageAdapter); err != nil {
logger.FatalWithPrefix(logPrefix, "Application shutting down due to configuration validation failure: %v", err)
}

// Initialize domain adapters
ipfsAdapter := ipfs.NewIPFSAdapter(networkConfig.IpfsUrl)
beaconchainAdapter := beaconchain.NewBeaconchainAdapter(networkConfig.BeaconchainURL)
executionAdapter := execution.NewExecutionAdapter(networkConfig.RpcUrl)
exitValidatorAdapter := exitvalidator.NewExitValidatorAdapter(beaconchainAdapter, networkConfig.SignerUrl)

csFeeDistributorImplAdapter, err := csfeedistributorimpl.NewCsFeeDistributorImplAdapter(networkConfig.WsURL, networkConfig.CSFeeDistributorAddress)
if err != nil {
logger.Fatal("Failed to initialize CsFeeDistributorImpl adapter: %v", err)
logger.FatalWithPrefix(logPrefix, "Failed to initialize CsFeeDistributorImplAdapter: %v", err)
}
veboAdapter, err := vebo.NewVeboAdapter(networkConfig.WsURL, networkConfig.VEBOAddress, storageAdapter)
if err != nil {
logger.Fatal("Failed to initialize Vebo adapter: %v", err)
logger.FatalWithPrefix(logPrefix, "Failed to initialize VeboAdapter: %v", err)
}
csModuleAdapter, err := csmodule.NewCsModuleAdapter(networkConfig.WsURL, networkConfig.CSModuleAddress, storageAdapter)
if err != nil {
logger.Fatal("Failed to initialize CsModule adapter: %v", err)
logger.FatalWithPrefix(logPrefix, "Failed to initialize CsModuleAdapter: %v", err)
}
csFeeDistributorAdapter, err := csfeedistributor.NewCsFeeDistributorAdapter(networkConfig.WsURL, networkConfig.CSFeeDistributorAddress)
if err != nil {
logger.Fatal("Failed to initialize CsFeeDistributor adapter: %v", err)
logger.FatalWithPrefix(logPrefix, "Failed to initialize CsFeeDistributorAdapter: %v", err)
}

// Initialize services
// Initialize domain services
eventsWatcherService := services.NewEventsWatcherService(veboAdapter, csModuleAdapter, csFeeDistributorAdapter, notifierAdapter)
distributionLogUpdatedScannerService := services.NewDistributionLogUpdatedEventScanner(storageAdapter, notifierAdapter, executionAdapter, csFeeDistributorImplAdapter, networkConfig.CsFeeDistributorBlockDeployment)
validatorExitRequestScannerService := services.NewValidatorExitRequestEventScanner(storageAdapter, notifierAdapter, veboAdapter, executionAdapter, beaconchainAdapter, networkConfig.VeboBlockDeployment)
validatorEjectorService := services.NewValidatorEjectorService(storageAdapter, notifierAdapter, exitValidatorAdapter, beaconchainAdapter)
pendingHashesLoaderService := services.NewPendingHashesLoader(storageAdapter, ipfsAdapter)

// DistributionLogUpdated
// Start domain services
distributionLogUpdatedExecutionComplete := make(chan struct{})
go distributionLogUpdatedScannerService.ScanDistributionLogUpdatedEventsCron(ctx, 384*time.Second, &wg, distributionLogUpdatedExecutionComplete) // once every epoch
go distributionLogUpdatedScannerService.ScanDistributionLogUpdatedEventsCron(ctx, 384*time.Second, &wg, distributionLogUpdatedExecutionComplete)
go pendingHashesLoaderService.LoadPendingHashesCron(ctx, 3*time.Hour, &wg, distributionLogUpdatedExecutionComplete)

// ExitRequest
exitRequestExecutionComplete := make(chan struct{})
go validatorExitRequestScannerService.ScanValidatorExitRequestEventsCron(ctx, 384*time.Second, &wg, exitRequestExecutionComplete) // once every epoch
go validatorExitRequestScannerService.ScanValidatorExitRequestEventsCron(ctx, 384*time.Second, &wg, exitRequestExecutionComplete)
go validatorEjectorService.ValidatorEjectorCron(ctx, 64*time.Minute, &wg, exitRequestExecutionComplete)

// Events watcher
go eventsWatcherService.WatchAllEvents(ctx, &wg)

// Handle shutdown signals
// Handle OS signals for shutdown
handleShutdown(cancel, apiService, proxyService)

// Wait for all goroutines to finish
wg.Wait()
logger.InfoWithPrefix(logPrefix, "All services stopped. Shutting down application.")
}

// Helper function to check if operator IDs and Telegram config are available
func waitForConfig(ctx context.Context, storageAdapter *storage.Storage) error {
for {
select {
case <-ctx.Done(): // Exit if the context is canceled
logger.InfoWithPrefix(logPrefix, "Context canceled before configuration was ready.")
return ctx.Err()
default:
// Check for operator IDs
operatorIds, err := storageAdapter.GetOperatorIds()
if err != nil || len(operatorIds) == 0 {
logger.InfoWithPrefix(logPrefix, "Waiting for operator IDs to be set...")
} else {
// Operator IDs are set
logger.InfoWithPrefix(logPrefix, "Operator IDs are set. Proceeding with initialization.")
return nil
}
time.Sleep(2 * time.Second) // Poll every 2 seconds
}
}
}

// handleShutdown manages graceful shutdown for services
func handleShutdown(cancel context.CancelFunc, apiService *services.APIServerService, proxyService *services.ProxyAPIServerService) {
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)

go func() {
<-signalChan
logger.Info("Received shutdown signal. Initiating graceful shutdown...")
cancel() // Cancel context to signal all services to stop

// Give the HTTP server time to finish ongoing requests
serverCtx, serverCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer serverCancel()
if err := server.Shutdown(serverCtx); err != nil {
logger.Info("HTTP server Shutdown: %v", err)
}
logger.InfoWithPrefix(logPrefix, "Received shutdown signal. Initiating graceful shutdown...")
cancel()

// Give the Proxy API server time to finish ongoing requests
proxyServerCtx, proxyServerCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer proxyServerCancel()
if err := proxyServer.Shutdown(proxyServerCtx); err != nil {
logger.Info("Proxy API server Shutdown: %v", err)
}
}()
// Shutdown API services with a timeout
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer shutdownCancel()

// Wait for all goroutines to finish
wg.Wait()
logger.Info("All services stopped. Shutting down application.")
apiService.Shutdown(shutdownCtx)
proxyService.Shutdown(shutdownCtx)
}()
}
14 changes: 11 additions & 3 deletions internal/adapters/api/api_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ type APIHandler struct {
adapterPrefix string
}

// Ensure APIHandler implements the ports.API interface
var _ ports.API = (*APIHandler)(nil)

// GetRouter implements the ports.API interface
func (h *APIHandler) GetRouter() http.Handler {
return h.Router
}

// NewAPIAdapter initializes the APIHandler and sets up routes with CORS enabled
func NewAPIAdapter(storagePort ports.StoragePort, allowedOrigins []string) *APIHandler {
h := &APIHandler{
Expand Down Expand Up @@ -135,7 +143,7 @@ func (h *APIHandler) DeleteOperator(w http.ResponseWriter, r *http.Request) {
return
}

// check it exists calling GetOperatorIds
// Check if operator ID exists
operatorIds, err := h.StoragePort.GetOperatorIds()
if err != nil {
logger.ErrorWithPrefix("API", "Failed to fetch operator IDs: %v", err)
Expand Down Expand Up @@ -191,7 +199,7 @@ func (h *APIHandler) AddOperator(w http.ResponseWriter, r *http.Request) {
return
}

// check if operator id already exists and if so return ok
// Check if operator ID already exists
operatorIds, err := h.StoragePort.GetOperatorIds()
if err != nil {
logger.ErrorWithPrefix("API", "Failed to fetch operator IDs: %v", err)
Expand All @@ -215,7 +223,7 @@ func (h *APIHandler) AddOperator(w http.ResponseWriter, r *http.Request) {

// Set last block processed to 0, this will trigger the events scanner to start from the beginning
// and look for events for the new operator ID
// TODO: this logic should be in the services layer
// TODO: Consider moving this logic to the services layer
if err := h.StoragePort.SaveDistributionLogLastProcessedBlock(0); err != nil {
logger.ErrorWithPrefix("API", "Failed to update DistributionLogLastProcessedBlock: %v", err)
writeErrorResponse(w, "Failed to reset DistributionLogLastProcessedBlock", http.StatusInternalServerError)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,6 @@ func (csfa *CsFeeDistributorAdapter) WatchCsFeeDistributorEvents(ctx context.Con
select {
case event := <-distributionDataUpdatedChan:
handleDistributionDataUpdated(event)
return
// case err := <-sub.Err():
// // Subscription error should be handled by returning it to the service layer.
// return
case <-ctx.Done():
return
}
Expand Down
15 changes: 0 additions & 15 deletions internal/adapters/csModule/csmodule_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,49 +203,34 @@ func (csma *CsModuleAdapter) WatchCsModuleEvents(ctx context.Context, handlers p
select {
case event := <-depositedSigningKeysChangedChan:
handlers.HandleDepositedSigningKeysCountChanged(event)
return
case event := <-elRewardsStealingPenaltyReportedChan:
handlers.HandleElRewardsStealingPenaltyReported(event)
return
case event := <-elRewardsStealingPenaltySettledChan:
handlers.HandleElRewardsStealingPenaltySettled(event)
return
case event := <-elRewardsStealingPenaltyCancelledChan:
handlers.HandleElRewardsStealingPenaltyCancelled(event)
return
case event := <-initialSlashingSubmittedChan:
handlers.HandleInitialSlashingSubmitted(event)
return
case event := <-keyRemovalChargeAppliedChan:
handlers.HandleKeyRemovalChargeApplied(event)
return
case event := <-nodeOperatorManagerAddressChangeProposedChan:
handlers.HandleNodeOperatorManagerAddressChangeProposed(event)
return
case event := <-nodeOperatorManagerAddressChangedChan:
handlers.HandleNodeOperatorManagerAddressChanged(event)
return
case event := <-nodeOperatorRewardAddressChangeProposedChan:
handlers.HandleNodeOperatorRewardAddressChangeProposed(event)
return
case event := <-nodeOperatorRewardAddressChangedChan:
handlers.HandleNodeOperatorRewardAddressChanged(event)
return
case event := <-stuckSigningKeysCountChangedChan:
handlers.HandleStuckSigningKeysCountChanged(event)
return
case event := <-vettedSigningKeysCountDecreasedChan:
handlers.HandleVettedSigningKeysCountDecreased(event)
return
case event := <-withdrawalSubmittedChan:
handlers.HandleWithdrawalSubmitted(event)
return
case event := <-totalSigningKeysCountChangedChan:
handlers.HandleTotalSigningKeysCountChanged(event)
return
case event := <-publicReleaseChan:
handlers.HandlePublicRelease(event)
return

case <-ctx.Done():
return
Expand Down
10 changes: 10 additions & 0 deletions internal/adapters/proxyApi/proxy_api_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"net/http"
"net/url"

"lido-events/internal/application/ports"

"github.com/gorilla/handlers"
"github.com/gorilla/mux"
)
Expand All @@ -17,6 +19,14 @@ type APIHandler struct {
adapterPrefix string
}

// Ensure APIHandler implements the ports.ProxyAPI interface
var _ ports.ProxyAPI = (*APIHandler)(nil)

// GetRouter implements the ports.ProxyAPI interface
func (h *APIHandler) GetRouter() http.Handler {
return h.Router
}

// NewProxyAPIAdapter initializes the APIHandler and sets up routes
func NewProxyAPIAdapter(allowedOrigins []string, proxyApiURL string) *APIHandler {
h := &APIHandler{
Expand Down
Loading
Loading