Skip to content

Commit

Permalink
Pull out the operator handling logic that's shareable (#981)
Browse files Browse the repository at this point in the history
  • Loading branch information
jianoaix authored Dec 11, 2024
1 parent eeae72a commit e434a5b
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 131 deletions.
156 changes: 156 additions & 0 deletions disperser/dataapi/operator_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package dataapi

import (
"context"
"fmt"
"time"

"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/disperser/common/semver"
"github.com/Layr-Labs/eigenda/operators"
"github.com/Layr-Labs/eigensdk-go/logging"
)

// operatorHandler handles operations to collect and process operators info.
type operatorHandler struct {
// For visibility
logger logging.Logger
metrics *Metrics

// For accessing operator info
chainReader core.Reader
chainState core.ChainState
indexedChainState core.IndexedChainState
subgraphClient SubgraphClient
}

func newOperatorHandler(logger logging.Logger, metrics *Metrics, chainReader core.Reader, chainState core.ChainState, indexedChainState core.IndexedChainState, subgraphClient SubgraphClient) *operatorHandler {
return &operatorHandler{
logger: logger,
metrics: metrics,
chainReader: chainReader,
chainState: chainState,
indexedChainState: indexedChainState,
subgraphClient: subgraphClient,
}
}

func (oh *operatorHandler) probeOperatorHosts(ctx context.Context, operatorId string) (*OperatorPortCheckResponse, error) {
operatorInfo, err := oh.subgraphClient.QueryOperatorInfoByOperatorId(ctx, operatorId)
if err != nil {
oh.logger.Warn("failed to fetch operator info", "operatorId", operatorId, "error", err)
return &OperatorPortCheckResponse{}, err
}

operatorSocket := core.OperatorSocket(operatorInfo.Socket)
retrievalSocket := operatorSocket.GetRetrievalSocket()
retrievalOnline := checkIsOperatorOnline(retrievalSocket, 3, oh.logger)

dispersalSocket := operatorSocket.GetDispersalSocket()
dispersalOnline := checkIsOperatorOnline(dispersalSocket, 3, oh.logger)

// Create the metadata regardless of online status
portCheckResponse := &OperatorPortCheckResponse{
OperatorId: operatorId,
DispersalSocket: dispersalSocket,
RetrievalSocket: retrievalSocket,
DispersalOnline: dispersalOnline,
RetrievalOnline: retrievalOnline,
}

// Log the online status
oh.logger.Info("operator port check response", "response", portCheckResponse)

// Send the metadata to the results channel
return portCheckResponse, nil
}

func (oh *operatorHandler) getOperatorsStake(ctx context.Context, operatorId string) (*OperatorsStakeResponse, error) {
currentBlock, err := oh.indexedChainState.GetCurrentBlockNumber()
if err != nil {
return nil, fmt.Errorf("failed to fetch current block number: %w", err)
}
state, err := oh.chainState.GetOperatorState(ctx, currentBlock, []core.QuorumID{0, 1, 2})
if err != nil {
return nil, fmt.Errorf("failed to fetch indexed operator state: %w", err)
}

tqs, quorumsStake := operators.GetRankedOperators(state)
oh.metrics.UpdateOperatorsStake(tqs, quorumsStake)

stakeRanked := make(map[string][]*OperatorStake)
for q, operators := range quorumsStake {
quorum := fmt.Sprintf("%d", q)
stakeRanked[quorum] = make([]*OperatorStake, 0)
for i, op := range operators {
if len(operatorId) == 0 || operatorId == op.OperatorId.Hex() {
stakeRanked[quorum] = append(stakeRanked[quorum], &OperatorStake{
QuorumId: quorum,
OperatorId: op.OperatorId.Hex(),
StakePercentage: op.StakeShare / 100.0,
Rank: i + 1,
})
}
}
}
stakeRanked["total"] = make([]*OperatorStake, 0)
for i, op := range tqs {
if len(operatorId) == 0 || operatorId == op.OperatorId.Hex() {
stakeRanked["total"] = append(stakeRanked["total"], &OperatorStake{
QuorumId: "total",
OperatorId: op.OperatorId.Hex(),
StakePercentage: op.StakeShare / 100.0,
Rank: i + 1,
})
}
}
return &OperatorsStakeResponse{
StakeRankedOperators: stakeRanked,
}, nil
}

