diff --git a/api/clients/mock/node_client.go b/api/clients/mock/node_client.go index 6e13ee61b..89e9f7611 100644 --- a/api/clients/mock/node_client.go +++ b/api/clients/mock/node_client.go @@ -46,7 +46,7 @@ func (c *MockNodeClient) GetBlobHeader(ctx context.Context, socket string, batch func (c *MockNodeClient) GetChunks( ctx context.Context, opID core.OperatorID, - opInfo *core.IndexedOperatorInfo, + opInfo *core.OperatorInfo, batchHeaderHash [32]byte, blobIndex uint32, quorumID core.QuorumID, diff --git a/api/clients/node_client.go b/api/clients/node_client.go index f2284620f..f0be917d4 100644 --- a/api/clients/node_client.go +++ b/api/clients/node_client.go @@ -21,7 +21,7 @@ type RetrievedChunks struct { type NodeClient interface { GetBlobHeader(ctx context.Context, socket string, batchHeaderHash [32]byte, blobIndex uint32) (*core.BlobHeader, *merkletree.Proof, error) - GetChunks(ctx context.Context, opID core.OperatorID, opInfo *core.IndexedOperatorInfo, batchHeaderHash [32]byte, blobIndex uint32, quorumID core.QuorumID, chunksChan chan RetrievedChunks) + GetChunks(ctx context.Context, opID core.OperatorID, opInfo *core.OperatorInfo, batchHeaderHash [32]byte, blobIndex uint32, quorumID core.QuorumID, chunksChan chan RetrievedChunks) } type client struct { @@ -79,7 +79,7 @@ func (c client) GetBlobHeader( func (c client) GetChunks( ctx context.Context, opID core.OperatorID, - opInfo *core.IndexedOperatorInfo, + opInfo *core.OperatorInfo, batchHeaderHash [32]byte, blobIndex uint32, quorumID core.QuorumID, diff --git a/api/clients/retrieval_client.go b/api/clients/retrieval_client.go index e8782a5e6..0121b09fe 100644 --- a/api/clients/retrieval_client.go +++ b/api/clients/retrieval_client.go @@ -53,7 +53,7 @@ type BlobChunks struct { type retrievalClient struct { logger logging.Logger - indexedChainState core.IndexedChainState + chainState core.ChainState assignmentCoordinator core.AssignmentCoordinator nodeClient NodeClient verifier encoding.Verifier @@ -63,7 +63,7 @@ type retrievalClient struct { // NewRetrievalClient creates a new retrieval client. func NewRetrievalClient( logger logging.Logger, - chainState core.IndexedChainState, + chainState core.ChainState, assignmentCoordinator core.AssignmentCoordinator, nodeClient NodeClient, verifier encoding.Verifier, @@ -71,7 +71,7 @@ func NewRetrievalClient( return &retrievalClient{ logger: logger.With("component", "RetrievalClient"), - indexedChainState: chainState, + chainState: chainState, assignmentCoordinator: assignmentCoordinator, nodeClient: nodeClient, verifier: verifier, @@ -104,11 +104,11 @@ func (r *retrievalClient) RetrieveBlobChunks(ctx context.Context, batchRoot [32]byte, quorumID core.QuorumID) (*BlobChunks, error) { - indexedOperatorState, err := r.indexedChainState.GetIndexedOperatorState(ctx, referenceBlockNumber, []core.QuorumID{quorumID}) + operatorState, err := r.chainState.GetOperatorState(ctx, referenceBlockNumber, []core.QuorumID{quorumID}) if err != nil { return nil, err } - operators, ok := indexedOperatorState.Operators[quorumID] + operators, ok := operatorState.Operators[quorumID] if !ok { return nil, fmt.Errorf("no quorum with ID: %d", quorumID) } @@ -118,7 +118,7 @@ func (r *retrievalClient) RetrieveBlobChunks(ctx context.Context, var proof *merkletree.Proof var proofVerified bool for opID := range operators { - opInfo := indexedOperatorState.IndexedOperators[opID] + opInfo := operators[opID] blobHeader, proof, err = r.nodeClient.GetBlobHeader(ctx, opInfo.Socket, batchHeaderHash, blobIndex) if err != nil { // try another operator @@ -172,7 +172,7 @@ func (r *retrievalClient) RetrieveBlobChunks(ctx context.Context, return nil, err } - assignments, info, err := r.assignmentCoordinator.GetAssignments(indexedOperatorState.OperatorState, blobHeader.Length, quorumHeader) + assignments, info, err := r.assignmentCoordinator.GetAssignments(operatorState, blobHeader.Length, quorumHeader) if err != nil { return nil, errors.New("failed to get assignments") } @@ -182,7 +182,7 @@ func (r *retrievalClient) RetrieveBlobChunks(ctx context.Context, pool := workerpool.New(r.numConnections) for opID := range operators { opID := opID - opInfo := indexedOperatorState.IndexedOperators[opID] + opInfo := operators[opID] pool.Submit(func() { r.nodeClient.GetChunks(ctx, opID, opInfo, batchHeaderHash, blobIndex, quorumID, chunksChan) }) diff --git a/api/clients/retrieval_client_test.go b/api/clients/retrieval_client_test.go index 2eb1b72f4..6a23868a7 100644 --- a/api/clients/retrieval_client_test.go +++ b/api/clients/retrieval_client_test.go @@ -99,12 +99,7 @@ func setup(t *testing.T) { indexer = &indexermock.MockIndexer{} indexer.On("Index").Return(nil).Once() - ics, err := coreindexer.NewIndexedChainState(chainState, indexer) - if err != nil { - panic("failed to create a new indexed chain state") - } - - retrievalClient, err = clients.NewRetrievalClient(logger, ics, coordinator, nodeClient, v, 2) + retrievalClient, err = clients.NewRetrievalClient(logger, chainState, coordinator, nodeClient, v, 2) if err != nil { panic("failed to create a new retrieval client") } diff --git a/core/eth/state.go b/core/eth/state.go index fd65d7298..31d36873b 100644 --- a/core/eth/state.go +++ b/core/eth/state.go @@ -28,7 +28,14 @@ func (cs *ChainState) GetOperatorStateByOperator(ctx context.Context, blockNumbe return nil, err } - return getOperatorState(operatorsByQuorum, uint32(blockNumber)) + socketMap := make(map[core.OperatorID]string) + socket, err := cs.Tx.GetOperatorSocket(ctx, operator) + if err != nil { + return nil, err + } + socketMap[operator] = socket + + return getOperatorState(operatorsByQuorum, uint32(blockNumber), socketMap) } @@ -38,7 +45,12 @@ func (cs *ChainState) GetOperatorState(ctx context.Context, blockNumber uint, qu return nil, err } - return getOperatorState(operatorsByQuorum, uint32(blockNumber)) + socketMap, err := cs.buildSocketMap(ctx, operatorsByQuorum) + if err != nil { + return nil, err + } + + return getOperatorState(operatorsByQuorum, uint32(blockNumber), socketMap) } func (cs *ChainState) GetCurrentBlockNumber() (uint, error) { @@ -59,7 +71,26 @@ func (cs *ChainState) GetOperatorSocket(ctx context.Context, blockNumber uint, o return socket, nil } -func getOperatorState(operatorsByQuorum core.OperatorStakes, blockNumber uint32) (*core.OperatorState, error) { +// buildSocketMap returns a map from operatorID to socket address for the operators in the operatorsByQuorum +func (cs *ChainState) buildSocketMap(ctx context.Context, operatorsByQuorum core.OperatorStakes) (map[core.OperatorID]string, error) { + socketMap := make(map[core.OperatorID]string) + for _, quorum := range operatorsByQuorum { + for _, op := range quorum { + // if the socket is already in the map, skip + if _, ok := socketMap[op.OperatorID]; ok { + continue + } + socket, err := cs.Tx.GetOperatorSocket(ctx, op.OperatorID) + if err != nil { + return nil, err + } + socketMap[op.OperatorID] = socket + } + } + return socketMap, nil +} + +func getOperatorState(operatorsByQuorum core.OperatorStakes, blockNumber uint32, socketMap map[core.OperatorID]string) (*core.OperatorState, error) { operators := make(map[core.QuorumID]map[core.OperatorID]*core.OperatorInfo) totals := make(map[core.QuorumID]*core.OperatorInfo) @@ -69,8 +100,9 @@ func getOperatorState(operatorsByQuorum core.OperatorStakes, blockNumber uint32) for ind, op := range quorum { operators[quorumID][op.OperatorID] = &core.OperatorInfo{ - Stake: op.Stake, - Index: core.OperatorIndex(ind), + Stake: op.Stake, + Index: core.OperatorIndex(ind), + Socket: socketMap[op.OperatorID], } totalStake.Add(totalStake, op.Stake) } @@ -78,6 +110,8 @@ func getOperatorState(operatorsByQuorum core.OperatorStakes, blockNumber uint32) totals[quorumID] = &core.OperatorInfo{ Stake: totalStake, Index: core.OperatorIndex(len(quorum)), + // no socket for the total + Socket: "", } } diff --git a/core/state.go b/core/state.go index 2bbf08d90..1fb802068 100644 --- a/core/state.go +++ b/core/state.go @@ -50,6 +50,8 @@ type OperatorInfo struct { Stake StakeAmount // Index is the index of the operator within the quorum Index OperatorIndex + // Socket is the socket address of the operator, in the form "host:port" + Socket string } // OperatorState contains information about the current state of operators which is stored in the blockchain state diff --git a/core/state_test.go b/core/state_test.go index f8f069745..2af139b74 100644 --- a/core/state_test.go +++ b/core/state_test.go @@ -14,33 +14,39 @@ func TestOperatorStateHash(t *testing.T) { Operators: map[core.QuorumID]map[core.OperatorID]*core.OperatorInfo{ 0: { [32]byte{0}: &core.OperatorInfo{ - Stake: big.NewInt(12), - Index: uint(2), + Stake: big.NewInt(12), + Index: uint(2), + Socket: "192.168.1.100:8080", }, [32]byte{1}: &core.OperatorInfo{ - Stake: big.NewInt(23), - Index: uint(3), + Stake: big.NewInt(23), + Index: uint(3), + Socket: "127.0.0.1:3000", }, }, 1: { [32]byte{1}: &core.OperatorInfo{ - Stake: big.NewInt(23), - Index: uint(3), + Stake: big.NewInt(23), + Index: uint(3), + Socket: "127.0.0.1:3000", }, [32]byte{2}: &core.OperatorInfo{ - Stake: big.NewInt(34), - Index: uint(4), + Stake: big.NewInt(34), + Index: uint(4), + Socket: "192.168.1.100:8080", }, }, }, Totals: map[core.QuorumID]*core.OperatorInfo{ 0: { - Stake: big.NewInt(35), - Index: uint(2), + Stake: big.NewInt(35), + Index: uint(2), + Socket: "", }, 1: { - Stake: big.NewInt(57), - Index: uint(2), + Stake: big.NewInt(57), + Index: uint(2), + Socket: "", }, }, BlockNumber: uint(123), @@ -50,8 +56,8 @@ func TestOperatorStateHash(t *testing.T) { assert.NoError(t, err) q0 := hash1[0] q1 := hash1[1] - assert.Equal(t, "3805338f34f77ff1fa23bbc23b1e86c4", hex.EncodeToString(q0[:])) - assert.Equal(t, "2f110a29f2bdd8a19c2d87d05736be0a", hex.EncodeToString(q1[:])) + assert.Equal(t, "6098562ea2e61a8f68743f9162b0adc0", hex.EncodeToString(q0[:])) + assert.Equal(t, "8ceea2ec543eb311e51ccfdc9e00ea4f", hex.EncodeToString(q1[:])) s2 := core.OperatorState{ Operators: map[core.QuorumID]map[core.OperatorID]*core.OperatorInfo{ @@ -93,6 +99,6 @@ func TestOperatorStateHash(t *testing.T) { assert.NoError(t, err) q0 = hash2[0] q1 = hash2[1] - assert.Equal(t, "1836448b57ae79decdcb77157cf31698", hex.EncodeToString(q0[:])) - assert.Equal(t, "2f110a29f2bdd8a19c2d87d05736be0a", hex.EncodeToString(q1[:])) + assert.Equal(t, "dc1bbb0b2b5d20238adfd4bd33661423", hex.EncodeToString(q0[:])) + assert.Equal(t, "8ceea2ec543eb311e51ccfdc9e00ea4f", hex.EncodeToString(q1[:])) } diff --git a/inabox/tests/integration_suite_test.go b/inabox/tests/integration_suite_test.go index 2f8250a1a..35f9ecf7e 100644 --- a/inabox/tests/integration_suite_test.go +++ b/inabox/tests/integration_suite_test.go @@ -189,12 +189,7 @@ func setupRetrievalClient(testConfig *deploy.Config) error { return err } - ics, err := coreindexer.NewIndexedChainState(cs, indexer) - if err != nil { - return err - } - - retrievalClient, err = clients.NewRetrievalClient(logger, ics, agn, nodeClient, v, 10) + retrievalClient, err = clients.NewRetrievalClient(logger, cs, agn, nodeClient, v, 10) if err != nil { return err } diff --git a/retriever/cmd/main.go b/retriever/cmd/main.go index fd1704c31..2468b91a0 100644 --- a/retriever/cmd/main.go +++ b/retriever/cmd/main.go @@ -14,14 +14,11 @@ import ( "github.com/Layr-Labs/eigenda/common/healthcheck" "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/core/eth" - coreindexer "github.com/Layr-Labs/eigenda/core/indexer" - "github.com/Layr-Labs/eigenda/core/thegraph" "github.com/Layr-Labs/eigenda/encoding/kzg/verifier" "github.com/Layr-Labs/eigenda/retriever" retrivereth "github.com/Layr-Labs/eigenda/retriever/eth" "github.com/Layr-Labs/eigenda/retriever/flags" gethcommon "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/rpc" "github.com/urfave/cli" "google.golang.org/grpc" "google.golang.org/grpc/reflection" @@ -98,47 +95,17 @@ func RetrieverMain(ctx *cli.Context) error { log.Fatalln("could not start tcp listener", err) } cs := eth.NewChainState(tx, gethClient) - rpcClient, err := rpc.Dial(config.EthClientConfig.RPCURLs[0]) - if err != nil { - log.Fatalln("could not start tcp listener", err) - } - - var ics core.IndexedChainState - if config.UseGraph { - logger.Info("Using graph node") - - logger.Info("Connecting to subgraph", "url", config.ChainStateConfig.Endpoint) - ics = thegraph.MakeIndexedChainState(config.ChainStateConfig, cs, logger) - } else { - logger.Info("Using built-in indexer") - - indexer, err := coreindexer.CreateNewIndexer( - &config.IndexerConfig, - gethClient, - rpcClient, - config.EigenDAServiceManagerAddr, - logger, - ) - if err != nil { - return err - } - ics, err = coreindexer.NewIndexedChainState(cs, indexer) - if err != nil { - return err - } - } agn := &core.StdAssignmentCoordinator{} - retrievalClient, err := clients.NewRetrievalClient(logger, ics, agn, nodeClient, v, config.NumConnections) + retrievalClient, err := clients.NewRetrievalClient(logger, cs, agn, nodeClient, v, config.NumConnections) if err != nil { log.Fatalln("could not start tcp listener", err) } chainClient := retrivereth.NewChainClient(gethClient, logger) - retrieverServiceServer := retriever.NewServer(config, logger, retrievalClient, ics, chainClient) - if err = retrieverServiceServer.Start(context.Background()); err != nil { - log.Fatalln("failed to start retriever service server", err) - } + retrieverServiceServer := retriever.NewServer(config, logger, retrievalClient, cs, chainClient) + // This only start the metrics server; consider unwrapping the function + retrieverServiceServer.Start(context.Background()) // Register reflection service on gRPC server // This makes "grpcurl -plaintext localhost:9000 list" command work diff --git a/retriever/server.go b/retriever/server.go index d49f8dbf7..7d5686cbf 100644 --- a/retriever/server.go +++ b/retriever/server.go @@ -19,7 +19,7 @@ type Server struct { config *Config retrievalClient clients.RetrievalClient chainClient eth.ChainClient - indexedState core.IndexedChainState + chainState core.ChainState logger logging.Logger metrics *Metrics } @@ -28,7 +28,7 @@ func NewServer( config *Config, logger logging.Logger, retrievalClient clients.RetrievalClient, - indexedState core.IndexedChainState, + chainState core.ChainState, chainClient eth.ChainClient, ) *Server { metrics := NewMetrics(config.MetricsConfig.HTTPPort, logger) @@ -37,15 +37,14 @@ func NewServer( config: config, retrievalClient: retrievalClient, chainClient: chainClient, - indexedState: indexedState, + chainState: chainState, logger: logger.With("component", "RetrieverServer"), metrics: metrics, } } -func (s *Server) Start(ctx context.Context) error { +func (s *Server) Start(ctx context.Context) { s.metrics.Start(ctx) - return s.indexedState.Start(ctx) } func (s *Server) RetrieveBlob(ctx context.Context, req *pb.BlobRequest) (*pb.BlobReply, error) { diff --git a/test/synthetic-test/synthetic_client_test.go b/test/synthetic-test/synthetic_client_test.go index ab868c557..4939cb44d 100644 --- a/test/synthetic-test/synthetic_client_test.go +++ b/test/synthetic-test/synthetic_client_test.go @@ -19,7 +19,6 @@ import ( "github.com/consensys/gnark-crypto/ecc/bn254/fp" "github.com/google/uuid" - "github.com/shurcooL/graphql" "github.com/Layr-Labs/eigenda/api/clients" disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser" @@ -29,7 +28,6 @@ import ( rollupbindings "github.com/Layr-Labs/eigenda/contracts/bindings/MockRollup" "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/core/eth" - "github.com/Layr-Labs/eigenda/core/thegraph" "github.com/Layr-Labs/eigenda/encoding" "github.com/Layr-Labs/eigenda/encoding/kzg" "github.com/Layr-Labs/eigenda/encoding/kzg/verifier" @@ -220,8 +218,6 @@ func setupRetrievalClient(ethClient common.EthClient, retrievalClientConfig *Ret } cs := eth.NewChainState(tx, ethClient) - querier := graphql.NewClient(retrievalClientConfig.ChurnerGraphUrl, nil) - indexedChainStateClient := thegraph.NewIndexedChainState(cs, querier, logger) agn := &core.StdAssignmentCoordinator{} // TODO: What should be the value here? @@ -246,12 +242,12 @@ func setupRetrievalClient(ethClient common.EthClient, retrievalClientConfig *Ret return err } - retrievalClient, err = clients.NewRetrievalClient(logger, indexedChainStateClient, agn, nodeClient, v, 10) + retrievalClient, err = clients.NewRetrievalClient(logger, cs, agn, nodeClient, v, 10) if err != nil { return err } - return indexedChainStateClient.Start(context.Background()) + return nil } // TODO: This file contains some code that can be refactored and shared across some other tests ex:Integration Test. diff --git a/tools/traffic/generator_v2.go b/tools/traffic/generator_v2.go index 61575b9c0..25ba2f0c9 100644 --- a/tools/traffic/generator_v2.go +++ b/tools/traffic/generator_v2.go @@ -12,7 +12,6 @@ import ( "github.com/Layr-Labs/eigenda/common/geth" "github.com/Layr-Labs/eigenda/core/auth" "github.com/Layr-Labs/eigenda/core/eth" - "github.com/Layr-Labs/eigenda/core/thegraph" "github.com/Layr-Labs/eigenda/encoding/kzg/verifier" retrivereth "github.com/Layr-Labs/eigenda/retriever/eth" "github.com/Layr-Labs/eigenda/tools/traffic/config" @@ -172,8 +171,6 @@ func buildRetriever(config *config.Config) (clients.RetrievalClient, retrivereth cs := eth.NewChainState(tx, gethClient) - chainState := thegraph.MakeIndexedChainState(*config.TheGraphConfig, cs, logger) - var assignmentCoordinator core.AssignmentCoordinator = &core.StdAssignmentCoordinator{} nodeClient := clients.NewNodeClient(config.NodeClientTimeout) @@ -185,7 +182,7 @@ func buildRetriever(config *config.Config) (clients.RetrievalClient, retrivereth retriever, err := clients.NewRetrievalClient( logger, - chainState, + cs, assignmentCoordinator, nodeClient, v,