diff --git a/disperser/dataapi/operator_handler.go b/disperser/dataapi/operator_handler.go new file mode 100644 index 000000000..bd66e0fea --- /dev/null +++ b/disperser/dataapi/operator_handler.go @@ -0,0 +1,156 @@ +package dataapi + +import ( + "context" + "fmt" + "time" + + "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigenda/disperser/common/semver" + "github.com/Layr-Labs/eigenda/operators" + "github.com/Layr-Labs/eigensdk-go/logging" +) + +// operatorHandler handles operations to collect and process operators info. +type operatorHandler struct { + // For visibility + logger logging.Logger + metrics *Metrics + + // For accessing operator info + chainReader core.Reader + chainState core.ChainState + indexedChainState core.IndexedChainState + subgraphClient SubgraphClient +} + +func newOperatorHandler(logger logging.Logger, metrics *Metrics, chainReader core.Reader, chainState core.ChainState, indexedChainState core.IndexedChainState, subgraphClient SubgraphClient) *operatorHandler { + return &operatorHandler{ + logger: logger, + metrics: metrics, + chainReader: chainReader, + chainState: chainState, + indexedChainState: indexedChainState, + subgraphClient: subgraphClient, + } +} + +func (oh *operatorHandler) probeOperatorHosts(ctx context.Context, operatorId string) (*OperatorPortCheckResponse, error) { + operatorInfo, err := oh.subgraphClient.QueryOperatorInfoByOperatorId(ctx, operatorId) + if err != nil { + oh.logger.Warn("failed to fetch operator info", "operatorId", operatorId, "error", err) + return &OperatorPortCheckResponse{}, err + } + + operatorSocket := core.OperatorSocket(operatorInfo.Socket) + retrievalSocket := operatorSocket.GetRetrievalSocket() + retrievalOnline := checkIsOperatorOnline(retrievalSocket, 3, oh.logger) + + dispersalSocket := operatorSocket.GetDispersalSocket() + dispersalOnline := checkIsOperatorOnline(dispersalSocket, 3, oh.logger) + + // Create the metadata regardless of online status + portCheckResponse := &OperatorPortCheckResponse{ + OperatorId: operatorId, + DispersalSocket: dispersalSocket, + RetrievalSocket: retrievalSocket, + DispersalOnline: dispersalOnline, + RetrievalOnline: retrievalOnline, + } + + // Log the online status + oh.logger.Info("operator port check response", "response", portCheckResponse) + + // Send the metadata to the results channel + return portCheckResponse, nil +} + +func (oh *operatorHandler) getOperatorsStake(ctx context.Context, operatorId string) (*OperatorsStakeResponse, error) { + currentBlock, err := oh.indexedChainState.GetCurrentBlockNumber() + if err != nil { + return nil, fmt.Errorf("failed to fetch current block number: %w", err) + } + state, err := oh.chainState.GetOperatorState(ctx, currentBlock, []core.QuorumID{0, 1, 2}) + if err != nil { + return nil, fmt.Errorf("failed to fetch indexed operator state: %w", err) + } + + tqs, quorumsStake := operators.GetRankedOperators(state) + oh.metrics.UpdateOperatorsStake(tqs, quorumsStake) + + stakeRanked := make(map[string][]*OperatorStake) + for q, operators := range quorumsStake { + quorum := fmt.Sprintf("%d", q) + stakeRanked[quorum] = make([]*OperatorStake, 0) + for i, op := range operators { + if len(operatorId) == 0 || operatorId == op.OperatorId.Hex() { + stakeRanked[quorum] = append(stakeRanked[quorum], &OperatorStake{ + QuorumId: quorum, + OperatorId: op.OperatorId.Hex(), + StakePercentage: op.StakeShare / 100.0, + Rank: i + 1, + }) + } + } + } + stakeRanked["total"] = make([]*OperatorStake, 0) + for i, op := range tqs { + if len(operatorId) == 0 || operatorId == op.OperatorId.Hex() { + stakeRanked["total"] = append(stakeRanked["total"], &OperatorStake{ + QuorumId: "total", + OperatorId: op.OperatorId.Hex(), + StakePercentage: op.StakeShare / 100.0, + Rank: i + 1, + }) + } + } + return &OperatorsStakeResponse{ + StakeRankedOperators: stakeRanked, + }, nil +} + +func (s *operatorHandler) scanOperatorsHostInfo(ctx context.Context) (*SemverReportResponse, error) { + currentBlock, err := s.indexedChainState.GetCurrentBlockNumber() + if err != nil { + return nil, fmt.Errorf("failed to fetch current block number - %s", err) + } + operators, err := s.indexedChainState.GetIndexedOperators(context.Background(), currentBlock) + if err != nil { + return nil, fmt.Errorf("failed to fetch indexed operator info - %s", err) + } + + // check operator socket registration against the indexed state + for operatorID, operatorInfo := range operators { + socket, err := s.chainState.GetOperatorSocket(context.Background(), currentBlock, operatorID) + if err != nil { + s.logger.Warn("failed to get operator socket", "operatorId", operatorID.Hex(), "error", err) + continue + } + if socket != operatorInfo.Socket { + s.logger.Warn("operator socket mismatch", "operatorId", operatorID.Hex(), "socket", socket, "operatorInfo", operatorInfo.Socket) + } + } + + s.logger.Info("Queried indexed operators", "operators", len(operators), "block", currentBlock) + operatorState, err := s.chainState.GetOperatorState(context.Background(), currentBlock, []core.QuorumID{0, 1, 2}) + if err != nil { + return nil, fmt.Errorf("failed to fetch operator state - %s", err) + } + + nodeInfoWorkers := 20 + nodeInfoTimeout := time.Duration(1 * time.Second) + useRetrievalClient := false + semvers := semver.ScanOperators(operators, operatorState, useRetrievalClient, nodeInfoWorkers, nodeInfoTimeout, s.logger) + + // Create HostInfoReportResponse instance + semverReport := &SemverReportResponse{ + Semver: semvers, + } + + // Publish semver report metrics + s.metrics.UpdateSemverCounts(semvers) + + s.logger.Info("Semver scan completed", "semverReport", semverReport) + return semverReport, nil + +} diff --git a/disperser/dataapi/queried_operators_handlers.go b/disperser/dataapi/queried_operators_handlers.go index b9b5bdf7e..aeb3ef6ba 100644 --- a/disperser/dataapi/queried_operators_handlers.go +++ b/disperser/dataapi/queried_operators_handlers.go @@ -2,7 +2,6 @@ package dataapi import ( "context" - "fmt" "math/big" "net" "sort" @@ -10,7 +9,6 @@ import ( "time" "github.com/Layr-Labs/eigenda/core" - "github.com/Layr-Labs/eigenda/disperser/common/semver" "github.com/Layr-Labs/eigensdk-go/logging" "github.com/gammazero/workerpool" ) @@ -247,90 +245,6 @@ func ValidOperatorIP(address string, logger logging.Logger) bool { return isValid } -func (s *server) probeOperatorPorts(ctx context.Context, operatorId string) (*OperatorPortCheckResponse, error) { - operatorInfo, err := s.getOperatorInfo(ctx, operatorId) - if err != nil { - s.logger.Warn("failed to fetch operator info", "operatorId", operatorId, "error", err) - return &OperatorPortCheckResponse{}, err - } - - operatorSocket := core.OperatorSocket(operatorInfo.Socket) - retrievalSocket := operatorSocket.GetRetrievalSocket() - retrievalOnline := checkIsOperatorOnline(retrievalSocket, 3, s.logger) - - dispersalSocket := operatorSocket.GetDispersalSocket() - dispersalOnline := checkIsOperatorOnline(dispersalSocket, 3, s.logger) - - // Create the metadata regardless of online status - portCheckResponse := &OperatorPortCheckResponse{ - OperatorId: operatorId, - DispersalSocket: dispersalSocket, - RetrievalSocket: retrievalSocket, - DispersalOnline: dispersalOnline, - RetrievalOnline: retrievalOnline, - } - - // Log the online status - s.logger.Info("operator port check response", "response", portCheckResponse) - - // Send the metadata to the results channel - return portCheckResponse, nil -} - -func (s *server) getOperatorInfo(ctx context.Context, operatorId string) (*core.IndexedOperatorInfo, error) { - operatorInfo, err := s.subgraphClient.QueryOperatorInfoByOperatorId(ctx, operatorId) - if err != nil { - s.logger.Warn("failed to fetch operator info", "operatorId", operatorId, "error", err) - return nil, fmt.Errorf("operator info not found for operatorId %s", operatorId) - } - return operatorInfo, nil -} - -func (s *server) scanOperatorsHostInfo(ctx context.Context) (*SemverReportResponse, error) { - currentBlock, err := s.indexedChainState.GetCurrentBlockNumber() - if err != nil { - return nil, fmt.Errorf("failed to fetch current block number - %s", err) - } - operators, err := s.indexedChainState.GetIndexedOperators(context.Background(), currentBlock) - if err != nil { - return nil, fmt.Errorf("failed to fetch indexed operator info - %s", err) - } - - // check operator socket registration against the indexed state - for operatorID, operatorInfo := range operators { - socket, err := s.chainState.GetOperatorSocket(context.Background(), currentBlock, operatorID) - if err != nil { - s.logger.Warn("failed to get operator socket", "operatorId", operatorID.Hex(), "error", err) - continue - } - if socket != operatorInfo.Socket { - s.logger.Warn("operator socket mismatch", "operatorId", operatorID.Hex(), "socket", socket, "operatorInfo", operatorInfo.Socket) - } - } - - s.logger.Info("Queried indexed operators", "operators", len(operators), "block", currentBlock) - operatorState, err := s.chainState.GetOperatorState(context.Background(), currentBlock, []core.QuorumID{0, 1, 2}) - if err != nil { - return nil, fmt.Errorf("failed to fetch operator state - %s", err) - } - - nodeInfoWorkers := 20 - nodeInfoTimeout := time.Duration(1 * time.Second) - useRetrievalClient := false - semvers := semver.ScanOperators(operators, operatorState, useRetrievalClient, nodeInfoWorkers, nodeInfoTimeout, s.logger) - - // Create HostInfoReportResponse instance - semverReport := &SemverReportResponse{ - Semver: semvers, - } - - // Publish semver report metrics - s.metrics.UpdateSemverCounts(semvers) - - s.logger.Info("Semver scan completed", "semverReport", semverReport) - return semverReport, nil -} - // method to check if operator is online via socket dial func checkIsOperatorOnline(socket string, timeoutSecs int, logger logging.Logger) bool { if !ValidOperatorIP(socket, logger) { diff --git a/disperser/dataapi/server.go b/disperser/dataapi/server.go index abed52e39..722f05a9c 100644 --- a/disperser/dataapi/server.go +++ b/disperser/dataapi/server.go @@ -18,7 +18,6 @@ import ( "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/encoding" - "github.com/Layr-Labs/eigenda/operators" "github.com/Layr-Labs/eigensdk-go/logging" "google.golang.org/grpc/health/grpc_health_v1" @@ -208,6 +207,8 @@ type ( batcherHealthEndpt string eigenDAGRPCServiceChecker EigenDAGRPCServiceChecker eigenDAHttpServiceChecker EigenDAHttpServiceChecker + + operatorHandler *operatorHandler } ) @@ -239,8 +240,10 @@ func NewServer( eigenDAHttpServiceChecker = &HttpServiceAvailability{} } + l := logger.With("component", "DataAPIServer") + return &server{ - logger: logger.With("component", "DataAPIServer"), + logger: l, serverMode: config.ServerMode, socketAddr: config.SocketAddr, allowOrigins: config.AllowOrigins, @@ -256,6 +259,7 @@ func NewServer( batcherHealthEndpt: config.BatcherHealthEndpt, eigenDAGRPCServiceChecker: eigenDAGRPCServiceChecker, eigenDAHttpServiceChecker: eigenDAHttpServiceChecker, + operatorHandler: newOperatorHandler(logger, metrics, transactor, chainState, indexedChainState, subgraphClient), } } @@ -726,52 +730,13 @@ func (s *server) OperatorsStake(c *gin.Context) { operatorId := c.DefaultQuery("operator_id", "") s.logger.Info("getting operators stake distribution", "operatorId", operatorId) - currentBlock, err := s.indexedChainState.GetCurrentBlockNumber() - if err != nil { - s.metrics.IncrementFailedRequestNum("OperatorsStake") - errorResponse(c, fmt.Errorf("failed to fetch current block number - %s", err)) - return - } - state, err := s.chainState.GetOperatorState(c, currentBlock, []core.QuorumID{0, 1, 2}) + operatorsStakeResponse, err := s.operatorHandler.getOperatorsStake(c.Request.Context(), operatorId) if err != nil { s.metrics.IncrementFailedRequestNum("OperatorsStake") - errorResponse(c, fmt.Errorf("failed to fetch indexed operator state - %s", err)) + errorResponse(c, fmt.Errorf("failed to get operator stake - %s", err)) return } - tqs, quorumsStake := operators.GetRankedOperators(state) - s.metrics.UpdateOperatorsStake(tqs, quorumsStake) - - stakeRanked := make(map[string][]*OperatorStake) - for q, operators := range quorumsStake { - quorum := fmt.Sprintf("%d", q) - stakeRanked[quorum] = make([]*OperatorStake, 0) - for i, op := range operators { - if len(operatorId) == 0 || operatorId == op.OperatorId.Hex() { - stakeRanked[quorum] = append(stakeRanked[quorum], &OperatorStake{ - QuorumId: quorum, - OperatorId: op.OperatorId.Hex(), - StakePercentage: op.StakeShare / 100.0, - Rank: i + 1, - }) - } - } - } - stakeRanked["total"] = make([]*OperatorStake, 0) - for i, op := range tqs { - if len(operatorId) == 0 || operatorId == op.OperatorId.Hex() { - stakeRanked["total"] = append(stakeRanked["total"], &OperatorStake{ - QuorumId: "total", - OperatorId: op.OperatorId.Hex(), - StakePercentage: op.StakeShare / 100.0, - Rank: i + 1, - }) - } - } - operatorsStakeResponse := &OperatorsStakeResponse{ - StakeRankedOperators: stakeRanked, - } - s.metrics.IncrementSuccessfulRequestNum("OperatorsStake") c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxOperatorsStakeAge)) c.JSON(http.StatusOK, operatorsStakeResponse) @@ -956,7 +921,7 @@ func (s *server) OperatorPortCheck(c *gin.Context) { operatorId := c.DefaultQuery("operator_id", "") s.logger.Info("checking operator ports", "operatorId", operatorId) - portCheckResponse, err := s.probeOperatorPorts(c.Request.Context(), operatorId) + portCheckResponse, err := s.operatorHandler.probeOperatorHosts(c.Request.Context(), operatorId) if err != nil { if strings.Contains(err.Error(), "not found") { err = errNotFound @@ -988,7 +953,7 @@ func (s *server) SemverScan(c *gin.Context) { })) defer timer.ObserveDuration() - report, err := s.scanOperatorsHostInfo(c.Request.Context()) + report, err := s.operatorHandler.scanOperatorsHostInfo(c.Request.Context()) if err != nil { s.logger.Error("failed to scan operators host info", "error", err) s.metrics.IncrementFailedRequestNum("SemverScan")