func (s *operatorHandler) scanOperatorsHostInfo(ctx context.Context) (*SemverReportResponse, error) {
currentBlock, err := s.indexedChainState.GetCurrentBlockNumber()
if err != nil {
return nil, fmt.Errorf("failed to fetch current block number: %w", err)
}
operators, err := s.indexedChainState.GetIndexedOperators(context.Background(), currentBlock)
if err != nil {
return nil, fmt.Errorf("failed to fetch indexed operator info: %w", err)
}

// check operator socket registration against the indexed state
for operatorID, operatorInfo := range operators {
socket, err := s.chainState.GetOperatorSocket(context.Background(), currentBlock, operatorID)
if err != nil {
s.logger.Warn("failed to get operator socket", "operatorId", operatorID.Hex(), "error", err)
continue
}
if socket != operatorInfo.Socket {
s.logger.Warn("operator socket mismatch", "operatorId", operatorID.Hex(), "socket", socket, "operatorInfo", operatorInfo.Socket)
}
}

s.logger.Info("Queried indexed operators", "operators", len(operators), "block", currentBlock)
operatorState, err := s.chainState.GetOperatorState(context.Background(), currentBlock, []core.QuorumID{0, 1, 2})
if err != nil {
return nil, fmt.Errorf("failed to fetch operator state: %w", err)
}

nodeInfoWorkers := 20
nodeInfoTimeout := time.Duration(1 * time.Second)
useRetrievalClient := false
semvers := semver.ScanOperators(operators, operatorState, useRetrievalClient, nodeInfoWorkers, nodeInfoTimeout, s.logger)

// Create HostInfoReportResponse instance
semverReport := &SemverReportResponse{
Semver: semvers,
}

// Publish semver report metrics
s.metrics.UpdateSemverCounts(semvers)

s.logger.Info("Semver scan completed", "semverReport", semverReport)
return semverReport, nil

}
86 changes: 0 additions & 86 deletions disperser/dataapi/queried_operators_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@ package dataapi

import (
"context"
"fmt"
"math/big"
"net"
"sort"
"strings"
"time"

"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/disperser/common/semver"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/gammazero/workerpool"
)
Expand Down Expand Up @@ -247,90 +245,6 @@ func ValidOperatorIP(address string, logger logging.Logger) bool {
return isValid
}

func (s *server) probeOperatorPorts(ctx context.Context, operatorId string) (*OperatorPortCheckResponse, error) {
operatorInfo, err := s.getOperatorInfo(ctx, operatorId)
if err != nil {
s.logger.Warn("failed to fetch operator info", "operatorId", operatorId, "error", err)
return &OperatorPortCheckResponse{}, err
}

operatorSocket := core.OperatorSocket(operatorInfo.Socket)
retrievalSocket := operatorSocket.GetRetrievalSocket()
retrievalOnline := checkIsOperatorOnline(retrievalSocket, 3, s.logger)

dispersalSocket := operatorSocket.GetDispersalSocket()
dispersalOnline := checkIsOperatorOnline(dispersalSocket, 3, s.logger)

// Create the metadata regardless of online status
portCheckResponse := &OperatorPortCheckResponse{
OperatorId: operatorId,
DispersalSocket: dispersalSocket,
RetrievalSocket: retrievalSocket,
DispersalOnline: dispersalOnline,
RetrievalOnline: retrievalOnline,
}

// Log the online status
s.logger.Info("operator port check response", "response", portCheckResponse)

// Send the metadata to the results channel
return portCheckResponse, nil
}

func (s *server) getOperatorInfo(ctx context.Context, operatorId string) (*core.IndexedOperatorInfo, error) {
operatorInfo, err := s.subgraphClient.QueryOperatorInfoByOperatorId(ctx, operatorId)
if err != nil {
s.logger.Warn("failed to fetch operator info", "operatorId", operatorId, "error", err)
return nil, fmt.Errorf("operator info not found for operatorId %s", operatorId)
}
return operatorInfo, nil
}

func (s *server) scanOperatorsHostInfo(ctx context.Context) (*SemverReportResponse, error) {
currentBlock, err := s.indexedChainState.GetCurrentBlockNumber()
if err != nil {
return nil, fmt.Errorf("failed to fetch current block number - %s", err)
}
operators, err := s.indexedChainState.GetIndexedOperators(context.Background(), currentBlock)
if err != nil {
return nil, fmt.Errorf("failed to fetch indexed operator info - %s", err)
}

// check operator socket registration against the indexed state
for operatorID, operatorInfo := range operators {
socket, err := s.chainState.GetOperatorSocket(context.Background(), currentBlock, operatorID)
if err != nil {
s.logger.Warn("failed to get operator socket", "operatorId", operatorID.Hex(), "error", err)
continue
}
if socket != operatorInfo.Socket {
s.logger.Warn("operator socket mismatch", "operatorId", operatorID.Hex(), "socket", socket, "operatorInfo", operatorInfo.Socket)
}
}

s.logger.Info("Queried indexed operators", "operators", len(operators), "block", currentBlock)
operatorState, err := s.chainState.GetOperatorState(context.Background(), currentBlock, []core.QuorumID{0, 1, 2})
if err != nil {
return nil, fmt.Errorf("failed to fetch operator state - %s", err)
}

nodeInfoWorkers := 20
nodeInfoTimeout := time.Duration(1 * time.Second)
useRetrievalClient := false
semvers := semver.ScanOperators(operators, operatorState, useRetrievalClient, nodeInfoWorkers, nodeInfoTimeout, s.logger)

// Create HostInfoReportResponse instance
semverReport := &SemverReportResponse{
Semver: semvers,
}

// Publish semver report metrics
s.metrics.UpdateSemverCounts(semvers)

s.logger.Info("Semver scan completed", "semverReport", semverReport)
return semverReport, nil
}

