Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Load pending hashes on getPeroformance request #117

Merged
merged 1 commit into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func main() {
// relaysCheckerService := services.NewRelayCronService(relaysAllowedAdapter, relaysUsedAdapter, notifierAdapter)

// Initialize API services
apiService := services.NewAPIServerService(ctx, networkConfig.ApiPort, storageAdapter, notifierAdapter, relaysUsedAdapter, relaysAllowedAdapter, csModuleEventsScannerService, distributionLogUpdatedScannerService, validatorExitRequestScannerService, networkConfig.CORS)
apiService := services.NewAPIServerService(ctx, networkConfig.ApiPort, storageAdapter, notifierAdapter, relaysUsedAdapter, relaysAllowedAdapter, csModuleEventsScannerService, distributionLogUpdatedScannerService, validatorExitRequestScannerService, pendingHashesLoaderService, networkConfig.CORS)
proxyService := services.NewProxyAPIServerService(networkConfig.ProxyApiPort, networkConfig.LidoKeysApiUrl, networkConfig.CORS)

// Start API services
Expand Down
118 changes: 63 additions & 55 deletions internal/application/services/api_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@ type APIServerService struct {
server *http.Server
servicePrefix string
ctx context.Context
StoragePort ports.StoragePort
NotifierPort ports.NotifierPort
RelaysUsedPort ports.RelaysUsedPort
RelaysAllowedPort ports.RelaysAllowedPort
CsModuleEventsScanner *CsModuleEventsScanner
DistributionLogUpdatedEventScanner *DistributionLogUpdatedEventScanner
ValidatorExitRequestEventScanner *ValidatorExitRequestEventScanner
Router *mux.Router
storagePort ports.StoragePort
notifierPort ports.NotifierPort
relaysUsedPort ports.RelaysUsedPort
relaysAllowedPort ports.RelaysAllowedPort
csModuleEventsScanner *CsModuleEventsScanner
distributionLogUpdatedEventScanner *DistributionLogUpdatedEventScanner
validatorExitRequestEventScanner *ValidatorExitRequestEventScanner
pendingHashesLoader *PendingHashesLoader
router *mux.Router
processingAddresses sync.Map // To track NO events being processed by address
processingWithdrawals sync.Map // To track withdrawals being processed by operator ID
processingExitRequests sync.Map // To track exit requests being processed by operator ID
Expand All @@ -38,32 +39,32 @@ type APIServerService struct {
}

// NewAPIServerService initializes the API server
func NewAPIServerService(ctx context.Context, port uint64, storagePort ports.StoragePort, notifierPort ports.NotifierPort, relaysUsedPort ports.RelaysUsedPort, relaysAllowedPort ports.RelaysAllowedPort, CsModuleEventsScanner *CsModuleEventsScanner, DistributionLogUpdatedEventScanner *DistributionLogUpdatedEventScanner, ValidatorExitRequestEventScanner *ValidatorExitRequestEventScanner, allowedOrigins []string) *APIServerService {
router := mux.NewRouter()
func NewAPIServerService(ctx context.Context, port uint64, storagePort ports.StoragePort, notifierPort ports.NotifierPort, relaysUsedPort ports.RelaysUsedPort, relaysAllowedPort ports.RelaysAllowedPort, csModuleEventsScanner *CsModuleEventsScanner, distributionLogUpdatedEventScanner *DistributionLogUpdatedEventScanner, validatorExitRequestEventScanner *ValidatorExitRequestEventScanner, pendingHashesLoader *PendingHashesLoader, allowedOrigins []string) *APIServerService {
apiServer := &APIServerService{
server: &http.Server{Addr: ":" + strconv.FormatUint(port, 10)},
servicePrefix: "API",
ctx: ctx,
StoragePort: storagePort,
NotifierPort: notifierPort,
RelaysUsedPort: relaysUsedPort,
RelaysAllowedPort: relaysAllowedPort,
CsModuleEventsScanner: CsModuleEventsScanner,
DistributionLogUpdatedEventScanner: DistributionLogUpdatedEventScanner,
ValidatorExitRequestEventScanner: ValidatorExitRequestEventScanner,
Router: router,
storagePort: storagePort,
notifierPort: notifierPort,
relaysUsedPort: relaysUsedPort,
relaysAllowedPort: relaysAllowedPort,
csModuleEventsScanner: csModuleEventsScanner,
distributionLogUpdatedEventScanner: distributionLogUpdatedEventScanner,
validatorExitRequestEventScanner: validatorExitRequestEventScanner,
pendingHashesLoader: pendingHashesLoader,
router: mux.NewRouter(),
}

apiServer.SetupRoutes()
apiServer.server.Handler = apiServer.Router
apiServer.server.Handler = apiServer.router

// Define CORS configuration
corsAllowedOrigins := handlers.AllowedOrigins(allowedOrigins)
corsAllowedMethods := handlers.AllowedMethods([]string{"GET", "POST", "OPTIONS", "DELETE"})
corsAllowedHeaders := handlers.AllowedHeaders([]string{"Content-Type", "Authorization"})

// Add CORS middleware globally
apiServer.Router.Use(handlers.CORS(
apiServer.router.Use(handlers.CORS(
corsAllowedOrigins,
corsAllowedMethods,
corsAllowedHeaders,
Expand All @@ -88,21 +89,21 @@ func (s *APIServerService) Shutdown(ctx context.Context) {

// SetupRoutes defines API endpoints and routes
func (h *APIServerService) SetupRoutes() {
h.Router.HandleFunc("/api/v0/events_indexer/telegramConfig", h.updateTelegramConfig).Methods("POST", "OPTIONS")
h.Router.HandleFunc("/api/v0/events_indexer/telegramConfig", h.getTelegramConfig).Methods("GET", "OPTIONS")
h.Router.HandleFunc("/api/v0/events_indexer/operatorId", h.addOperator).Methods("POST", "OPTIONS")
h.Router.HandleFunc("/api/v0/events_indexer/operatorId", h.deleteOperator).Methods("DELETE", "OPTIONS")
h.Router.HandleFunc("/api/v0/events_indexer/operator_performance", h.getOperatorPerformance).Methods("GET", "OPTIONS")
h.Router.HandleFunc("/api/v0/events_indexer/exit_requests", h.getExitRequests).Methods("GET", "OPTIONS")
h.Router.HandleFunc("/api/v0/events_indexer/relays_allowed", h.getRelaysAllowed).Methods("GET", "OPTIONS")
h.Router.HandleFunc("/api/v0/events_indexer/relays_used", h.getRelaysUsed).Methods("GET", "OPTIONS")
h.Router.HandleFunc("/api/v0/events_indexer/address_events", h.getAddressEvents).Methods("GET", "OPTIONS")
h.Router.HandleFunc("/api/v0/events_indexer/withdrawals_submitted", h.getWithdrawalsSubmitted).Methods("GET", "OPTIONS")
h.Router.HandleFunc("/api/v0/events_indexer/el_rewards_stealing_penalties_reported", h.getElRewardsStealingPenaltiesReported).Methods("GET", "OPTIONS")
h.Router.HandleFunc("/api/v0/events_indexer/pending_hashes", h.getPendingHashes).Methods("GET", "OPTIONS")
h.router.HandleFunc("/api/v0/events_indexer/telegramConfig", h.updateTelegramConfig).Methods("POST", "OPTIONS")
h.router.HandleFunc("/api/v0/events_indexer/telegramConfig", h.getTelegramConfig).Methods("GET", "OPTIONS")
h.router.HandleFunc("/api/v0/events_indexer/operatorId", h.addOperator).Methods("POST", "OPTIONS")
h.router.HandleFunc("/api/v0/events_indexer/operatorId", h.deleteOperator).Methods("DELETE", "OPTIONS")
h.router.HandleFunc("/api/v0/events_indexer/operator_performance", h.getOperatorPerformance).Methods("GET", "OPTIONS")
h.router.HandleFunc("/api/v0/events_indexer/exit_requests", h.getExitRequests).Methods("GET", "OPTIONS")
h.router.HandleFunc("/api/v0/events_indexer/relays_allowed", h.getRelaysAllowed).Methods("GET", "OPTIONS")
h.router.HandleFunc("/api/v0/events_indexer/relays_used", h.getRelaysUsed).Methods("GET", "OPTIONS")
h.router.HandleFunc("/api/v0/events_indexer/address_events", h.getAddressEvents).Methods("GET", "OPTIONS")
h.router.HandleFunc("/api/v0/events_indexer/withdrawals_submitted", h.getWithdrawalsSubmitted).Methods("GET", "OPTIONS")
h.router.HandleFunc("/api/v0/events_indexer/el_rewards_stealing_penalties_reported", h.getElRewardsStealingPenaltiesReported).Methods("GET", "OPTIONS")
h.router.HandleFunc("/api/v0/events_indexer/pending_hashes", h.getPendingHashes).Methods("GET", "OPTIONS")

// Add a generic OPTIONS handler to ensure preflight requests are handled
h.Router.Methods("OPTIONS").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
h.router.Methods("OPTIONS").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK) // Respond to OPTIONS requests with 200 OK
})
}
Expand All @@ -128,7 +129,7 @@ func (h *APIServerService) getPendingHashes(w http.ResponseWriter, r *http.Reque
}

// Check if the operator ID exists
operatorIds, err := h.StoragePort.GetOperatorIds()
operatorIds, err := h.storagePort.GetOperatorIds()
if err != nil {
logger.ErrorWithPrefix(h.servicePrefix, "Error fetching operator IDs: %v", err)
writeErrorResponse(w, "Error fetching operator IDs", http.StatusInternalServerError, err)
Expand All @@ -149,7 +150,7 @@ func (h *APIServerService) getPendingHashes(w http.ResponseWriter, r *http.Reque
return
}

pendingHashes, err := h.StoragePort.GetPendingHashes(operatorIdNum)
pendingHashes, err := h.storagePort.GetPendingHashes(operatorIdNum)
if err != nil {
logger.ErrorWithPrefix(h.servicePrefix, "Error fetching pending hashes: %v", err)
writeErrorResponse(w, "Error fetching pending hashes", http.StatusInternalServerError, err)
Expand Down Expand Up @@ -187,13 +188,13 @@ func (h *APIServerService) getElRewardsStealingPenaltiesReported(w http.Response
}()

// Perform the scanning (synchronously)
if err := h.CsModuleEventsScanner.ScanElRewardsStealingPenaltyReported(h.ctx); err != nil {
if err := h.csModuleEventsScanner.ScanElRewardsStealingPenaltyReported(h.ctx); err != nil {
logger.ErrorWithPrefix(h.servicePrefix, "Error scanning EL rewards stealing penalties reported: %v", err)
writeErrorResponse(w, "Error scanning EL rewards stealing penalties reported", http.StatusInternalServerError, err)
return
}

penalties, err := h.StoragePort.GetElRewardsStealingPenaltiesReported()
penalties, err := h.storagePort.GetElRewardsStealingPenaltiesReported()
if err != nil {
logger.ErrorWithPrefix(h.servicePrefix, "Error fetching EL rewards stealing penalties reported: %v", err)
writeErrorResponse(w, "Error fetching EL rewards stealing penalties reported", http.StatusInternalServerError, err)
Expand Down Expand Up @@ -243,13 +244,13 @@ func (h *APIServerService) getWithdrawalsSubmitted(w http.ResponseWriter, r *htt
defer h.processingWithdrawals.Delete(operatorIdNum.String()) // Clear operator ID when done

// Perform the scanning (synchronously)
if err := h.CsModuleEventsScanner.ScanWithdrawalsSubmittedEvents(h.ctx, operatorIdNum); err != nil {
if err := h.csModuleEventsScanner.ScanWithdrawalsSubmittedEvents(h.ctx, operatorIdNum); err != nil {
logger.ErrorWithPrefix(h.servicePrefix, "Error scanning withdrawals submitted: %v", err)
writeErrorResponse(w, "Error scanning withdrawals submitted", http.StatusInternalServerError, err)
return
}

withdrawals, err := h.StoragePort.GetWithdrawals(operatorIdNum)
withdrawals, err := h.storagePort.GetWithdrawals(operatorIdNum)
if err != nil {
logger.ErrorWithPrefix(h.servicePrefix, "Error fetching withdrawals submitted: %v", err)
writeErrorResponse(w, "Error fetching withdrawals submitted", http.StatusInternalServerError, err)
Expand Down Expand Up @@ -299,14 +300,14 @@ func (h *APIServerService) getAddressEvents(w http.ResponseWriter, r *http.Reque
defer h.processingAddresses.Delete(addressValidated.Hex()) // Clear address when done

// Perform the scanning (synchronously)
if err := h.CsModuleEventsScanner.ScanAddressEvents(h.ctx, addressValidated); err != nil {
if err := h.csModuleEventsScanner.ScanAddressEvents(h.ctx, addressValidated); err != nil {
logger.ErrorWithPrefix(h.servicePrefix, "Error scanning address events: %v", err)
writeErrorResponse(w, "Error scanning address events", http.StatusInternalServerError, err)
return
}

// Fetch the updated events for the address
addressEvents, err := h.StoragePort.GetAddressEvents(addressValidated)
addressEvents, err := h.storagePort.GetAddressEvents(addressValidated)
if err != nil {
logger.ErrorWithPrefix(h.servicePrefix, "Error fetching address events: %v", err)
writeErrorResponse(w, "Error fetching address events", http.StatusInternalServerError, err)
Expand All @@ -329,7 +330,7 @@ func (h *APIServerService) getAddressEvents(w http.ResponseWriter, r *http.Reque
// getRelaysAllowed retrieves the list of allowed relays
func (h *APIServerService) getRelaysAllowed(w http.ResponseWriter, r *http.Request) {
logger.DebugWithPrefix(h.servicePrefix, "getRelaysAllowed request received")
relays, err := h.RelaysAllowedPort.GetRelaysAllowList(h.ctx)
relays, err := h.relaysAllowedPort.GetRelaysAllowList(h.ctx)
if err != nil {
logger.ErrorWithPrefix(h.servicePrefix, "Error fetching allowed relays: %v", err)
writeErrorResponse(w, "Error fetching allowed relays", http.StatusInternalServerError, err)
Expand All @@ -350,7 +351,7 @@ func (h *APIServerService) getRelaysAllowed(w http.ResponseWriter, r *http.Reque
// getRelaysUsed retrieves the list of used relays
func (h *APIServerService) getRelaysUsed(w http.ResponseWriter, r *http.Request) {
logger.DebugWithPrefix(h.servicePrefix, "getRelaysUsed request received")
relays, err := h.RelaysUsedPort.GetRelaysUsed(h.ctx)
relays, err := h.relaysUsedPort.GetRelaysUsed(h.ctx)
if err != nil {
logger.ErrorWithPrefix(h.servicePrefix, "Error fetching used relays: %v", err)
writeErrorResponse(w, "Error fetching used relays", http.StatusInternalServerError, err)
Expand All @@ -371,7 +372,7 @@ func (h *APIServerService) getRelaysUsed(w http.ResponseWriter, r *http.Request)
// getTelegramConfig retrieves the Telegram configuration
func (h *APIServerService) getTelegramConfig(w http.ResponseWriter, r *http.Request) {
logger.DebugWithPrefix(h.servicePrefix, "getTelegramConfig request received")
config, err := h.StoragePort.GetTelegramConfig()
config, err := h.storagePort.GetTelegramConfig()
if err != nil {
logger.ErrorWithPrefix(h.servicePrefix, "Error fetching Telegram configuration: %v", err)
writeErrorResponse(w, "Error fetching Telegram configuration", http.StatusInternalServerError, err)
Expand Down Expand Up @@ -410,14 +411,14 @@ func (h *APIServerService) updateTelegramConfig(w http.ResponseWriter, r *http.R
}

// Update storage
if err := h.StoragePort.SaveTelegramConfig(domain.TelegramConfig(req)); err != nil {
if err := h.storagePort.SaveTelegramConfig(domain.TelegramConfig(req)); err != nil {
logger.ErrorWithPrefix(h.servicePrefix, "Failed to update Telegram configuration: %v", err)
writeErrorResponse(w, "Failed to update Telegram configuration", http.StatusInternalServerError, err)
return
}

// Synchronously update the Telegram bot configuration
if err := h.NotifierPort.UpdateBotConfig(); err != nil {
if err := h.notifierPort.UpdateBotConfig(); err != nil {
logger.ErrorWithPrefix(h.servicePrefix, "Failed to update Telegram bot configuration: %v", err)
writeErrorResponse(w, "Failed to update Telegram bot configuration", http.StatusInternalServerError, err)
return
Expand All @@ -444,7 +445,7 @@ func (h *APIServerService) deleteOperator(w http.ResponseWriter, r *http.Request
}

// Check if operator ID exists
operatorIds, err := h.StoragePort.GetOperatorIds()
operatorIds, err := h.storagePort.GetOperatorIds()
if err != nil {
logger.ErrorWithPrefix(h.servicePrefix, "Failed to fetch operator IDs: %v", err)
writeErrorResponse(w, "Failed to fetch operator IDs", http.StatusInternalServerError, err)
Expand All @@ -465,7 +466,7 @@ func (h *APIServerService) deleteOperator(w http.ResponseWriter, r *http.Request
return
}

if err := h.StoragePort.DeleteOperator(operatorId); err != nil {
if err := h.storagePort.DeleteOperator(operatorId); err != nil {
logger.ErrorWithPrefix(h.servicePrefix, "Failed to delete Operator ID: %v", err)
writeErrorResponse(w, "Failed to delete Operator ID", http.StatusInternalServerError, err)
return
Expand Down Expand Up @@ -500,7 +501,7 @@ func (h *APIServerService) addOperator(w http.ResponseWriter, r *http.Request) {
}

// Check if operator ID already exists
operatorIds, err := h.StoragePort.GetOperatorIds()
operatorIds, err := h.storagePort.GetOperatorIds()
if err != nil {
logger.ErrorWithPrefix(h.servicePrefix, "Failed to fetch operator IDs: %v", err)
writeErrorResponse(w, "Failed to fetch operator IDs", http.StatusInternalServerError, err)
Expand All @@ -515,7 +516,7 @@ func (h *APIServerService) addOperator(w http.ResponseWriter, r *http.Request) {
}
}

if err := h.StoragePort.SaveOperatorId(req.OperatorID); err != nil {
if err := h.storagePort.SaveOperatorId(req.OperatorID); err != nil {
logger.ErrorWithPrefix(h.servicePrefix, "Failed to update Operator ID: %v", err)
writeErrorResponse(w, "Failed to update Operator ID", http.StatusInternalServerError, err)
return
Expand Down Expand Up @@ -555,15 +556,22 @@ func (h *APIServerService) getOperatorPerformance(w http.ResponseWriter, r *http
h.processingOperatorPerformance.Store(operatorIdNum.String(), struct{}{})
defer h.processingOperatorPerformance.Delete(operatorIdNum.String()) // Clear operator ID when done

// Perform the operator performance scan (synchronously)
if err := h.ValidatorExitRequestEventScanner.RunScan(h.ctx); err != nil {
// Perform distribution log scanning (synchronously)
if err := h.distributionLogUpdatedEventScanner.RunScan(h.ctx); err != nil {
logger.ErrorWithPrefix(h.servicePrefix, "Error scanning operator performance events: %v", err)
writeErrorResponse(w, "Error scanning operator performance events", http.StatusInternalServerError, err)
return
}

// Load the pending hashes
if err := h.pendingHashesLoader.LoadPendingHashes(); err != nil {
logger.ErrorWithPrefix(h.servicePrefix, "Error loading pending hashes: %v", err)
writeErrorResponse(w, "Error loading pending hashes", http.StatusInternalServerError, err)
return
}

// Fetch operator performance data
report, err := h.StoragePort.GetReports(operatorIdNum)
report, err := h.storagePort.GetReports(operatorIdNum)
if err != nil {
logger.ErrorWithPrefix(h.servicePrefix, "Error fetching Lido report: %v", err)
writeErrorResponse(w, "Error fetching Lido report", http.StatusInternalServerError, err)
Expand Down Expand Up @@ -613,13 +621,13 @@ func (h *APIServerService) getExitRequests(w http.ResponseWriter, r *http.Reques
defer h.processingExitRequests.Delete(operatorIdNum.String()) // Clear operator ID when done

// Perform the scanning (synchronously)
if err := h.ValidatorExitRequestEventScanner.RunScan(h.ctx); err != nil {
if err := h.validatorExitRequestEventScanner.RunScan(h.ctx); err != nil {
logger.ErrorWithPrefix(h.servicePrefix, "Error scanning ValidatorExitRequest events: %v", err)
writeErrorResponse(w, "Error scanning ValidatorExitRequest events", http.StatusInternalServerError, err)
return
}

exitRequests, err := h.StoragePort.GetExitRequests(operatorIdNum)
exitRequests, err := h.storagePort.GetExitRequests(operatorIdNum)
if err != nil {
logger.ErrorWithPrefix(h.servicePrefix, "Error fetching exit requests: %v", err)
writeErrorResponse(w, "Error fetching exit requests", http.StatusInternalServerError, err)
Expand Down
6 changes: 6 additions & 0 deletions internal/application/services/loadPendingHashes.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type PendingHashesLoader struct {
notifierPort ports.NotifierPort
ipfsPort ports.IpfsPort
minGenesisTime uint64
mu sync.Mutex
servicePrefix string
}

Expand All @@ -26,6 +27,7 @@ func NewPendingHashesLoader(storagePort ports.StoragePort, notifierPort ports.No
notifierPort,
ipfsPort,
minGenesisTime,
sync.Mutex{},
"PendingHashesLoader",
}
}
Expand Down Expand Up @@ -63,6 +65,10 @@ func (phl *PendingHashesLoader) LoadPendingHashesCron(ctx context.Context, inter
}

func (phl *PendingHashesLoader) LoadPendingHashes() error {
// Lock the mutex
phl.mu.Lock()
defer phl.mu.Unlock()

// Get operator IDs
operatorIDs, err := phl.storagePort.GetOperatorIds()
if err != nil {
Expand Down