Skip to content

Commit

Permalink
Introduced avs manager (#27)
Browse files Browse the repository at this point in the history
* feat: monitor operator set updates, sign and send.

* feat: init AvsManager to offload operator from L1 work.(RAW)

* feat: migrating registration

* fix: integration tests

* refactor: removed unnecessary files. Moved unnecessary entities from AvsManager

* fix: compilation after rebase

* fix: integration tests

* fix: Resolve integration test issue for Macs. --network host doesn't work on Win & Darwin

* fix: conflicts with main branch

* fix: renamings + port leftover

* fix: unit tests

* refactor: style edits + give some time to indexer to init clients

* refactor: revert integration test changes to make it pass + some logging

* refactor: resolving minor pr comments
  • Loading branch information
taco-paco authored Feb 23, 2024
1 parent 282df61 commit 18b4197
Show file tree
Hide file tree
Showing 8 changed files with 511 additions and 396 deletions.
4 changes: 2 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ target
# Aggregator DB
aggregator.db

# just for example
id_rsa
# Test artifacts
test_data
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ mocks: ## generates mocks for tests
go generate ./...

tests-unit: ## runs all unit tests
go test $$(go list ./... | grep -v /integration) -coverprofile=coverage.out -covermode=atomic --timeout 15s
go test $$(go list ./... | grep -v /integration) -coverprofile=coverage.out -covermode=atomic --timeout 18s
go tool cover -html=coverage.out -o coverage.html

tests-contract: ## runs all forge tests
Expand Down
322 changes: 322 additions & 0 deletions operator/avs_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,322 @@
package operator

import (
"context"
"crypto/ecdsa"
"fmt"
"math/big"

"github.com/Layr-Labs/eigensdk-go/chainio/clients"
"github.com/Layr-Labs/eigensdk-go/chainio/clients/elcontracts"
"github.com/Layr-Labs/eigensdk-go/chainio/clients/eth"
"github.com/Layr-Labs/eigensdk-go/chainio/txmgr"
"github.com/Layr-Labs/eigensdk-go/crypto/bls"
sdklogging "github.com/Layr-Labs/eigensdk-go/logging"
eigenSdkTypes "github.com/Layr-Labs/eigensdk-go/types"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"

opsetupdatereg "github.com/NethermindEth/near-sffl/contracts/bindings/SFFLOperatorSetUpdateRegistry"
registryrollup "github.com/NethermindEth/near-sffl/contracts/bindings/SFFLRegistryRollup"
taskmanager "github.com/NethermindEth/near-sffl/contracts/bindings/SFFLTaskManager"
"github.com/NethermindEth/near-sffl/core/chainio"
"github.com/NethermindEth/near-sffl/types"
)

type AvsManagerer interface {
Start(ctx context.Context, operatorAddr common.Address) error
DepositIntoStrategy(operatorAddr common.Address, strategyAddr common.Address, amount *big.Int) error
RegisterOperatorWithEigenlayer(operatorAddr common.Address) error
RegisterOperatorWithAvs(
client eth.EthClient,
operatorEcdsaKeyPair *ecdsa.PrivateKey,
blsKeyPair *bls.KeyPair,
) error
ProcessCheckpointTaskCreatedLog(checkpointTaskCreatedLog *taskmanager.ContractSFFLTaskManagerCheckpointTaskCreated) taskmanager.CheckpointTaskResponse

GetOperatorId(options *bind.CallOpts, address common.Address) ([32]byte, error)
GetCheckpointTaskResponseChan() <-chan taskmanager.CheckpointTaskResponse
GetOperatorSetUpdateChan() <-chan registryrollup.OperatorSetUpdateMessage
}

type AvsManager struct {
avsWriter *chainio.AvsWriter
avsReader chainio.AvsReaderer
avsSubscriber chainio.AvsSubscriberer
eigenlayerReader elcontracts.ELReader
eigenlayerWriter elcontracts.ELWriter

// receive new tasks in this chan (typically from listening to onchain event)
checkpointTaskCreatedChan chan *taskmanager.ContractSFFLTaskManagerCheckpointTaskCreated
// receive operator set updates in this chan
operatorSetUpdateChan chan *opsetupdatereg.ContractSFFLOperatorSetUpdateRegistryOperatorSetUpdatedAtBlock

// Sends message for operator to sign
checkpointTaskResponseCreatedChan chan taskmanager.CheckpointTaskResponse
operatorSetUpdateMessageChan chan registryrollup.OperatorSetUpdateMessage

logger sdklogging.Logger
}

func NewAvsManager(config *types.NodeConfig, ethRpcClient eth.EthClient, ethWsClient eth.EthClient, sdkClients *clients.Clients, txManager *txmgr.SimpleTxManager, logger sdklogging.Logger) (*AvsManager, error) {
avsWriter, err := chainio.BuildAvsWriter(
txManager, common.HexToAddress(config.AVSRegistryCoordinatorAddress),
common.HexToAddress(config.OperatorStateRetrieverAddress), ethRpcClient, logger,
)
if err != nil {
logger.Error("Cannot create AvsWriter", "err", err)
return nil, err
}

avsReader, err := chainio.BuildAvsReader(
common.HexToAddress(config.AVSRegistryCoordinatorAddress),
common.HexToAddress(config.OperatorStateRetrieverAddress),
ethRpcClient, logger)
if err != nil {
logger.Error("Cannot create AvsReader", "err", err)
return nil, err
}

avsSubscriber, err := chainio.BuildAvsSubscriber(common.HexToAddress(config.AVSRegistryCoordinatorAddress),
common.HexToAddress(config.OperatorStateRetrieverAddress), ethWsClient, logger,
)
if err != nil {
logger.Error("Cannot create AvsSubscriber", "err", err)
return nil, err
}

return &AvsManager{
avsReader: avsReader,
avsWriter: avsWriter,
avsSubscriber: avsSubscriber,
eigenlayerReader: sdkClients.ElChainReader,
eigenlayerWriter: sdkClients.ElChainWriter,
checkpointTaskCreatedChan: make(chan *taskmanager.ContractSFFLTaskManagerCheckpointTaskCreated),
operatorSetUpdateChan: make(chan *opsetupdatereg.ContractSFFLOperatorSetUpdateRegistryOperatorSetUpdatedAtBlock),
checkpointTaskResponseCreatedChan: make(chan taskmanager.CheckpointTaskResponse),
operatorSetUpdateMessageChan: make(chan registryrollup.OperatorSetUpdateMessage),
logger: logger,
}, nil
}

func (avsManager *AvsManager) GetCheckpointTaskResponseChan() <-chan taskmanager.CheckpointTaskResponse {
return avsManager.checkpointTaskResponseCreatedChan
}

func (avsManager *AvsManager) GetOperatorSetUpdateChan() <-chan registryrollup.OperatorSetUpdateMessage {
return avsManager.operatorSetUpdateMessageChan
}

func (avsManager *AvsManager) Start(ctx context.Context, operatorAddr common.Address) error {
operatorIsRegistered, err := avsManager.IsOperatorRegistered(&bind.CallOpts{}, operatorAddr)
if err != nil {
avsManager.logger.Error("Error checking if operator is registered", "err", err)
return err
}

if !operatorIsRegistered {
// We bubble the error all the way up instead of using logger.Fatal because logger.Fatal prints a huge stack trace
// that hides the actual error message. This error msg is more explicit and doesn't require showing a stack trace to the user.
return fmt.Errorf("operator is not registered. Registering operator using the operator-cli before starting operator")
}

newTasksSub, err := avsManager.avsSubscriber.SubscribeToNewTasks(avsManager.checkpointTaskCreatedChan)
if err != nil {
avsManager.logger.Error("Error subscribing to new tasks", "err", err)
return err
}

operatorSetUpdateSub, err := avsManager.avsSubscriber.SubscribeToOperatorSetUpdates(avsManager.operatorSetUpdateChan)
if err != nil {
avsManager.logger.Error("Error subscribing to operator set updates", "err", err)
return err
}

go func() {
for {
select {
case <-ctx.Done():
return

case err := <-newTasksSub.Err():
avsManager.logger.Error("Error in websocket subscription", "err", err)
// TODO(samlaf): write unit tests to check if this fixed the issues we were seeing
newTasksSub.Unsubscribe()
// TODO(samlaf): wrap this call with increase in avs-node-spec metric
newTasksSub, err = avsManager.avsSubscriber.SubscribeToNewTasks(avsManager.checkpointTaskCreatedChan)
if err != nil {
avsManager.logger.Error("Error re-subscribing to new tasks", "err", err)
close(avsManager.checkpointTaskResponseCreatedChan)
return
}

continue

case checkpointTaskCreatedLog := <-avsManager.checkpointTaskCreatedChan:
taskResponse := avsManager.ProcessCheckpointTaskCreatedLog(checkpointTaskCreatedLog)
avsManager.checkpointTaskResponseCreatedChan <- taskResponse

case err := <-operatorSetUpdateSub.Err():
avsManager.logger.Error("Error in websocket subscription", "err", err)
operatorSetUpdateSub.Unsubscribe()
operatorSetUpdateSub, err = avsManager.avsSubscriber.SubscribeToOperatorSetUpdates(avsManager.operatorSetUpdateChan)
if err != nil {
avsManager.logger.Error("Error re-subscribing to operator set updates", "err", err)
close(avsManager.checkpointTaskResponseCreatedChan)
return
}

continue

case operatorSetUpdate := <-avsManager.operatorSetUpdateChan:
go avsManager.handleOperatorSetUpdate(ctx, operatorSetUpdate)
continue
}
}
}()

return nil
}

// Takes a CheckpointTaskCreatedLog struct as input and returns a TaskResponseHeader struct.
// The TaskResponseHeader struct is the struct that is signed and sent to the contract as a task response.
func (avsManager *AvsManager) ProcessCheckpointTaskCreatedLog(checkpointTaskCreatedLog *taskmanager.ContractSFFLTaskManagerCheckpointTaskCreated) taskmanager.CheckpointTaskResponse {
avsManager.logger.Debug("Received new task", "task", checkpointTaskCreatedLog)
avsManager.logger.Info("Received new task",
"fromTimestamp", checkpointTaskCreatedLog.Task.FromTimestamp,
"toTimestamp", checkpointTaskCreatedLog.Task.ToTimestamp,
"taskIndex", checkpointTaskCreatedLog.TaskIndex,
"taskCreatedBlock", checkpointTaskCreatedLog.Task.TaskCreatedBlock,
"quorumNumbers", checkpointTaskCreatedLog.Task.QuorumNumbers,
"quorumThreshold", checkpointTaskCreatedLog.Task.QuorumThreshold,
)

// TODO: build SMT based on stored message agreements and update the test

taskResponse := taskmanager.CheckpointTaskResponse{
ReferenceTaskIndex: checkpointTaskCreatedLog.TaskIndex,
StateRootUpdatesRoot: [32]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
OperatorSetUpdatesRoot: [32]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
}
return taskResponse
}

func (avsManager *AvsManager) handleOperatorSetUpdate(ctx context.Context, data *opsetupdatereg.ContractSFFLOperatorSetUpdateRegistryOperatorSetUpdatedAtBlock) error {
operatorSetDelta, err := avsManager.avsReader.GetOperatorSetUpdateDelta(ctx, data.Id)
if err != nil {
avsManager.logger.Errorf("Couldn't get Operator set update delta: %v for block: %v", err, data.Id)
return err
}

operators := make([]registryrollup.OperatorsOperator, len(operatorSetDelta))
for i := 0; i < len(operatorSetDelta); i++ {
operators[i] = registryrollup.OperatorsOperator{
Pubkey: registryrollup.BN254G1Point{
X: operatorSetDelta[i].Pubkey.X,
Y: operatorSetDelta[i].Pubkey.Y,
},
Weight: operatorSetDelta[i].Weight,
}
}

message := registryrollup.OperatorSetUpdateMessage{
Id: data.Id,
Timestamp: data.Timestamp,
Operators: operators,
}

avsManager.operatorSetUpdateMessageChan <- message
return nil
}

func (avsManager *AvsManager) DepositIntoStrategy(operatorAddr common.Address, strategyAddr common.Address, amount *big.Int) error {
_, tokenAddr, err := avsManager.eigenlayerReader.GetStrategyAndUnderlyingToken(&bind.CallOpts{}, strategyAddr)
if err != nil {
avsManager.logger.Error("Failed to fetch strategy contract", "err", err)
return err
}
contractErc20Mock, err := avsManager.avsReader.GetErc20Mock(context.Background(), tokenAddr)
if err != nil {
avsManager.logger.Error("Failed to fetch ERC20Mock contract", "err", err)
return err
}
txOpts, err := avsManager.avsWriter.TxMgr.GetNoSendTxOpts()
tx, err := contractErc20Mock.Mint(txOpts, operatorAddr, amount)
if err != nil {
avsManager.logger.Errorf("Error assembling Mint tx")
return err
}
_, err = avsManager.avsWriter.TxMgr.Send(context.Background(), tx)
if err != nil {
avsManager.logger.Errorf("Error submitting Mint tx")
return err
}

_, err = avsManager.eigenlayerWriter.DepositERC20IntoStrategy(context.Background(), strategyAddr, amount)
if err != nil {
avsManager.logger.Errorf("Error depositing into strategy", "err", err)
return err
}
return nil
}

func (avsManager *AvsManager) RegisterOperatorWithEigenlayer(operatorAddr common.Address) error {
operator := eigenSdkTypes.Operator{
Address: operatorAddr.String(),
EarningsReceiverAddress: operatorAddr.String(),
}
_, err := avsManager.eigenlayerWriter.RegisterAsOperator(context.Background(), operator)
if err != nil {
avsManager.logger.Errorf("Error registering operator with eigenlayer")
return err
}

return nil
}

// RegisterOperatorWithAvs Registration specific functions
func (avsManager *AvsManager) RegisterOperatorWithAvs(
client eth.EthClient,
operatorEcdsaKeyPair *ecdsa.PrivateKey,
blsKeyPair *bls.KeyPair,
) error {
// hardcode these things for now
quorumNumbers := []byte{0}
socket := "Not Needed"
operatorToAvsRegistrationSigSalt := [32]byte{123}
curBlockNum, err := client.BlockNumber(context.Background())
if err != nil {
avsManager.logger.Errorf("Unable to get current block number")
return err
}

curBlock, err := client.BlockByNumber(context.Background(), big.NewInt(int64(curBlockNum)))
if err != nil {
avsManager.logger.Errorf("Unable to get current block")
return err
}

sigValidForSeconds := int64(1_000_000)
operatorToAvsRegistrationSigExpiry := big.NewInt(int64(curBlock.Time()) + sigValidForSeconds)
_, err = avsManager.avsWriter.RegisterOperatorInQuorumWithAVSRegistryCoordinator(
context.Background(),
operatorEcdsaKeyPair, operatorToAvsRegistrationSigSalt, operatorToAvsRegistrationSigExpiry,
blsKeyPair, quorumNumbers, socket,
)

if err != nil {
avsManager.logger.Errorf("Unable to register operator with avs registry coordinator")
return err
}
avsManager.logger.Infof("Registered operator with avs registry coordinator.")

return nil
}

func (avsManager *AvsManager) IsOperatorRegistered(options *bind.CallOpts, address common.Address) (bool, error) {
return avsManager.avsReader.IsOperatorRegistered(options, address)
}

func (avsManager *AvsManager) GetOperatorId(options *bind.CallOpts, address common.Address) ([32]byte, error) {
return avsManager.avsReader.GetOperatorId(options, address)
}
Loading

0 comments on commit 18b4197

Please sign in to comment.