From bfe201bc9ecde26b6594be77c7a2ec0804486219 Mon Sep 17 00:00:00 2001 From: pablomendezroyo Date: Thu, 5 Dec 2024 09:51:41 +0100 Subject: [PATCH 1/2] Simplify main logic --- cmd/main.go | 192 ++++++------------ internal/adapters/api/api_adapter.go | 14 +- .../adapters/proxyApi/proxy_api_adapter.go | 10 + internal/application/ports/api_port.go | 8 + internal/application/ports/proxy_api_port.go | 8 + internal/application/services/api_server.go | 43 ++++ .../application/services/proxy_api_server.go | 43 ++++ 7 files changed, 190 insertions(+), 128 deletions(-) create mode 100644 internal/application/ports/api_port.go create mode 100644 internal/application/ports/proxy_api_port.go create mode 100644 internal/application/services/api_server.go create mode 100644 internal/application/services/proxy_api_server.go diff --git a/cmd/main.go b/cmd/main.go index 32535a9..af6d1e9 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -2,6 +2,12 @@ package main import ( "context" + "os" + "os/signal" + "sync" + "syscall" + "time" + "lido-events/internal/adapters/api" "lido-events/internal/adapters/beaconchain" csfeedistributor "lido-events/internal/adapters/csFeeDistributor" @@ -14,50 +20,17 @@ import ( proxyapi "lido-events/internal/adapters/proxyApi" "lido-events/internal/adapters/storage" "lido-events/internal/adapters/vebo" - "lido-events/internal/logger" - "os" - "os/signal" - "strconv" - "sync" - "syscall" - "time" - "lido-events/internal/application/services" "lido-events/internal/config" - "net/http" + "lido-events/internal/logger" ) -// Helper function to check if operator IDs and Telegram config are available -func waitForInitialConfig(ctx context.Context, storageAdapter *storage.Storage) error { - for { - select { - case <-ctx.Done(): // Exit if the context is canceled - return ctx.Err() - default: - // Check for operator IDs - operatorIds, err := storageAdapter.GetOperatorIds() - if err != nil || len(operatorIds) == 0 { - logger.Info("Waiting for operator IDs to be set...") - } else { - // Operator IDs are set - logger.Info("Operator IDs are set. Proceeding with initialization.") - return nil - } - time.Sleep(2 * time.Second) // Poll every 2 seconds - } - } -} - func main() { - // Set up context with cancellation and a WaitGroup for graceful shutdown + // Set up context with cancellation and WaitGroup for graceful shutdown ctx, cancel := context.WithCancel(context.Background()) defer cancel() var wg sync.WaitGroup - // Set up signal channel to handle OS interrupts - signalChan := make(chan os.Signal, 1) - signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM) - // Load configurations networkConfig, err := config.LoadNetworkConfig() if err != nil { @@ -67,132 +40,101 @@ func main() { // Initialize adapters storageAdapter := storage.NewStorageAdapter() - // Initialize the notifier adapter (Telegram configuration optional) notifierAdapter, err := notifier.NewNotifierAdapter(ctx, storageAdapter) if err != nil { logger.Warn("Telegram notifier not initialized: %v", err) } - // Start HTTP server apiAdapter := api.NewAPIAdapter(storageAdapter, networkConfig.CORS) - server := &http.Server{ - Addr: ":" + strconv.FormatUint(networkConfig.ApiPort, 10), - Handler: apiAdapter.Router, - } - wg.Add(1) - go func() { - defer wg.Done() - logger.Info("Server started on :%d", networkConfig.ApiPort) - if err := server.ListenAndServe(); err != http.ErrServerClosed { - logger.Fatal("HTTP server ListenAndServe: %v", err) - } - }() - - // Start Proxy API server proxyApiAdapter := proxyapi.NewProxyAPIAdapter(networkConfig.CORS, networkConfig.LidoKeysApiUrl) - proxyServer := &http.Server{ - Addr: ":" + strconv.FormatUint(networkConfig.ProxyApiPort, 10), - Handler: proxyApiAdapter.Router, - } - wg.Add(1) - go func() { - defer wg.Done() - logger.Info("Proxy API server started on :%d", networkConfig.ProxyApiPort) - if err := proxyServer.ListenAndServe(); err != http.ErrServerClosed { - logger.Fatal("Proxy API server ListenAndServe: %v", err) - } - }() - // Wait for initial configuration in a separate goroutine - configReady := make(chan error, 1) - go func() { - configReady <- waitForInitialConfig(ctx, storageAdapter) - }() + // Initialize API services + apiService := services.NewAPIServerService(apiAdapter, networkConfig.ApiPort) + proxyService := services.NewProxyAPIServerService(proxyApiAdapter, networkConfig.ProxyApiPort) - // Start listening for signals in a separate goroutine - go func() { - <-signalChan - logger.Info("Received shutdown signal. Initiating graceful shutdown...") - cancel() // Cancel context to stop all services - }() + // Start API services + apiService.Start(&wg) + proxyService.Start(&wg) - // Wait for either the config to be ready or the context to be canceled - select { - case err := <-configReady: - if err != nil { - logger.Warn("Shutting down due to: %v", err) - return - } - logger.Info("Configuration is ready. Proceeding with initialization.") - case <-ctx.Done(): - logger.Info("Context canceled before configuration was ready.") + // Wait for and validate initial configuration + if err := waitForConfig(ctx, storageAdapter); err != nil { + logger.Warn("Application shutting down due to configuration validation failure: %v", err) return } + // Initialize domain adapters ipfsAdapter := ipfs.NewIPFSAdapter(networkConfig.IpfsUrl) beaconchainAdapter := beaconchain.NewBeaconchainAdapter(networkConfig.BeaconchainURL) executionAdapter := execution.NewExecutionAdapter(networkConfig.RpcUrl) exitValidatorAdapter := exitvalidator.NewExitValidatorAdapter(beaconchainAdapter, networkConfig.SignerUrl) + csFeeDistributorImplAdapter, _ := csfeedistributorimpl.NewCsFeeDistributorImplAdapter(networkConfig.WsURL, networkConfig.CSFeeDistributorAddress) + veboAdapter, _ := vebo.NewVeboAdapter(networkConfig.WsURL, networkConfig.VEBOAddress, storageAdapter) + csModuleAdapter, _ := csmodule.NewCsModuleAdapter(networkConfig.WsURL, networkConfig.CSModuleAddress, storageAdapter) + csFeeDistributorAdapter, _ := csfeedistributor.NewCsFeeDistributorAdapter(networkConfig.WsURL, networkConfig.CSFeeDistributorAddress) - csFeeDistributorImplAdapter, err := csfeedistributorimpl.NewCsFeeDistributorImplAdapter(networkConfig.WsURL, networkConfig.CSFeeDistributorAddress) - if err != nil { - logger.Fatal("Failed to initialize CsFeeDistributorImpl adapter: %v", err) - } - veboAdapter, err := vebo.NewVeboAdapter(networkConfig.WsURL, networkConfig.VEBOAddress, storageAdapter) - if err != nil { - logger.Fatal("Failed to initialize Vebo adapter: %v", err) - } - csModuleAdapter, err := csmodule.NewCsModuleAdapter(networkConfig.WsURL, networkConfig.CSModuleAddress, storageAdapter) - if err != nil { - logger.Fatal("Failed to initialize CsModule adapter: %v", err) - } - csFeeDistributorAdapter, err := csfeedistributor.NewCsFeeDistributorAdapter(networkConfig.WsURL, networkConfig.CSFeeDistributorAddress) - if err != nil { - logger.Fatal("Failed to initialize CsFeeDistributor adapter: %v", err) - } - - // Initialize services + // Initialize domain services eventsWatcherService := services.NewEventsWatcherService(veboAdapter, csModuleAdapter, csFeeDistributorAdapter, notifierAdapter) distributionLogUpdatedScannerService := services.NewDistributionLogUpdatedEventScanner(storageAdapter, notifierAdapter, executionAdapter, csFeeDistributorImplAdapter, networkConfig.CsFeeDistributorBlockDeployment) validatorExitRequestScannerService := services.NewValidatorExitRequestEventScanner(storageAdapter, notifierAdapter, veboAdapter, executionAdapter, beaconchainAdapter, networkConfig.VeboBlockDeployment) validatorEjectorService := services.NewValidatorEjectorService(storageAdapter, notifierAdapter, exitValidatorAdapter, beaconchainAdapter) pendingHashesLoaderService := services.NewPendingHashesLoader(storageAdapter, ipfsAdapter) - // DistributionLogUpdated + // Start domain services distributionLogUpdatedExecutionComplete := make(chan struct{}) - go distributionLogUpdatedScannerService.ScanDistributionLogUpdatedEventsCron(ctx, 384*time.Second, &wg, distributionLogUpdatedExecutionComplete) // once every epoch + go distributionLogUpdatedScannerService.ScanDistributionLogUpdatedEventsCron(ctx, 384*time.Second, &wg, distributionLogUpdatedExecutionComplete) go pendingHashesLoaderService.LoadPendingHashesCron(ctx, 3*time.Hour, &wg, distributionLogUpdatedExecutionComplete) - // ExitRequest exitRequestExecutionComplete := make(chan struct{}) - go validatorExitRequestScannerService.ScanValidatorExitRequestEventsCron(ctx, 384*time.Second, &wg, exitRequestExecutionComplete) // once every epoch + go validatorExitRequestScannerService.ScanValidatorExitRequestEventsCron(ctx, 384*time.Second, &wg, exitRequestExecutionComplete) go validatorEjectorService.ValidatorEjectorCron(ctx, 64*time.Minute, &wg, exitRequestExecutionComplete) - // Events watcher go eventsWatcherService.WatchAllEvents(ctx, &wg) - // Handle shutdown signals + // Handle OS signals for shutdown + handleShutdown(cancel, apiService, proxyService) + + // Wait for all goroutines to finish + wg.Wait() + logger.Info("All services stopped. Shutting down application.") +} + +// Helper function to check if operator IDs and Telegram config are available +func waitForConfig(ctx context.Context, storageAdapter *storage.Storage) error { + for { + select { + case <-ctx.Done(): // Exit if the context is canceled + logger.Info("Context canceled before configuration was ready.") + return ctx.Err() + default: + // Check for operator IDs + operatorIds, err := storageAdapter.GetOperatorIds() + if err != nil || len(operatorIds) == 0 { + logger.Info("Waiting for operator IDs to be set...") + } else { + // Operator IDs are set + logger.Info("Operator IDs are set. Proceeding with initialization.") + return nil + } + time.Sleep(2 * time.Second) // Poll every 2 seconds + } + } +} + +// handleShutdown manages graceful shutdown for services +func handleShutdown(cancel context.CancelFunc, apiService *services.APIServerService, proxyService *services.ProxyAPIServerService) { + signalChan := make(chan os.Signal, 1) + signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM) + go func() { <-signalChan logger.Info("Received shutdown signal. Initiating graceful shutdown...") - cancel() // Cancel context to signal all services to stop + cancel() - // Give the HTTP server time to finish ongoing requests - serverCtx, serverCancel := context.WithTimeout(context.Background(), 5*time.Second) - defer serverCancel() - if err := server.Shutdown(serverCtx); err != nil { - logger.Info("HTTP server Shutdown: %v", err) - } + // Shutdown API services with a timeout + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer shutdownCancel() - // Give the Proxy API server time to finish ongoing requests - proxyServerCtx, proxyServerCancel := context.WithTimeout(context.Background(), 5*time.Second) - defer proxyServerCancel() - if err := proxyServer.Shutdown(proxyServerCtx); err != nil { - logger.Info("Proxy API server Shutdown: %v", err) - } + apiService.Shutdown(shutdownCtx) + proxyService.Shutdown(shutdownCtx) }() - - // Wait for all goroutines to finish - wg.Wait() - logger.Info("All services stopped. Shutting down application.") } diff --git a/internal/adapters/api/api_adapter.go b/internal/adapters/api/api_adapter.go index 9d2dfd6..812978a 100644 --- a/internal/adapters/api/api_adapter.go +++ b/internal/adapters/api/api_adapter.go @@ -20,6 +20,14 @@ type APIHandler struct { adapterPrefix string } +// Ensure APIHandler implements the ports.API interface +var _ ports.API = (*APIHandler)(nil) + +// GetRouter implements the ports.API interface +func (h *APIHandler) GetRouter() http.Handler { + return h.Router +} + // NewAPIAdapter initializes the APIHandler and sets up routes with CORS enabled func NewAPIAdapter(storagePort ports.StoragePort, allowedOrigins []string) *APIHandler { h := &APIHandler{ @@ -135,7 +143,7 @@ func (h *APIHandler) DeleteOperator(w http.ResponseWriter, r *http.Request) { return } - // check it exists calling GetOperatorIds + // Check if operator ID exists operatorIds, err := h.StoragePort.GetOperatorIds() if err != nil { logger.ErrorWithPrefix("API", "Failed to fetch operator IDs: %v", err) @@ -191,7 +199,7 @@ func (h *APIHandler) AddOperator(w http.ResponseWriter, r *http.Request) { return } - // check if operator id already exists and if so return ok + // Check if operator ID already exists operatorIds, err := h.StoragePort.GetOperatorIds() if err != nil { logger.ErrorWithPrefix("API", "Failed to fetch operator IDs: %v", err) @@ -215,7 +223,7 @@ func (h *APIHandler) AddOperator(w http.ResponseWriter, r *http.Request) { // Set last block processed to 0, this will trigger the events scanner to start from the beginning // and look for events for the new operator ID - // TODO: this logic should be in the services layer + // TODO: Consider moving this logic to the services layer if err := h.StoragePort.SaveDistributionLogLastProcessedBlock(0); err != nil { logger.ErrorWithPrefix("API", "Failed to update DistributionLogLastProcessedBlock: %v", err) writeErrorResponse(w, "Failed to reset DistributionLogLastProcessedBlock", http.StatusInternalServerError) diff --git a/internal/adapters/proxyApi/proxy_api_adapter.go b/internal/adapters/proxyApi/proxy_api_adapter.go index 8797c02..9e83cbc 100644 --- a/internal/adapters/proxyApi/proxy_api_adapter.go +++ b/internal/adapters/proxyApi/proxy_api_adapter.go @@ -6,6 +6,8 @@ import ( "net/http" "net/url" + "lido-events/internal/application/ports" + "github.com/gorilla/handlers" "github.com/gorilla/mux" ) @@ -17,6 +19,14 @@ type APIHandler struct { adapterPrefix string } +// Ensure APIHandler implements the ports.ProxyAPI interface +var _ ports.ProxyAPI = (*APIHandler)(nil) + +// GetRouter implements the ports.ProxyAPI interface +func (h *APIHandler) GetRouter() http.Handler { + return h.Router +} + // NewProxyAPIAdapter initializes the APIHandler and sets up routes func NewProxyAPIAdapter(allowedOrigins []string, proxyApiURL string) *APIHandler { h := &APIHandler{ diff --git a/internal/application/ports/api_port.go b/internal/application/ports/api_port.go new file mode 100644 index 0000000..a7c9301 --- /dev/null +++ b/internal/application/ports/api_port.go @@ -0,0 +1,8 @@ +package ports + +import "net/http" + +// API is the interface for the API adapter. +type API interface { + GetRouter() http.Handler +} diff --git a/internal/application/ports/proxy_api_port.go b/internal/application/ports/proxy_api_port.go new file mode 100644 index 0000000..f6b5d27 --- /dev/null +++ b/internal/application/ports/proxy_api_port.go @@ -0,0 +1,8 @@ +package ports + +import "net/http" + +// ProxyAPI is the interface for the Proxy API adapter. +type ProxyAPI interface { + GetRouter() http.Handler +} diff --git a/internal/application/services/api_server.go b/internal/application/services/api_server.go new file mode 100644 index 0000000..1fe12f8 --- /dev/null +++ b/internal/application/services/api_server.go @@ -0,0 +1,43 @@ +// internal/application/services/api_server_service.go +package services + +import ( + "context" + "lido-events/internal/application/ports" + "lido-events/internal/logger" + "net/http" + "strconv" + "sync" +) + +type APIServerService struct { + server *http.Server + servicePrefix string +} + +func NewAPIServerService(apiAdapter ports.API, port uint64) *APIServerService { + return &APIServerService{ + server: &http.Server{ + Addr: ":" + strconv.FormatUint(port, 10), + Handler: apiAdapter.GetRouter(), + }, + servicePrefix: "API", + } +} + +func (s *APIServerService) Start(wg *sync.WaitGroup) { + wg.Add(1) + go func() { + defer wg.Done() + logger.InfoWithPrefix(s.servicePrefix, "server started on %s", s.server.Addr) + if err := s.server.ListenAndServe(); err != http.ErrServerClosed { + logger.FatalWithPrefix(s.servicePrefix, "server ListenAndServe: %v", err) + } + }() +} + +func (s *APIServerService) Shutdown(ctx context.Context) { + if err := s.server.Shutdown(ctx); err != nil { + logger.WarnWithPrefix(s.servicePrefix, "server Shutdown: %v", err) + } +} diff --git a/internal/application/services/proxy_api_server.go b/internal/application/services/proxy_api_server.go new file mode 100644 index 0000000..42ecfe9 --- /dev/null +++ b/internal/application/services/proxy_api_server.go @@ -0,0 +1,43 @@ +// internal/application/services/proxy_api_server_service.go +package services + +import ( + "context" + "lido-events/internal/application/ports" + "lido-events/internal/logger" + "net/http" + "strconv" + "sync" +) + +type ProxyAPIServerService struct { + server *http.Server + servicePrefix string +} + +func NewProxyAPIServerService(proxyApiAdapter ports.ProxyAPI, port uint64) *ProxyAPIServerService { + return &ProxyAPIServerService{ + server: &http.Server{ + Addr: ":" + strconv.FormatUint(port, 10), + Handler: proxyApiAdapter.GetRouter(), + }, + servicePrefix: "Proxy API", + } +} + +func (s *ProxyAPIServerService) Start(wg *sync.WaitGroup) { + wg.Add(1) + go func() { + defer wg.Done() + logger.InfoWithPrefix(s.servicePrefix, "server started on %s", s.server.Addr) + if err := s.server.ListenAndServe(); err != http.ErrServerClosed { + logger.FatalWithPrefix(s.servicePrefix, "server ListenAndServe: %v", err) + } + }() +} + +func (s *ProxyAPIServerService) Shutdown(ctx context.Context) { + if err := s.server.Shutdown(ctx); err != nil { + logger.WarnWithPrefix(s.servicePrefix, "server Shutdown: %v", err) + } +} From f197757d4fbea87a1270a127322fa106dabf1894 Mon Sep 17 00:00:00 2001 From: pablomendezroyo Date: Thu, 5 Dec 2024 10:54:55 +0100 Subject: [PATCH 2/2] add main prefix --- cmd/main.go | 41 ++++++++++++------- .../csfeedistributor_adapter.go | 4 -- .../adapters/csModule/csmodule_adapter.go | 15 ------- internal/adapters/vebo/vebo_adapter.go | 4 -- 4 files changed, 27 insertions(+), 37 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index af6d1e9..d883d57 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -25,6 +25,8 @@ import ( "lido-events/internal/logger" ) +var logPrefix = "MAIN" + func main() { // Set up context with cancellation and WaitGroup for graceful shutdown ctx, cancel := context.WithCancel(context.Background()) @@ -34,15 +36,15 @@ func main() { // Load configurations networkConfig, err := config.LoadNetworkConfig() if err != nil { - logger.Fatal("Failed to load network configuration: %v", err) + logger.FatalWithPrefix(logPrefix, "Failed to load network configuration: %v", err) } - logger.Debug("Network config: %+v", networkConfig) + logger.DebugWithPrefix(logPrefix, "Network config: %+v", networkConfig) // Initialize adapters storageAdapter := storage.NewStorageAdapter() notifierAdapter, err := notifier.NewNotifierAdapter(ctx, storageAdapter) if err != nil { - logger.Warn("Telegram notifier not initialized: %v", err) + logger.WarnWithPrefix(logPrefix, "Telegram notifier not initialized: %v", err) } apiAdapter := api.NewAPIAdapter(storageAdapter, networkConfig.CORS) @@ -58,8 +60,7 @@ func main() { // Wait for and validate initial configuration if err := waitForConfig(ctx, storageAdapter); err != nil { - logger.Warn("Application shutting down due to configuration validation failure: %v", err) - return + logger.FatalWithPrefix(logPrefix, "Application shutting down due to configuration validation failure: %v", err) } // Initialize domain adapters @@ -67,10 +68,22 @@ func main() { beaconchainAdapter := beaconchain.NewBeaconchainAdapter(networkConfig.BeaconchainURL) executionAdapter := execution.NewExecutionAdapter(networkConfig.RpcUrl) exitValidatorAdapter := exitvalidator.NewExitValidatorAdapter(beaconchainAdapter, networkConfig.SignerUrl) - csFeeDistributorImplAdapter, _ := csfeedistributorimpl.NewCsFeeDistributorImplAdapter(networkConfig.WsURL, networkConfig.CSFeeDistributorAddress) - veboAdapter, _ := vebo.NewVeboAdapter(networkConfig.WsURL, networkConfig.VEBOAddress, storageAdapter) - csModuleAdapter, _ := csmodule.NewCsModuleAdapter(networkConfig.WsURL, networkConfig.CSModuleAddress, storageAdapter) - csFeeDistributorAdapter, _ := csfeedistributor.NewCsFeeDistributorAdapter(networkConfig.WsURL, networkConfig.CSFeeDistributorAddress) + csFeeDistributorImplAdapter, err := csfeedistributorimpl.NewCsFeeDistributorImplAdapter(networkConfig.WsURL, networkConfig.CSFeeDistributorAddress) + if err != nil { + logger.FatalWithPrefix(logPrefix, "Failed to initialize CsFeeDistributorImplAdapter: %v", err) + } + veboAdapter, err := vebo.NewVeboAdapter(networkConfig.WsURL, networkConfig.VEBOAddress, storageAdapter) + if err != nil { + logger.FatalWithPrefix(logPrefix, "Failed to initialize VeboAdapter: %v", err) + } + csModuleAdapter, err := csmodule.NewCsModuleAdapter(networkConfig.WsURL, networkConfig.CSModuleAddress, storageAdapter) + if err != nil { + logger.FatalWithPrefix(logPrefix, "Failed to initialize CsModuleAdapter: %v", err) + } + csFeeDistributorAdapter, err := csfeedistributor.NewCsFeeDistributorAdapter(networkConfig.WsURL, networkConfig.CSFeeDistributorAddress) + if err != nil { + logger.FatalWithPrefix(logPrefix, "Failed to initialize CsFeeDistributorAdapter: %v", err) + } // Initialize domain services eventsWatcherService := services.NewEventsWatcherService(veboAdapter, csModuleAdapter, csFeeDistributorAdapter, notifierAdapter) @@ -95,7 +108,7 @@ func main() { // Wait for all goroutines to finish wg.Wait() - logger.Info("All services stopped. Shutting down application.") + logger.InfoWithPrefix(logPrefix, "All services stopped. Shutting down application.") } // Helper function to check if operator IDs and Telegram config are available @@ -103,16 +116,16 @@ func waitForConfig(ctx context.Context, storageAdapter *storage.Storage) error { for { select { case <-ctx.Done(): // Exit if the context is canceled - logger.Info("Context canceled before configuration was ready.") + logger.InfoWithPrefix(logPrefix, "Context canceled before configuration was ready.") return ctx.Err() default: // Check for operator IDs operatorIds, err := storageAdapter.GetOperatorIds() if err != nil || len(operatorIds) == 0 { - logger.Info("Waiting for operator IDs to be set...") + logger.InfoWithPrefix(logPrefix, "Waiting for operator IDs to be set...") } else { // Operator IDs are set - logger.Info("Operator IDs are set. Proceeding with initialization.") + logger.InfoWithPrefix(logPrefix, "Operator IDs are set. Proceeding with initialization.") return nil } time.Sleep(2 * time.Second) // Poll every 2 seconds @@ -127,7 +140,7 @@ func handleShutdown(cancel context.CancelFunc, apiService *services.APIServerSer go func() { <-signalChan - logger.Info("Received shutdown signal. Initiating graceful shutdown...") + logger.InfoWithPrefix(logPrefix, "Received shutdown signal. Initiating graceful shutdown...") cancel() // Shutdown API services with a timeout diff --git a/internal/adapters/csFeeDistributor/csfeedistributor_adapter.go b/internal/adapters/csFeeDistributor/csfeedistributor_adapter.go index 8795675..a8aa032 100644 --- a/internal/adapters/csFeeDistributor/csfeedistributor_adapter.go +++ b/internal/adapters/csFeeDistributor/csfeedistributor_adapter.go @@ -50,10 +50,6 @@ func (csfa *CsFeeDistributorAdapter) WatchCsFeeDistributorEvents(ctx context.Con select { case event := <-distributionDataUpdatedChan: handleDistributionDataUpdated(event) - return - // case err := <-sub.Err(): - // // Subscription error should be handled by returning it to the service layer. - // return case <-ctx.Done(): return } diff --git a/internal/adapters/csModule/csmodule_adapter.go b/internal/adapters/csModule/csmodule_adapter.go index 57a965f..471eb61 100644 --- a/internal/adapters/csModule/csmodule_adapter.go +++ b/internal/adapters/csModule/csmodule_adapter.go @@ -203,49 +203,34 @@ func (csma *CsModuleAdapter) WatchCsModuleEvents(ctx context.Context, handlers p select { case event := <-depositedSigningKeysChangedChan: handlers.HandleDepositedSigningKeysCountChanged(event) - return case event := <-elRewardsStealingPenaltyReportedChan: handlers.HandleElRewardsStealingPenaltyReported(event) - return case event := <-elRewardsStealingPenaltySettledChan: handlers.HandleElRewardsStealingPenaltySettled(event) - return case event := <-elRewardsStealingPenaltyCancelledChan: handlers.HandleElRewardsStealingPenaltyCancelled(event) - return case event := <-initialSlashingSubmittedChan: handlers.HandleInitialSlashingSubmitted(event) - return case event := <-keyRemovalChargeAppliedChan: handlers.HandleKeyRemovalChargeApplied(event) - return case event := <-nodeOperatorManagerAddressChangeProposedChan: handlers.HandleNodeOperatorManagerAddressChangeProposed(event) - return case event := <-nodeOperatorManagerAddressChangedChan: handlers.HandleNodeOperatorManagerAddressChanged(event) - return case event := <-nodeOperatorRewardAddressChangeProposedChan: handlers.HandleNodeOperatorRewardAddressChangeProposed(event) - return case event := <-nodeOperatorRewardAddressChangedChan: handlers.HandleNodeOperatorRewardAddressChanged(event) - return case event := <-stuckSigningKeysCountChangedChan: handlers.HandleStuckSigningKeysCountChanged(event) - return case event := <-vettedSigningKeysCountDecreasedChan: handlers.HandleVettedSigningKeysCountDecreased(event) - return case event := <-withdrawalSubmittedChan: handlers.HandleWithdrawalSubmitted(event) - return case event := <-totalSigningKeysCountChangedChan: handlers.HandleTotalSigningKeysCountChanged(event) - return case event := <-publicReleaseChan: handlers.HandlePublicRelease(event) - return case <-ctx.Done(): return diff --git a/internal/adapters/vebo/vebo_adapter.go b/internal/adapters/vebo/vebo_adapter.go index c6e07ab..8e6be03 100644 --- a/internal/adapters/vebo/vebo_adapter.go +++ b/internal/adapters/vebo/vebo_adapter.go @@ -88,10 +88,6 @@ func (va *VeboAdapter) WatchReportSubmittedEvents(ctx context.Context, handleRep select { case event := <-reportSubmittedChan: handleReportSubmittedEvent(event) - return - // case err := <-subReport.Err(): - // // Exit on subscription error - // return case <-ctx.Done(): return }