// method to check if operator is online via socket dial
func checkIsOperatorOnline(socket string, timeoutSecs int, logger logging.Logger) bool {
if !ValidOperatorIP(socket, logger) {
Expand Down
55 changes: 10 additions & 45 deletions disperser/dataapi/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (

"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigenda/operators"
"github.com/Layr-Labs/eigensdk-go/logging"
"google.golang.org/grpc/health/grpc_health_v1"

Expand Down Expand Up @@ -208,6 +207,8 @@ type (
batcherHealthEndpt string
eigenDAGRPCServiceChecker EigenDAGRPCServiceChecker
eigenDAHttpServiceChecker EigenDAHttpServiceChecker

operatorHandler *operatorHandler
}
)

Expand Down Expand Up @@ -239,8 +240,10 @@ func NewServer(
eigenDAHttpServiceChecker = &HttpServiceAvailability{}
}

l := logger.With("component", "DataAPIServer")

return &server{
logger: logger.With("component", "DataAPIServer"),
logger: l,
serverMode: config.ServerMode,
socketAddr: config.SocketAddr,
allowOrigins: config.AllowOrigins,
Expand All @@ -256,6 +259,7 @@ func NewServer(
batcherHealthEndpt: config.BatcherHealthEndpt,
eigenDAGRPCServiceChecker: eigenDAGRPCServiceChecker,
eigenDAHttpServiceChecker: eigenDAHttpServiceChecker,
operatorHandler: newOperatorHandler(logger, metrics, transactor, chainState, indexedChainState, subgraphClient),
}
}

Expand Down Expand Up @@ -726,52 +730,13 @@ func (s *server) OperatorsStake(c *gin.Context) {
operatorId := c.DefaultQuery("operator_id", "")
s.logger.Info("getting operators stake distribution", "operatorId", operatorId)

currentBlock, err := s.indexedChainState.GetCurrentBlockNumber()
if err != nil {
s.metrics.IncrementFailedRequestNum("OperatorsStake")
errorResponse(c, fmt.Errorf("failed to fetch current block number - %s", err))
return
}
state, err := s.chainState.GetOperatorState(c, currentBlock, []core.QuorumID{0, 1, 2})
operatorsStakeResponse, err := s.operatorHandler.getOperatorsStake(c.Request.Context(), operatorId)
if err != nil {
s.metrics.IncrementFailedRequestNum("OperatorsStake")
errorResponse(c, fmt.Errorf("failed to fetch indexed operator state - %s", err))
errorResponse(c, fmt.Errorf("failed to get operator stake: %w", err))
return
}

tqs, quorumsStake := operators.GetRankedOperators(state)
s.metrics.UpdateOperatorsStake(tqs, quorumsStake)

stakeRanked := make(map[string][]*OperatorStake)
for q, operators := range quorumsStake {
quorum := fmt.Sprintf("%d", q)
stakeRanked[quorum] = make([]*OperatorStake, 0)
for i, op := range operators {
if len(operatorId) == 0 || operatorId == op.OperatorId.Hex() {
stakeRanked[quorum] = append(stakeRanked[quorum], &OperatorStake{
QuorumId: quorum,
OperatorId: op.OperatorId.Hex(),
StakePercentage: op.StakeShare / 100.0,
Rank: i + 1,
})
}
}
}
stakeRanked["total"] = make([]*OperatorStake, 0)
for i, op := range tqs {
if len(operatorId) == 0 || operatorId == op.OperatorId.Hex() {
stakeRanked["total"] = append(stakeRanked["total"], &OperatorStake{
QuorumId: "total",
OperatorId: op.OperatorId.Hex(),
StakePercentage: op.StakeShare / 100.0,
Rank: i + 1,
})
}
}
operatorsStakeResponse := &OperatorsStakeResponse{
StakeRankedOperators: stakeRanked,
}

s.metrics.IncrementSuccessfulRequestNum("OperatorsStake")
c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxOperatorsStakeAge))
c.JSON(http.StatusOK, operatorsStakeResponse)
Expand Down Expand Up @@ -956,7 +921,7 @@ func (s *server) OperatorPortCheck(c *gin.Context) {

operatorId := c.DefaultQuery("operator_id", "")
s.logger.Info("checking operator ports", "operatorId", operatorId)
portCheckResponse, err := s.probeOperatorPorts(c.Request.Context(), operatorId)
portCheckResponse, err := s.operatorHandler.probeOperatorHosts(c.Request.Context(), operatorId)
if err != nil {
if strings.Contains(err.Error(), "not found") {
err = errNotFound
Expand Down Expand Up @@ -988,7 +953,7 @@ func (s *server) SemverScan(c *gin.Context) {
}))
defer timer.ObserveDuration()

report, err := s.scanOperatorsHostInfo(c.Request.Context())
report, err := s.operatorHandler.scanOperatorsHostInfo(c.Request.Context())
if err != nil {
s.logger.Error("failed to scan operators host info", "error", err)
s.metrics.IncrementFailedRequestNum("SemverScan")
Expand Down

0 comments on commit e434a5b

Please sign in to comment.