Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean config #33

Merged
merged 8 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 43 additions & 21 deletions aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ import (

"github.com/Layr-Labs/eigensdk-go/chainio/clients"
sdkclients "github.com/Layr-Labs/eigensdk-go/chainio/clients"
"github.com/Layr-Labs/eigensdk-go/chainio/clients/eth"
"github.com/Layr-Labs/eigensdk-go/chainio/txmgr"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/Layr-Labs/eigensdk-go/services/avsregistry"
blsagg "github.com/Layr-Labs/eigensdk-go/services/bls_aggregation"
oppubkeysserv "github.com/Layr-Labs/eigensdk-go/services/operatorpubkeys"
"github.com/Layr-Labs/eigensdk-go/signerv2"
sdktypes "github.com/Layr-Labs/eigensdk-go/types"

"github.com/NethermindEth/near-sffl/aggregator/types"
Expand Down Expand Up @@ -89,49 +92,67 @@ type Aggregator struct {
// NewAggregator creates a new Aggregator with the provided config.
// TODO: Remove this context once OperatorPubkeysServiceInMemory's API is
// changed and we can gracefully exit otherwise
func NewAggregator(ctx context.Context, c *config.Config) (*Aggregator, error) {
avsReader, err := chainio.BuildAvsReaderFromConfig(c)
func NewAggregator(ctx context.Context, config *config.Config, logger logging.Logger) (*Aggregator, error) {
ethHttpClient, err := eth.NewClient(config.EthHttpRpcUrl)
if err != nil {
c.Logger.Error("Cannot create avsReader", "err", err)
logger.Errorf("Cannot create http ethclient", "err", err)
return nil, err
}

avsWriter, err := chainio.BuildAvsWriterFromConfig(c)
avsReader, err := chainio.BuildAvsReader(config.SFFLRegistryCoordinatorAddr, config.OperatorStateRetrieverAddr, ethHttpClient, logger)
if err != nil {
c.Logger.Errorf("Cannot create avsWriter", "err", err)
logger.Error("Cannot create avsReader", "err", err)
return nil, err
}

chainId, err := ethHttpClient.ChainID(ctx)
if err != nil {
logger.Error("Cannot get chainId", "err", err)
return nil, err
}

signerV2, _, err := signerv2.SignerFromConfig(signerv2.Config{PrivateKey: config.EcdsaPrivateKey}, chainId)
if err != nil {
panic(err)
}
txMgr := txmgr.NewSimpleTxManager(ethHttpClient, logger, signerV2, config.AggregatorAddress)

avsWriter, err := chainio.BuildAvsWriter(txMgr, config.SFFLRegistryCoordinatorAddr, config.OperatorStateRetrieverAddr, ethHttpClient, logger)
if err != nil {
logger.Errorf("Cannot create avsWriter", "err", err)
return nil, err
}

chainioConfig := sdkclients.BuildAllConfig{
EthHttpUrl: c.EthHttpRpcUrl,
EthWsUrl: c.EthWsRpcUrl,
RegistryCoordinatorAddr: c.SFFLRegistryCoordinatorAddr.String(),
OperatorStateRetrieverAddr: c.OperatorStateRetrieverAddr.String(),
EthHttpUrl: config.EthHttpRpcUrl,
EthWsUrl: config.EthWsRpcUrl,
RegistryCoordinatorAddr: config.SFFLRegistryCoordinatorAddr.String(),
OperatorStateRetrieverAddr: config.OperatorStateRetrieverAddr.String(),
AvsName: avsName,
PromMetricsIpPortAddress: ":9090",
}
clients, err := clients.BuildAll(chainioConfig, c.AggregatorAddress, c.SignerFn, c.Logger)
clients, err := clients.BuildAll(chainioConfig, config.AggregatorAddress, signerV2, logger)
if err != nil {
c.Logger.Errorf("Cannot create sdk clients", "err", err)
logger.Errorf("Cannot create sdk clients", "err", err)
return nil, err
}

msgDb, err := NewMessageDatabase(c.AggregatorDatabasePath)
msgDb, err := NewMessageDatabase(config.AggregatorDatabasePath)
if err != nil {
c.Logger.Errorf("Cannot create database", "err", err)
logger.Errorf("Cannot create database", "err", err)
return nil, err
}

operatorPubkeysService := oppubkeysserv.NewOperatorPubkeysServiceInMemory(ctx, clients.AvsRegistryChainSubscriber, clients.AvsRegistryChainReader, c.Logger)
avsRegistryService := avsregistry.NewAvsRegistryServiceChainCaller(avsReader, operatorPubkeysService, c.Logger)
taskBlsAggregationService := blsagg.NewBlsAggregatorService(avsRegistryService, c.Logger)
stateRootUpdateBlsAggregationService := NewMessageBlsAggregatorService(avsRegistryService, clients.EthHttpClient, c.Logger)
operatorSetUpdateBlsAggregationService := NewMessageBlsAggregatorService(avsRegistryService, clients.EthHttpClient, c.Logger)
operatorPubkeysService := oppubkeysserv.NewOperatorPubkeysServiceInMemory(ctx, clients.AvsRegistryChainSubscriber, clients.AvsRegistryChainReader, logger)
avsRegistryService := avsregistry.NewAvsRegistryServiceChainCaller(avsReader, operatorPubkeysService, logger)
taskBlsAggregationService := blsagg.NewBlsAggregatorService(avsRegistryService, logger)
stateRootUpdateBlsAggregationService := NewMessageBlsAggregatorService(avsRegistryService, clients.EthHttpClient, logger)
operatorSetUpdateBlsAggregationService := NewMessageBlsAggregatorService(avsRegistryService, clients.EthHttpClient, logger)

return &Aggregator{
logger: c.Logger,
serverIpPortAddr: c.AggregatorServerIpPortAddr,
restServerIpPortAddr: c.AggregatorRestServerIpPortAddr,
logger: logger,
serverIpPortAddr: config.AggregatorServerIpPortAddr,
restServerIpPortAddr: config.AggregatorRestServerIpPortAddr,
avsWriter: avsWriter,
avsReader: avsReader,
taskBlsAggregationService: taskBlsAggregationService,
Expand Down Expand Up @@ -295,6 +316,7 @@ func (agg *Aggregator) handleStateRootUpdateReachedQuorum(blsAggServiceResp type
agg.logger.Error("Aggregator could not store message aggregation")
return
}

}

func (agg *Aggregator) handleOperatorSetUpdateReachedQuorum(blsAggServiceResp types.MessageBlsAggregationServiceResponse) {
Expand Down
26 changes: 17 additions & 9 deletions aggregator/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"log"
"os"

sdklogging "github.com/Layr-Labs/eigensdk-go/logging"
"github.com/urfave/cli"

"github.com/NethermindEth/near-sffl/aggregator"
Expand All @@ -21,7 +22,6 @@ var (
)

func main() {

app := cli.NewApp()
app.Flags = config.Flags
app.Version = fmt.Sprintf("%s-%s-%s", Version, GitCommit, GitDate)
Expand All @@ -37,21 +37,30 @@ func main() {
}

func aggregatorMain(ctx *cli.Context) error {

log.Println("Initializing Aggregator")
config, err := config.NewConfig(ctx)

configRaw := config.NewConfigRaw(ctx)
logger, err := sdklogging.NewZapLogger(configRaw.Environment)
if err != nil {
return err
}
configJson, err := json.MarshalIndent(config, "", " ")

config, err := config.NewConfig(configRaw, ctx)
if err != nil {
config.Logger.Fatalf(err.Error())
logger.Fatal("Error creating config", "err", err)
return err
}
fmt.Println("Config:", string(configJson))

bgCtx := context.Background()
{
configJson, err := json.MarshalIndent(config, "", " ")
if err != nil {
logger.Fatalf(err.Error())
}
fmt.Println("Config:", string(configJson))
}
Hyodar marked this conversation as resolved.
Show resolved Hide resolved

agg, err := aggregator.NewAggregator(bgCtx, config)
bgCtx := context.Background()
agg, err := aggregator.NewAggregator(bgCtx, config, logger)
if err != nil {
return err
}
Expand All @@ -62,5 +71,4 @@ func aggregatorMain(ctx *cli.Context) error {
}

return nil

}
1 change: 0 additions & 1 deletion aggregator/rpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ var (
)

func (agg *Aggregator) startServer() error {

err := rpc.Register(agg)
if err != nil {
agg.logger.Fatal("Format of service TaskManager isn't correct. ", "err", err)
Expand Down
4 changes: 0 additions & 4 deletions core/chainio/avs_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
erc20mock "github.com/NethermindEth/near-sffl/contracts/bindings/ERC20Mock"
opsetupdatereg "github.com/NethermindEth/near-sffl/contracts/bindings/SFFLOperatorSetUpdateRegistry"
taskmanager "github.com/NethermindEth/near-sffl/contracts/bindings/SFFLTaskManager"
"github.com/NethermindEth/near-sffl/core/config"
)

type AvsReaderer interface {
Expand All @@ -37,9 +36,6 @@ type AvsReader struct {

var _ AvsReaderer = (*AvsReader)(nil)

func BuildAvsReaderFromConfig(c *config.Config) (*AvsReader, error) {
return BuildAvsReader(c.SFFLRegistryCoordinatorAddr, c.OperatorStateRetrieverAddr, c.EthHttpClient, c.Logger)
}
func BuildAvsReader(registryCoordinatorAddr, operatorStateRetrieverAddr gethcommon.Address, ethHttpClient eth.EthClient, logger logging.Logger) (*AvsReader, error) {
avsManagersBindings, err := NewAvsManagersBindings(registryCoordinatorAddr, operatorStateRetrieverAddr, ethHttpClient, logger)
if err != nil {
Expand Down
10 changes: 0 additions & 10 deletions core/chainio/avs_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

opsetupdatereg "github.com/NethermindEth/near-sffl/contracts/bindings/SFFLOperatorSetUpdateRegistry"
taskmanager "github.com/NethermindEth/near-sffl/contracts/bindings/SFFLTaskManager"
"github.com/NethermindEth/near-sffl/core/config"
)

type AvsSubscriberer interface {
Expand All @@ -30,15 +29,6 @@ type AvsSubscriber struct {
logger sdklogging.Logger
}

func BuildAvsSubscriberFromConfig(config *config.Config) (*AvsSubscriber, error) {
return BuildAvsSubscriber(
config.SFFLRegistryCoordinatorAddr,
config.OperatorStateRetrieverAddr,
config.EthWsClient,
config.Logger,
)
}

func BuildAvsSubscriber(registryCoordinatorAddr, blsOperatorStateRetrieverAddr gethcommon.Address, ethclient eth.EthClient, logger sdklogging.Logger) (*AvsSubscriber, error) {
avsContractBindings, err := NewAvsManagersBindings(registryCoordinatorAddr, blsOperatorStateRetrieverAddr, ethclient, logger)
if err != nil {
Expand Down
51 changes: 23 additions & 28 deletions core/chainio/avs_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
logging "github.com/Layr-Labs/eigensdk-go/logging"

taskmanager "github.com/NethermindEth/near-sffl/contracts/bindings/SFFLTaskManager"
"github.com/NethermindEth/near-sffl/core/config"
)

type AvsWriterer interface {
Expand Down Expand Up @@ -49,10 +48,6 @@ type AvsWriter struct {

var _ AvsWriterer = (*AvsWriter)(nil)

func BuildAvsWriterFromConfig(c *config.Config) (*AvsWriter, error) {
return BuildAvsWriter(c.TxMgr, c.SFFLRegistryCoordinatorAddr, c.OperatorStateRetrieverAddr, c.EthHttpClient, c.Logger)
}

Hyodar marked this conversation as resolved.
Show resolved Hide resolved
func BuildAvsWriter(txMgr txmgr.TxManager, registryCoordinatorAddr, operatorStateRetrieverAddr gethcommon.Address, ethHttpClient eth.EthClient, logger logging.Logger) (*AvsWriter, error) {
avsServiceBindings, err := NewAvsManagersBindings(registryCoordinatorAddr, operatorStateRetrieverAddr, ethHttpClient, logger)
if err != nil {
Expand All @@ -76,73 +71,73 @@ func NewAvsWriter(avsRegistryWriter avsregistry.AvsRegistryWriter, avsServiceBin
}

// returns the tx receipt, as well as the task index (which it gets from parsing the tx receipt logs)
func (w *AvsWriter) SendNewCheckpointTask(ctx context.Context, fromTimestamp uint64, toTimestamp uint64, quorumThreshold uint32, quorumNumbers []byte) (taskmanager.CheckpointTask, uint32, error) {
txOpts, err := w.TxMgr.GetNoSendTxOpts()
func (avsWriter *AvsWriter) SendNewCheckpointTask(ctx context.Context, fromTimestamp uint64, toTimestamp uint64, quorumThreshold uint32, quorumNumbers []byte) (taskmanager.CheckpointTask, uint32, error) {
Hyodar marked this conversation as resolved.
Show resolved Hide resolved
txOpts, err := avsWriter.TxMgr.GetNoSendTxOpts()
if err != nil {
w.logger.Errorf("Error getting tx opts")
avsWriter.logger.Errorf("Error getting tx opts")
return taskmanager.CheckpointTask{}, 0, err
}
tx, err := w.AvsContractBindings.TaskManager.CreateCheckpointTask(txOpts, fromTimestamp, toTimestamp, quorumThreshold, quorumNumbers)
tx, err := avsWriter.AvsContractBindings.TaskManager.CreateCheckpointTask(txOpts, fromTimestamp, toTimestamp, quorumThreshold, quorumNumbers)
if err != nil {
w.logger.Errorf("Error assembling CreateCheckpointTask tx")
avsWriter.logger.Errorf("Error assembling CreateCheckpointTask tx")
return taskmanager.CheckpointTask{}, 0, err
}
receipt, err := w.TxMgr.Send(ctx, tx)
receipt, err := avsWriter.TxMgr.Send(ctx, tx)
if err != nil {
w.logger.Errorf("Error submitting CreateCheckpointTask tx")
avsWriter.logger.Errorf("Error submitting CreateCheckpointTask tx")
return taskmanager.CheckpointTask{}, 0, err
}
checkpointTaskCreatedEvent, err := w.AvsContractBindings.TaskManager.ContractSFFLTaskManagerFilterer.ParseCheckpointTaskCreated(*receipt.Logs[0])
checkpointTaskCreatedEvent, err := avsWriter.AvsContractBindings.TaskManager.ContractSFFLTaskManagerFilterer.ParseCheckpointTaskCreated(*receipt.Logs[0])
if err != nil {
w.logger.Error("Aggregator failed to parse new task created event", "err", err)
avsWriter.logger.Error("Aggregator failed to parse new task created event", "err", err)
return taskmanager.CheckpointTask{}, 0, err
}
return checkpointTaskCreatedEvent.Task, checkpointTaskCreatedEvent.TaskIndex, nil
}

func (w *AvsWriter) SendAggregatedResponse(
func (avsWriter *AvsWriter) SendAggregatedResponse(
ctx context.Context, task taskmanager.CheckpointTask,
taskResponse taskmanager.CheckpointTaskResponse,
nonSignerStakesAndSignature taskmanager.IBLSSignatureCheckerNonSignerStakesAndSignature,
) (*types.Receipt, error) {
txOpts, err := w.TxMgr.GetNoSendTxOpts()
txOpts, err := avsWriter.TxMgr.GetNoSendTxOpts()
if err != nil {
w.logger.Errorf("Error getting tx opts")
avsWriter.logger.Errorf("Error getting tx opts")
return nil, err
}
tx, err := w.AvsContractBindings.TaskManager.RespondToCheckpointTask(txOpts, task, taskResponse, nonSignerStakesAndSignature)
tx, err := avsWriter.AvsContractBindings.TaskManager.RespondToCheckpointTask(txOpts, task, taskResponse, nonSignerStakesAndSignature)
if err != nil {
w.logger.Error("Error submitting SubmitTaskResponse tx while calling respondToTask", "err", err)
avsWriter.logger.Error("Error submitting SubmitTaskResponse tx while calling respondToTask", "err", err)
return nil, err
}
receipt, err := w.TxMgr.Send(ctx, tx)
receipt, err := avsWriter.TxMgr.Send(ctx, tx)
if err != nil {
w.logger.Errorf("Error submitting CreateCheckpointTask tx")
avsWriter.logger.Errorf("Error submitting CreateCheckpointTask tx")
return nil, err
}
return receipt, nil
}

func (w *AvsWriter) RaiseChallenge(
func (avsWriter *AvsWriter) RaiseChallenge(
ctx context.Context,
task taskmanager.CheckpointTask,
taskResponse taskmanager.CheckpointTaskResponse,
taskResponseMetadata taskmanager.CheckpointTaskResponseMetadata,
pubkeysOfNonSigningOperators []taskmanager.BN254G1Point,
) (*types.Receipt, error) {
txOpts, err := w.TxMgr.GetNoSendTxOpts()
txOpts, err := avsWriter.TxMgr.GetNoSendTxOpts()
if err != nil {
w.logger.Errorf("Error getting tx opts")
avsWriter.logger.Errorf("Error getting tx opts")
return nil, err
}
tx, err := w.AvsContractBindings.TaskManager.RaiseAndResolveCheckpointChallenge(txOpts, task, taskResponse, taskResponseMetadata, pubkeysOfNonSigningOperators)
tx, err := avsWriter.AvsContractBindings.TaskManager.RaiseAndResolveCheckpointChallenge(txOpts, task, taskResponse, taskResponseMetadata, pubkeysOfNonSigningOperators)
if err != nil {
w.logger.Errorf("Error assembling RaiseChallenge tx")
avsWriter.logger.Errorf("Error assembling RaiseChallenge tx")
return nil, err
}
receipt, err := w.TxMgr.Send(ctx, tx)
receipt, err := avsWriter.TxMgr.Send(ctx, tx)
if err != nil {
w.logger.Errorf("Error submitting CreateCheckpointTask tx")
avsWriter.logger.Errorf("Error submitting CreateCheckpointTask tx")
return nil, err
}
return receipt, nil
Expand Down
Loading
Loading