diff --git a/aggregator/aggregator.go b/aggregator/aggregator.go index f41b7c4a..31afed15 100644 --- a/aggregator/aggregator.go +++ b/aggregator/aggregator.go @@ -74,6 +74,8 @@ type Aggregator struct { restServerIpPortAddr string avsWriter chainio.AvsWriterer avsReader chainio.AvsReaderer + rollupBroadcaster RollupBroadcasterer + // aggregation related fields taskBlsAggregationService blsagg.BlsAggregationService stateRootUpdateBlsAggregationService MessageBlsAggregationService @@ -99,7 +101,7 @@ func NewAggregator(ctx context.Context, config *config.Config, logger logging.Lo return nil, err } - avsReader, err := chainio.BuildAvsReader(config.SFFLRegistryCoordinatorAddr, config.OperatorStateRetrieverAddr, ethHttpClient, logger) + avsReader, err := chainio.BuildAvsReaderFromConfig(config, ethHttpClient, logger) if err != nil { logger.Error("Cannot create avsReader", "err", err) return nil, err @@ -111,7 +113,8 @@ func NewAggregator(ctx context.Context, config *config.Config, logger logging.Lo return nil, err } - signerV2, _, err := signerv2.SignerFromConfig(signerv2.Config{PrivateKey: config.EcdsaPrivateKey}, chainId) + signerConfig := signerv2.Config{PrivateKey: config.EcdsaPrivateKey} + signerV2, _, err := signerv2.SignerFromConfig(signerConfig, chainId) if err != nil { panic(err) } @@ -143,6 +146,12 @@ func NewAggregator(ctx context.Context, config *config.Config, logger logging.Lo return nil, err } + rollupBroadcaster, err := NewRollupBroadcaster(ctx, config.RollupsInfo, signerConfig, config.AggregatorAddress, logger) + if err != nil { + logger.Error("Cannot create rollup broadcaster", "err", err) + return nil, err + } + operatorPubkeysService := oppubkeysserv.NewOperatorPubkeysServiceInMemory(ctx, clients.AvsRegistryChainSubscriber, clients.AvsRegistryChainReader, logger) avsRegistryService := avsregistry.NewAvsRegistryServiceChainCaller(avsReader, operatorPubkeysService, logger) taskBlsAggregationService := blsagg.NewBlsAggregatorService(avsRegistryService, logger) @@ -155,6 +164,7 @@ func NewAggregator(ctx context.Context, config *config.Config, logger logging.Lo restServerIpPortAddr: config.AggregatorRestServerIpPortAddr, avsWriter: avsWriter, avsReader: avsReader, + rollupBroadcaster: rollupBroadcaster, taskBlsAggregationService: taskBlsAggregationService, stateRootUpdateBlsAggregationService: stateRootUpdateBlsAggregationService, operatorSetUpdateBlsAggregationService: operatorSetUpdateBlsAggregationService, @@ -186,6 +196,7 @@ func (agg *Aggregator) Start(ctx context.Context) error { // TODO: make this based on the actual timestamps timestamp := uint64(0) + broadcasterErrorChan := agg.rollupBroadcaster.GetErrorChan() for { select { case <-ctx.Done(): @@ -198,7 +209,7 @@ func (agg *Aggregator) Start(ctx context.Context) error { agg.handleStateRootUpdateReachedQuorum(blsAggServiceResp) case blsAggServiceResp := <-agg.operatorSetUpdateBlsAggregationService.GetResponseChannel(): agg.logger.Info("Received response from operatorSetUpdateBlsAggregationService", "blsAggServiceResp", blsAggServiceResp) - agg.handleOperatorSetUpdateReachedQuorum(blsAggServiceResp) + agg.handleOperatorSetUpdateReachedQuorum(ctx, blsAggServiceResp) case <-ticker.C: err := agg.sendNewCheckpointTask(timestamp, timestamp) timestamp++ @@ -206,6 +217,10 @@ func (agg *Aggregator) Start(ctx context.Context) error { // we log the errors inside sendNewCheckpointTask() so here we just continue to the next task continue } + + case err := <-broadcasterErrorChan: + // TODO: proper error handling in all class + agg.logger.Error("Received error from broadcaster", "err", err) } } } @@ -319,7 +334,7 @@ func (agg *Aggregator) handleStateRootUpdateReachedQuorum(blsAggServiceResp type } -func (agg *Aggregator) handleOperatorSetUpdateReachedQuorum(blsAggServiceResp types.MessageBlsAggregationServiceResponse) { +func (agg *Aggregator) handleOperatorSetUpdateReachedQuorum(ctx context.Context, blsAggServiceResp types.MessageBlsAggregationServiceResponse) { agg.operatorSetUpdatesLock.RLock() msg, ok := agg.operatorSetUpdates[blsAggServiceResp.MessageDigest] agg.operatorSetUpdatesLock.RUnlock() @@ -340,6 +355,9 @@ func (agg *Aggregator) handleOperatorSetUpdateReachedQuorum(blsAggServiceResp ty return } + signatureInfo := core.FormatBlsAggregationRollup(&blsAggServiceResp) + agg.rollupBroadcaster.BroadcastOperatorSetUpdate(ctx, msg, signatureInfo) + err := agg.msgDb.StoreOperatorSetUpdate(msg) if err != nil { agg.logger.Error("Aggregator could not store message") diff --git a/aggregator/aggregator_test.go b/aggregator/aggregator_test.go index 1e8d47e9..ae558d50 100644 --- a/aggregator/aggregator_test.go +++ b/aggregator/aggregator_test.go @@ -55,7 +55,7 @@ func TestSendNewTask(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - aggregator, _, mockAvsWriterer, mockTaskBlsAggService, _, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT) + aggregator, _, mockAvsWriterer, mockTaskBlsAggService, _, _, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT) assert.Nil(t, err) var TASK_INDEX = uint32(0) @@ -82,7 +82,7 @@ func TestHandleStateRootUpdateAggregationReachedQuorum(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - aggregator, _, _, _, _, _, mockMsgDb, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT) + aggregator, _, _, _, _, _, mockMsgDb, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT) assert.Nil(t, err) msg := servicemanager.StateRootUpdateMessage{} @@ -109,7 +109,7 @@ func TestHandleOperatorSetUpdateAggregationReachedQuorum(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - aggregator, _, _, _, _, _, mockMsgDb, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT) + aggregator, _, _, _, _, _, mockMsgDb, mockRollupBroadcaster, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT) assert.Nil(t, err) msg := registryrollup.OperatorSetUpdateMessage{} @@ -117,7 +117,10 @@ func TestHandleOperatorSetUpdateAggregationReachedQuorum(t *testing.T) { assert.Nil(t, err) blsAggServiceResp := types.MessageBlsAggregationServiceResponse{ - MessageDigest: msgDigest, + MessageDigest: msgDigest, + NonSignersPubkeysG1: make([]*bls.G1Point, 0), + SignersApkG2: bls.NewZeroG2Point(), + SignersAggSigG1: bls.NewZeroSignature(), } aggregator.operatorSetUpdates[msgDigest] = msg @@ -125,16 +128,19 @@ func TestHandleOperatorSetUpdateAggregationReachedQuorum(t *testing.T) { mockMsgDb.EXPECT().StoreOperatorSetUpdate(msg) mockMsgDb.EXPECT().StoreOperatorSetUpdateAggregation(msg, blsAggServiceResp) + signatureInfo := core.FormatBlsAggregationRollup(&blsAggServiceResp) + mockRollupBroadcaster.EXPECT().BroadcastOperatorSetUpdate(context.Background(), msg, signatureInfo) + assert.Contains(t, aggregator.operatorSetUpdates, msgDigest) - aggregator.handleOperatorSetUpdateReachedQuorum(blsAggServiceResp) + aggregator.handleOperatorSetUpdateReachedQuorum(context.Background(), blsAggServiceResp) assert.NotContains(t, aggregator.operatorSetUpdates, msgDigest) } func createMockAggregator( mockCtrl *gomock.Controller, operatorPubkeyDict map[bls.OperatorId]types.OperatorInfo, -) (*Aggregator, *chainiomocks.MockAvsReaderer, *chainiomocks.MockAvsWriterer, *blsaggservmock.MockBlsAggregationService, *mocks.MockMessageBlsAggregationService, *mocks.MockMessageBlsAggregationService, *mocks.MockMessageDatabaser, error) { +) (*Aggregator, *chainiomocks.MockAvsReaderer, *chainiomocks.MockAvsWriterer, *blsaggservmock.MockBlsAggregationService, *mocks.MockMessageBlsAggregationService, *mocks.MockMessageBlsAggregationService, *mocks.MockMessageDatabaser, *mocks.MockRollupBroadcasterer, error) { logger := sdklogging.NewNoopLogger() mockAvsWriter := chainiomocks.NewMockAvsWriterer(mockCtrl) mockAvsReader := chainiomocks.NewMockAvsReaderer(mockCtrl) @@ -142,6 +148,7 @@ func createMockAggregator( mockStateRootUpdateBlsAggregationService := mocks.NewMockMessageBlsAggregationService(mockCtrl) mockOperatorSetUpdateBlsAggregationService := mocks.NewMockMessageBlsAggregationService(mockCtrl) mockMsgDb := mocks.NewMockMessageDatabaser(mockCtrl) + mockRollupBroadcaster := mocks.NewMockRollupBroadcasterer(mockCtrl) aggregator := &Aggregator{ logger: logger, @@ -155,8 +162,9 @@ func createMockAggregator( taskResponses: make(map[coretypes.TaskIndex]map[sdktypes.TaskResponseDigest]taskmanager.CheckpointTaskResponse), stateRootUpdates: make(map[coretypes.MessageDigest]servicemanager.StateRootUpdateMessage), operatorSetUpdates: make(map[coretypes.MessageDigest]registryrollup.OperatorSetUpdateMessage), + rollupBroadcaster: mockRollupBroadcaster, } - return aggregator, mockAvsReader, mockAvsWriter, mockTaskBlsAggregationService, mockStateRootUpdateBlsAggregationService, mockOperatorSetUpdateBlsAggregationService, mockMsgDb, nil + return aggregator, mockAvsReader, mockAvsWriter, mockTaskBlsAggregationService, mockStateRootUpdateBlsAggregationService, mockOperatorSetUpdateBlsAggregationService, mockMsgDb, mockRollupBroadcaster, nil } // just a mock ethclient to pass to bindings diff --git a/aggregator/gen.go b/aggregator/gen.go index 3b5d09b6..ddbfb538 100644 --- a/aggregator/gen.go +++ b/aggregator/gen.go @@ -2,3 +2,4 @@ package aggregator //go:generate mockgen -destination=./mocks/message_blsagg.go -package=mocks github.com/NethermindEth/near-sffl/aggregator MessageBlsAggregationService //go:generate mockgen -destination=./mocks/message_database.go -package=mocks github.com/NethermindEth/near-sffl/aggregator MessageDatabaser +//go:generate mockgen -destination=./mocks/rollup_broadcaster.go -package=mocks github.com/NethermindEth/near-sffl/aggregator RollupBroadcasterer diff --git a/aggregator/message_blsagg.go b/aggregator/message_blsagg.go index 1b107a4a..e8a23516 100644 --- a/aggregator/message_blsagg.go +++ b/aggregator/message_blsagg.go @@ -115,15 +115,17 @@ func (mbas *MessageBlsAggregatorService) InitializeMessageIfNotExists( timeToExpiry time.Duration, ethBlockNumber uint64, ) error { + mbas.messageChansLock.Lock() + defer mbas.messageChansLock.Unlock() + if _, taskExists := mbas.signedMessageDigestsCs[messageDigest]; taskExists { return nil } signedMessageDigestsC := make(chan SignedMessageDigest) - mbas.messageChansLock.Lock() mbas.signedMessageDigestsCs[messageDigest] = signedMessageDigestsC - mbas.messageChansLock.Unlock() go mbas.singleMessageAggregatorGoroutineFunc(messageDigest, quorumNumbers, quorumThresholdPercentages, timeToExpiry, signedMessageDigestsC, ethBlockNumber) + return nil } @@ -133,9 +135,10 @@ func (mbas *MessageBlsAggregatorService) ProcessNewSignature( blsSignature *bls.Signature, operatorId bls.OperatorId, ) error { - mbas.messageChansLock.Lock() + mbas.messageChansLock.RLock() messageC, taskInitialized := mbas.signedMessageDigestsCs[messageDigest] - mbas.messageChansLock.Unlock() + mbas.messageChansLock.RUnlock() + if !taskInitialized { return MessageNotFoundErrorFn(messageDigest) } @@ -171,8 +174,10 @@ func (mbas *MessageBlsAggregatorService) singleMessageAggregatorGoroutineFunc( select { case signedMessageDigest := <-signedMessageDigestsC: mbas.logger.Debug("Message goroutine received new signed message digest", "messageDigest", messageDigest) - mbas.handleSignedMessageDigest(signedMessageDigest, validationInfo) - return + + if mbas.handleSignedMessageDigest(signedMessageDigest, validationInfo) { + return + } case <-messageExpiredTimer.C: mbas.aggregatedResponsesC <- aggtypes.MessageBlsAggregationServiceResponse{ Err: MessageExpiredError, @@ -230,12 +235,12 @@ func (mbas *MessageBlsAggregatorService) fetchValidationInfo(quorumNumbers []typ } } -func (mbas *MessageBlsAggregatorService) handleSignedMessageDigest(signedMessageDigest SignedMessageDigest, validationInfo signedMessageDigestValidationInfo) { +func (mbas *MessageBlsAggregatorService) handleSignedMessageDigest(signedMessageDigest SignedMessageDigest, validationInfo signedMessageDigestValidationInfo) bool { err := mbas.verifySignature(signedMessageDigest, validationInfo.operatorsAvsStateDict) signedMessageDigest.SignatureVerificationErrorC <- err if err != nil { - return + return false } digestAggregatedOperators, ok := validationInfo.aggregatedOperatorsDict[signedMessageDigest.MessageDigest] @@ -263,7 +268,7 @@ func (mbas *MessageBlsAggregatorService) handleSignedMessageDigest(signedMessage validationInfo.aggregatedOperatorsDict[signedMessageDigest.MessageDigest] = digestAggregatedOperators if !checkIfStakeThresholdsMet(digestAggregatedOperators.signersTotalStakePerQuorum, validationInfo.totalStakePerQuorum, validationInfo.quorumThresholdPercentagesMap) { - return + return false } nonSignersOperatorIds := []types.OperatorId{} @@ -279,7 +284,7 @@ func (mbas *MessageBlsAggregatorService) handleSignedMessageDigest(signedMessage mbas.aggregatedResponsesC <- aggtypes.MessageBlsAggregationServiceResponse{ Err: err, } - return + return false } messageBlsAggregationServiceResponse := aggtypes.MessageBlsAggregationServiceResponse{ @@ -297,6 +302,8 @@ func (mbas *MessageBlsAggregatorService) handleSignedMessageDigest(signedMessage } mbas.aggregatedResponsesC <- messageBlsAggregationServiceResponse + + return true } func (mbas *MessageBlsAggregatorService) closeMessageGoroutine(messageDigest coretypes.MessageDigest) { diff --git a/aggregator/mocks/rollup_broadcaster.go b/aggregator/mocks/rollup_broadcaster.go new file mode 100644 index 00000000..fe2d7edc --- /dev/null +++ b/aggregator/mocks/rollup_broadcaster.go @@ -0,0 +1,66 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/NethermindEth/near-sffl/aggregator (interfaces: RollupBroadcasterer) +// +// Generated by this command: +// +// mockgen -destination=./mocks/rollup_broadcaster.go -package=mocks github.com/NethermindEth/near-sffl/aggregator RollupBroadcasterer +// +// Package mocks is a generated GoMock package. +package mocks + +import ( + context "context" + reflect "reflect" + + contractSFFLRegistryRollup "github.com/NethermindEth/near-sffl/contracts/bindings/SFFLRegistryRollup" + gomock "go.uber.org/mock/gomock" +) + +// MockRollupBroadcasterer is a mock of RollupBroadcasterer interface. +type MockRollupBroadcasterer struct { + ctrl *gomock.Controller + recorder *MockRollupBroadcastererMockRecorder +} + +// MockRollupBroadcastererMockRecorder is the mock recorder for MockRollupBroadcasterer. +type MockRollupBroadcastererMockRecorder struct { + mock *MockRollupBroadcasterer +} + +// NewMockRollupBroadcasterer creates a new mock instance. +func NewMockRollupBroadcasterer(ctrl *gomock.Controller) *MockRollupBroadcasterer { + mock := &MockRollupBroadcasterer{ctrl: ctrl} + mock.recorder = &MockRollupBroadcastererMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockRollupBroadcasterer) EXPECT() *MockRollupBroadcastererMockRecorder { + return m.recorder +} + +// BroadcastOperatorSetUpdate mocks base method. +func (m *MockRollupBroadcasterer) BroadcastOperatorSetUpdate(arg0 context.Context, arg1 contractSFFLRegistryRollup.OperatorSetUpdateMessage, arg2 contractSFFLRegistryRollup.OperatorsSignatureInfo) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "BroadcastOperatorSetUpdate", arg0, arg1, arg2) +} + +// BroadcastOperatorSetUpdate indicates an expected call of BroadcastOperatorSetUpdate. +func (mr *MockRollupBroadcastererMockRecorder) BroadcastOperatorSetUpdate(arg0, arg1, arg2 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastOperatorSetUpdate", reflect.TypeOf((*MockRollupBroadcasterer)(nil).BroadcastOperatorSetUpdate), arg0, arg1, arg2) +} + +// GetErrorChan mocks base method. +func (m *MockRollupBroadcasterer) GetErrorChan() <-chan error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetErrorChan") + ret0, _ := ret[0].(<-chan error) + return ret0 +} + +// GetErrorChan indicates an expected call of GetErrorChan. +func (mr *MockRollupBroadcastererMockRecorder) GetErrorChan() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetErrorChan", reflect.TypeOf((*MockRollupBroadcasterer)(nil).GetErrorChan)) +} diff --git a/aggregator/rest_server_test.go b/aggregator/rest_server_test.go index fafa7f7f..2abfeb4c 100644 --- a/aggregator/rest_server_test.go +++ b/aggregator/rest_server_test.go @@ -22,7 +22,7 @@ func TestGetStateRootUpdateAggregation(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - aggregator, _, _, _, _, _, mockDb, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT) + aggregator, _, _, _, _, _, mockDb, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT) assert.Nil(t, err) go aggregator.startRestServer() @@ -97,7 +97,7 @@ func TestGetOperatorSetUpdateAggregation(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - aggregator, _, _, _, _, _, mockDb, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT) + aggregator, _, _, _, _, _, mockDb, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT) assert.Nil(t, err) go aggregator.startRestServer() diff --git a/aggregator/rollup_broadcaster.go b/aggregator/rollup_broadcaster.go new file mode 100644 index 00000000..1af12ddf --- /dev/null +++ b/aggregator/rollup_broadcaster.go @@ -0,0 +1,162 @@ +package aggregator + +import ( + "context" + "time" + + "github.com/Layr-Labs/eigensdk-go/chainio/clients/eth" + "github.com/Layr-Labs/eigensdk-go/chainio/txmgr" + "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/Layr-Labs/eigensdk-go/signerv2" + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + + registryrollup "github.com/NethermindEth/near-sffl/contracts/bindings/SFFLRegistryRollup" + "github.com/NethermindEth/near-sffl/core/config" +) + +const NUM_OF_RETRIES = 5 +const TX_RETRY_INTERVAL = time.Millisecond * 200 + +type RollupWriter struct { + txMgr txmgr.TxManager + client eth.EthClient + sfflRegistryRollup *registryrollup.ContractSFFLRegistryRollup + + logger logging.Logger +} + +func NewRollupWriter(ctx context.Context, rollupInfo config.RollupInfo, signerConfig signerv2.Config, address common.Address, logger logging.Logger) (*RollupWriter, error) { + client, err := eth.NewClient(rollupInfo.RpcUrl) + if err != nil { + return nil, err + } + + chainId, err := client.ChainID(ctx) + if err != nil { + return nil, err + } + + signerV2, _, err := signerv2.SignerFromConfig(signerConfig, chainId) + if err != nil { + panic(err) + } + txMgr := txmgr.NewSimpleTxManager(client, logger, signerV2, address) + + sfflRegistryRollup, err := registryrollup.NewContractSFFLRegistryRollup(rollupInfo.SFFLRegistryRollupAddr, client) + if err != nil { + return nil, err + } + + return &RollupWriter{ + txMgr: txMgr, + client: client, + sfflRegistryRollup: sfflRegistryRollup, + logger: logger, + }, nil +} + +func (w *RollupWriter) UpdateOperatorSet(ctx context.Context, message registryrollup.OperatorSetUpdateMessage, signatureInfo registryrollup.OperatorsSignatureInfo) error { + operation := func() error { + txOpts, err := w.txMgr.GetNoSendTxOpts() + if err != nil { + w.logger.Error("Error getting tx opts", "err", err) + return err + } + + nextOperatorUpdateId, err := w.sfflRegistryRollup.NextOperatorUpdateId(&bind.CallOpts{}) + if err != nil { + w.logger.Error("Error fetching NextOperatorUpdateId", "err", err) + return err + } + + // TODO: queue in case message.id > nextOperatorUpdateId + if message.Id != nextOperatorUpdateId { + return nil + } + + tx, err := w.sfflRegistryRollup.UpdateOperatorSet(txOpts, message, signatureInfo) + if err != nil { + w.logger.Error("Error assembling UpdateOperatorSet tx", "err", err) + return err + } + + _, err = w.txMgr.Send(ctx, tx) + if err != nil { + return err + } + + return nil + } + + var err error = nil + for i := 0; i < NUM_OF_RETRIES; i++ { + err = operation() + if err == nil { + return nil + } else { + // TODO: return on some tx errors + w.logger.Warn("Sending UpdateOperatorSet failed", "err", err) + } + + select { + case <-ctx.Done(): + w.logger.Info("Context canceled") + return ctx.Err() + + case <-time.After(TX_RETRY_INTERVAL): + continue + } + } + + return err +} + +type RollupBroadcasterer interface { + BroadcastOperatorSetUpdate(ctx context.Context, message registryrollup.OperatorSetUpdateMessage, signatureInfo registryrollup.OperatorsSignatureInfo) + GetErrorChan() <-chan error +} + +type RollupBroadcaster struct { + writers []*RollupWriter + errorChan chan error +} + +func NewRollupBroadcaster(ctx context.Context, rollupsInfo map[uint32]config.RollupInfo, signerConfig signerv2.Config, address common.Address, logger logging.Logger) (*RollupBroadcaster, error) { + writers := make([]*RollupWriter, 0, len(rollupsInfo)) + for id, info := range rollupsInfo { + writer, err := NewRollupWriter(ctx, info, signerConfig, address, logger) + if err != nil { + logger.Error("Couldn't create RollupWriter", "chainId", id, "err", err) + return nil, err + } + + writers = append(writers, writer) + } + + return &RollupBroadcaster{ + writers: writers, + errorChan: make(chan error), + }, nil +} + +func (b *RollupBroadcaster) BroadcastOperatorSetUpdate(ctx context.Context, message registryrollup.OperatorSetUpdateMessage, signatureInfo registryrollup.OperatorsSignatureInfo) { + go func() { + for _, writer := range b.writers { + select { + case <-ctx.Done(): + return + + default: + err := writer.UpdateOperatorSet(ctx, message, signatureInfo) + if err != nil { + b.errorChan <- err + } + } + } + }() +} + +func (b *RollupBroadcaster) GetErrorChan() <-chan error { + return b.errorChan +} diff --git a/aggregator/rpc_server_test.go b/aggregator/rpc_server_test.go index f5415530..fa0a9df3 100644 --- a/aggregator/rpc_server_test.go +++ b/aggregator/rpc_server_test.go @@ -29,7 +29,7 @@ func TestProcessSignedCheckpointTaskResponse(t *testing.T) { var FROM_NEAR_BLOCK = uint64(3) var TO_NEAR_BLOCK = uint64(4) - aggregator, _, _, mockBlsAggServ, _, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT) + aggregator, _, _, mockBlsAggServ, _, _, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT) assert.Nil(t, err) signedCheckpointTaskResponse, err := createMockSignedCheckpointTaskResponse(MockTask{ @@ -55,7 +55,7 @@ func TestProcessSignedStateRootUpdateMessage(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - aggregator, _, _, _, mockMessageBlsAggServ, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT) + aggregator, _, _, _, mockMessageBlsAggServ, _, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT) assert.Nil(t, err) message := servicemanager.StateRootUpdateMessage{ @@ -81,7 +81,7 @@ func TestProcessOperatorSetUpdateMessage(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - aggregator, mockAvsReader, _, _, _, mockMessageBlsAggServ, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT) + aggregator, mockAvsReader, _, _, _, mockMessageBlsAggServ, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT) assert.Nil(t, err) message := registryrollup.OperatorSetUpdateMessage{ diff --git a/config-files/aggregator-docker-compose.yaml b/config-files/aggregator-docker-compose.yaml index 7de5b63a..452442d8 100644 --- a/config-files/aggregator-docker-compose.yaml +++ b/config-files/aggregator-docker-compose.yaml @@ -6,3 +6,6 @@ eth_ws_url: ws://mainnet-anvil:8545 aggregator_server_ip_port_address: 0.0.0.0:8090 aggregator_rest_server_ip_port_address: localhost:5000 aggregator_database_path: "" +rollup_ids_to_rpc_urls: + 2: ws://rollup0-anvil:8546 + 3: ws://rollup1-anvil:8547 diff --git a/config-files/aggregator.yaml b/config-files/aggregator.yaml index 2e43b046..c405d62a 100644 --- a/config-files/aggregator.yaml +++ b/config-files/aggregator.yaml @@ -6,3 +6,5 @@ eth_ws_url: ws://localhost:8545 aggregator_server_ip_port_address: localhost:8090 aggregator_rest_server_ip_port_address: localhost:5000 aggregator_database_path: ./aggregator.db +rollup_ids_to_rpc_urls: + 2: ws://localhost:8546 diff --git a/contracts/evm/src/rollup/utils/Operators.sol b/contracts/evm/src/rollup/utils/Operators.sol index fdffc251..6592a28e 100644 --- a/contracts/evm/src/rollup/utils/Operators.sol +++ b/contracts/evm/src/rollup/utils/Operators.sol @@ -157,7 +157,7 @@ library Operators { uint256 operatorWeight = self.pubkeyHashToWeight[nonSignerPubkeyHashes[i]]; - require(operatorWeight >= 0, "Operator has zero weight"); + require(operatorWeight != 0, "Operator has zero weight"); apk = apk.plus(signatureInfo.nonSignerPubkeys[i]); weight -= operatorWeight; diff --git a/core/chainio/avs_reader.go b/core/chainio/avs_reader.go index 447e71d2..f83ec617 100644 --- a/core/chainio/avs_reader.go +++ b/core/chainio/avs_reader.go @@ -10,7 +10,7 @@ import ( sdkavsregistry "github.com/Layr-Labs/eigensdk-go/chainio/clients/avsregistry" "github.com/Layr-Labs/eigensdk-go/chainio/clients/eth" - logging "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/Layr-Labs/eigensdk-go/logging" erc20mock "github.com/NethermindEth/near-sffl/contracts/bindings/ERC20Mock" opsetupdatereg "github.com/NethermindEth/near-sffl/contracts/bindings/SFFLOperatorSetUpdateRegistry" diff --git a/core/chainio/bindings.go b/core/chainio/bindings.go index 41e49df5..6d0ef446 100644 --- a/core/chainio/bindings.go +++ b/core/chainio/bindings.go @@ -34,6 +34,7 @@ func NewAvsManagersBindings(registryCoordinatorAddr, operatorStateRetrieverAddr if err != nil { return nil, err } + contractServiceManager, err := csservicemanager.NewContractSFFLServiceManager(serviceManagerAddr, ethclient) if err != nil { logger.Error("Failed to fetch IServiceManager contract", "err", err) @@ -55,6 +56,7 @@ func NewAvsManagersBindings(registryCoordinatorAddr, operatorStateRetrieverAddr if err != nil { return nil, err } + contractOperatorSetUpdateRegistry, err := opsetupdatereg.NewContractSFFLOperatorSetUpdateRegistry(operatorSetUpdateRegistryAddr, ethclient) if err != nil { logger.Error("Failed to fetch OperatorSetUpdateRegistry contract", "err", err) @@ -77,5 +79,6 @@ func (b *AvsManagersBindings) GetErc20Mock(tokenAddr common.Address) (*erc20mock b.logger.Error("Failed to fetch ERC20Mock contract", "err", err) return nil, err } + return contractErc20Mock, nil } diff --git a/core/config/config.go b/core/config/config.go index 0559b402..80ef20b4 100644 --- a/core/config/config.go +++ b/core/config/config.go @@ -3,6 +3,7 @@ package config import ( "crypto/ecdsa" "errors" + "fmt" "os" "github.com/ethereum/go-ethereum/common" @@ -24,6 +25,7 @@ type Config struct { // only take an ethclient or an rpcUrl (and build the ethclient at each constructor site) EthHttpRpcUrl string EthWsRpcUrl string + RollupsInfo map[uint32]RollupInfo OperatorStateRetrieverAddr common.Address SFFLRegistryCoordinatorAddr common.Address AggregatorServerIpPortAddr string @@ -43,6 +45,7 @@ type ConfigRaw struct { AggregatorRestServerIpPortAddr string `yaml:"aggregator_rest_server_ip_port_address"` AggregatorDatabasePath string `yaml:"aggregator_database_path"` RegisterOperatorOnStartup bool `yaml:"register_operator_on_startup"` + RollupIdsToRpcUrls map[uint32]string `yaml:"rollup_ids_to_rpc_urls"` } // These are read from SFFLDeploymentFileFlag @@ -54,6 +57,26 @@ type SFFLContractsRaw struct { OperatorStateRetrieverAddr string `json:"operatorStateRetriever"` } +// These are read from RollupSFFLDeploymentFilesFlag +type RollupSFFLDeploymentRaw struct { + Addresses RollupAddressesRaw `json:"addresses"` + ChainInfo ChainInfoRaw `json:"chainInfo"` +} + +type RollupAddressesRaw struct { + SFFLRegistryRollupAddr string `json:"sfflRegistryRollup"` +} + +type ChainInfoRaw struct { + ChainId uint32 `json:"chainId"` + DeploymentBlock uint `json:"deploymentBlock"` +} + +type RollupInfo struct { + SFFLRegistryRollupAddr common.Address + RpcUrl string +} + func NewConfigRaw(ctx *cli.Context) (*ConfigRaw, error) { var configRaw ConfigRaw configFilePath := ctx.GlobalString(ConfigFileFlag.Name) @@ -69,10 +92,48 @@ func NewConfigRaw(ctx *cli.Context) (*ConfigRaw, error) { return &configRaw, nil } +func ReadRollupSFFLDeploymentsRaw(rollupSFFLDeploymentFilesPath []string) []RollupSFFLDeploymentRaw { + rollupDeploymentsInfo := make([]RollupSFFLDeploymentRaw, len(rollupSFFLDeploymentFilesPath)) + for i, filePath := range rollupSFFLDeploymentFilesPath { + var rollupSFFLDeploymentRaw RollupSFFLDeploymentRaw + if _, err := os.Stat(filePath); errors.Is(err, os.ErrNotExist) { + panic("Path " + filePath + " does not exist") + } + + sdkutils.ReadJsonConfig(filePath, &rollupSFFLDeploymentRaw) + rollupDeploymentsInfo[i] = rollupSFFLDeploymentRaw + } + + return rollupDeploymentsInfo +} + +func CompileRollupsInfo(rollupDeploymentsInfo []RollupSFFLDeploymentRaw, configRaw *ConfigRaw) map[uint32]RollupInfo { + // Map with ConfigRaw + rollupsInfo := make(map[uint32]RollupInfo) + for _, info := range rollupDeploymentsInfo { + url, exist := configRaw.RollupIdsToRpcUrls[info.ChainInfo.ChainId] + if !exist { + // TODO: or just skip? + panic(fmt.Sprintf("RPC URL doesn't exist for chainId %d", info.ChainInfo.ChainId)) + } + + rollupsInfo[info.ChainInfo.ChainId] = RollupInfo{ + RpcUrl: "http://" + url, + SFFLRegistryRollupAddr: common.HexToAddress(info.Addresses.SFFLRegistryRollupAddr), + } + } + + return rollupsInfo +} + // NewConfig parses config file to read from from flags or environment variables // Note: This config is shared by challenger and aggregator and so we put in the core. // Operator has a different config and is meant to be used by the operator CLI. func NewConfig(ctx *cli.Context, configRaw ConfigRaw, logger sdklogging.Logger) (*Config, error) { + rollupSFFLDeploymentFilesPath := ctx.GlobalStringSlice(RollupSFFLDeploymentFilesFlag.Name) + rollupDeploymentsInfo := ReadRollupSFFLDeploymentsRaw(rollupSFFLDeploymentFilesPath) + rollupsInfo := CompileRollupsInfo(rollupDeploymentsInfo, &configRaw) + var sfflDeploymentRaw SFFLDeploymentRaw sfflDeploymentFilePath := ctx.GlobalString(SFFLDeploymentFileFlag.Name) if _, err := os.Stat(sfflDeploymentFilePath); errors.Is(err, os.ErrNotExist) { @@ -108,6 +169,7 @@ func NewConfig(ctx *cli.Context, configRaw ConfigRaw, logger sdklogging.Logger) AggregatorRestServerIpPortAddr: configRaw.AggregatorRestServerIpPortAddr, AggregatorDatabasePath: configRaw.AggregatorDatabasePath, AggregatorAddress: aggregatorAddr, + RollupsInfo: rollupsInfo, } config.validate() @@ -142,6 +204,11 @@ var ( Required: true, EnvVar: "ECDSA_PRIVATE_KEY", } + RollupSFFLDeploymentFilesFlag = cli.StringSliceFlag{ + Name: "rollup-configs", + Usage: "Load configuration from files", + Required: true, + } /* Optional Flags */ ) @@ -149,6 +216,7 @@ var requiredFlags = []cli.Flag{ ConfigFileFlag, SFFLDeploymentFileFlag, EcdsaPrivateKeyFlag, + RollupSFFLDeploymentFilesFlag, } var optionalFlags = []cli.Flag{} diff --git a/core/utils.go b/core/utils.go index 8c6e65bc..4990c915 100644 --- a/core/utils.go +++ b/core/utils.go @@ -1,6 +1,7 @@ package core import ( + "github.com/NethermindEth/near-sffl/aggregator/types" "math/big" "github.com/Layr-Labs/eigensdk-go/crypto/bls" @@ -201,3 +202,30 @@ func ConvertToBN254G2Point(input *bls.G2Point) taskmanager.BN254G2Point { } return output } + +func FormatBlsAggregationRollup(agg *types.MessageBlsAggregationServiceResponse) registryrollup.OperatorsSignatureInfo { + var nonSignerPubkeys []registryrollup.BN254G1Point + + for _, pubkey := range agg.NonSignersPubkeysG1 { + nonSignerPubkeys = append(nonSignerPubkeys, registryrollup.BN254G1Point{ + X: pubkey.X.BigInt(big.NewInt(0)), + Y: pubkey.Y.BigInt(big.NewInt(0)), + }) + } + + apkG2 := registryrollup.BN254G2Point{ + X: [2]*big.Int{agg.SignersApkG2.X.A1.BigInt(big.NewInt(0)), agg.SignersApkG2.X.A0.BigInt(big.NewInt(0))}, + Y: [2]*big.Int{agg.SignersApkG2.Y.A1.BigInt(big.NewInt(0)), agg.SignersApkG2.Y.A0.BigInt(big.NewInt(0))}, + } + + sigma := registryrollup.BN254G1Point{ + X: agg.SignersAggSigG1.X.BigInt(big.NewInt(0)), + Y: agg.SignersAggSigG1.Y.BigInt(big.NewInt(0)), + } + + return registryrollup.OperatorsSignatureInfo{ + NonSignerPubkeys: nonSignerPubkeys, + ApkG2: apkG2, + Sigma: sigma, + } +} diff --git a/operator/attestor/attestor.go b/operator/attestor/attestor.go index 88f303e1..17be6918 100644 --- a/operator/attestor/attestor.go +++ b/operator/attestor/attestor.go @@ -111,11 +111,9 @@ func NewAttestor(config *types.NodeConfig, blsKeypair *bls.KeyPair, operatorId b } func (attestor *Attestor) Start(ctx context.Context) error { - clientsNum := len(attestor.clients) - subscriptions := make([]ethereum.Subscription, clientsNum) - headersCs := make([]chan *ethtypes.Header, clientsNum) + subscriptions := make(map[uint32]ethereum.Subscription) + headersCs := make(map[uint32]chan *ethtypes.Header) - i := 0 for rollupId, client := range attestor.clients { headersC := make(chan *ethtypes.Header) subscription, err := client.SubscribeNewHead(ctx, headersC) @@ -124,17 +122,14 @@ func (attestor *Attestor) Start(ctx context.Context) error { return err } - subscriptions[i] = subscription - headersCs[i] = headersC - i++ + subscriptions[rollupId] = subscription + headersCs[rollupId] = headersC } go attestor.processMQBlocks(ctx) - i = 0 for rollupId, _ := range attestor.clients { - go attestor.processRollupHeaders(rollupId, headersCs[i], subscriptions[i], ctx) - i++ + go attestor.processRollupHeaders(rollupId, headersCs[rollupId], subscriptions[rollupId], ctx) } return nil @@ -240,7 +235,7 @@ loop: } // Filter notifications - if rollupHeader.Number != mqBlock.Block.Header().Number { + if rollupHeader.Number.Cmp(mqBlock.Block.Header().Number) != 0 { continue loop } diff --git a/operator/rpc_client.go b/operator/rpc_client.go index 8654f468..9218a9ff 100644 --- a/operator/rpc_client.go +++ b/operator/rpc_client.go @@ -105,7 +105,7 @@ func (c *AggregatorRpcClient) SendSignedStateRootUpdateToAggregator(signedStateR } func (c *AggregatorRpcClient) SendSignedOperatorSetUpdateToAggregator(signedOperatorSetUpdateMessage *coretypes.SignedOperatorSetUpdateMessage) { - c.logger.Info("Sending signed state root update message to aggregator", "signedOperatorSetUpdateMessage", fmt.Sprintf("%#v", signedOperatorSetUpdateMessage)) + c.logger.Info("Sending operator set update message to aggregator", "signedOperatorSetUpdateMessage", fmt.Sprintf("%#v", signedOperatorSetUpdateMessage)) c.sendRequest(func() error { var reply bool @@ -114,7 +114,7 @@ func (c *AggregatorRpcClient) SendSignedOperatorSetUpdateToAggregator(signedOper if err != nil { c.logger.Info("Received error from aggregator", "err", err) } else { - c.logger.Info("Signed state root update message accepted by aggregator.", "reply", reply) + c.logger.Info("Signed operator set update message accepted by aggregator.", "reply", reply) c.metrics.IncNumMessagesAcceptedByAggregator() } diff --git a/tests/anvil/deploy-rollup-avs-save-anvil-state.sh b/tests/anvil/deploy-rollup-avs-save-anvil-state.sh deleted file mode 100755 index 5e89aa9b..00000000 --- a/tests/anvil/deploy-rollup-avs-save-anvil-state.sh +++ /dev/null @@ -1,19 +0,0 @@ -#!/bin/bash - -PORT=8546 -RPC_URL=http://localhost:${PORT} -PRIVATE_KEY=0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80 - -# cd to the directory of this script so that this can be run from anywhere -parent_path=$( cd "$(dirname "${BASH_SOURCE[0]}")" ; pwd -P ) -cd "$parent_path" - -# start an anvil instance in the background -anvil --port $PORT --dump-state data/rollup-avs-deployed-anvil-state.json & -cd ../../contracts/evm -forge create src/rollup/SFFLRegistryRollup.sol:SFFLRegistryRollup --constructor-args '[((643552363890320897587044283125191574906281609959531590546948318138132520777,7028377728703212953187883551402495866059211864756496641401904395458852281995),1000)]' 66 1 --private-key $PRIVATE_KEY --rpc-url $RPC_URL - -# we also do this here to make sure the operator has funds to register with the eigenlayer contracts -cast send 0xD5A0359da7B310917d7760385516B2426E86ab7f --value 10ether --private-key 0x2a871d0798f97d79848a013d4936a73bf4cc922c825d33c1cf7073dff6d409c6 --rpc-url $RPC_URL -# kill anvil to save its state -pkill anvil diff --git a/tests/integration/integration_test.go b/tests/integration/integration_test.go index 0b421769..3fe3d9c2 100644 --- a/tests/integration/integration_test.go +++ b/tests/integration/integration_test.go @@ -2,6 +2,7 @@ package integration_test import ( "context" + "crypto/ecdsa" "encoding/base64" "encoding/json" "fmt" @@ -22,6 +23,7 @@ import ( sdkEcdsa "github.com/Layr-Labs/eigensdk-go/crypto/ecdsa" sdklogging "github.com/Layr-Labs/eigensdk-go/logging" sdkutils "github.com/Layr-Labs/eigensdk-go/utils" + "github.com/NethermindEth/near-sffl/core" "github.com/docker/go-connections/nat" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" @@ -34,7 +36,6 @@ import ( "github.com/testcontainers/testcontainers-go/wait" "github.com/NethermindEth/near-sffl/aggregator" - aggtypes "github.com/NethermindEth/near-sffl/aggregator/types" registryrollup "github.com/NethermindEth/near-sffl/contracts/bindings/SFFLRegistryRollup" "github.com/NethermindEth/near-sffl/core/chainio" "github.com/NethermindEth/near-sffl/core/config" @@ -45,7 +46,7 @@ import ( const TEST_DATA_DIR = "../../test_data" func TestIntegration(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 220*time.Second) setup := setupTestEnv(t, ctx) t.Cleanup(func() { cancel() @@ -53,9 +54,7 @@ func TestIntegration(t *testing.T) { setup.cleanup() }) - time.Sleep(10 * time.Second) - - newOperator := startOperator(t, ctx, genOperatorConfig(t, ctx, setup.mainnetAnvil, setup.rollupAnvils, setup.rabbitMq)) + time.Sleep(30 * time.Second) taskHash, err := setup.avsReader.AvsServiceBindings.TaskManager.AllCheckpointTaskHashes(&bind.CallOpts{}, 0) if err != nil { @@ -74,12 +73,37 @@ func TestIntegration(t *testing.T) { t.Fatalf("Task response hash is empty") } - stateRootUpdate, err := getStateRootUpdateAggregation(setup.aggregatorRestUrl, uint32(setup.rollupAnvils[0].ChainID.Uint64()), 9) + stateRootHeight := uint64(10) + stateRootUpdate, err := getStateRootUpdateAggregation(setup.aggregatorRestUrl, uint32(setup.rollupAnvils[0].ChainID.Uint64()), stateRootHeight) if err != nil { t.Fatalf("Cannot get state root update: %s", err.Error()) } + _, err = setup.registryRollups[1].UpdateStateRoot(setup.registryRollupAuths[1], registryrollup.StateRootUpdateMessage(stateRootUpdate.Message), core.FormatBlsAggregationRollup(&stateRootUpdate.Aggregation)) + if err != nil { + t.Fatalf("Error updating state root: %s", err.Error()) + } + + newOperatorConfig, _, _ := genOperatorConfig(t, ctx, setup.mainnetAnvil, setup.rollupAnvils, setup.rabbitMq) + newOperator := startOperator(t, ctx, newOperatorConfig) + + time.Sleep(30 * time.Second) + + for _, registryRollup := range setup.registryRollups { + nextOperatorSetUpdateId, err := registryRollup.NextOperatorUpdateId(&bind.CallOpts{}) + if err != nil { + t.Fatalf("Error getting next operator set update ID: %s", err.Error()) + } + if nextOperatorSetUpdateId != 2 { + t.Fatalf("Wrong next operator set update ID: expected %d, got %d", 2, nextOperatorSetUpdateId) + } + } - _, err = setup.registryRollups[1].UpdateStateRoot(setup.registryRollupAuths[1], registryrollup.StateRootUpdateMessage(stateRootUpdate.Message), formatBlsAggregationRollup(t, &stateRootUpdate.Aggregation)) + stateRootHeight = uint64(16) + stateRootUpdate, err = getStateRootUpdateAggregation(setup.aggregatorRestUrl, uint32(setup.rollupAnvils[0].ChainID.Uint64()), stateRootHeight) + if err != nil { + t.Fatalf("Cannot get state root update: %s", err.Error()) + } + _, err = setup.registryRollups[1].UpdateStateRoot(setup.registryRollupAuths[1], registryrollup.StateRootUpdateMessage(stateRootUpdate.Message), core.FormatBlsAggregationRollup(&stateRootUpdate.Aggregation)) if err != nil { t.Fatalf("Error updating state root: %s", err.Error()) } @@ -108,11 +132,6 @@ func TestIntegration(t *testing.T) { } assert.Equal(t, expectedUpdatedOperators, operatorSetUpdate.Message.Operators) - _, err = setup.registryRollups[1].UpdateOperatorSet(setup.registryRollupAuths[1], operatorSetUpdate.Message, formatBlsAggregationRollup(t, &operatorSetUpdate.Aggregation)) - if err != nil { - t.Fatalf("Error updating state root: %s", err.Error()) - } - <-ctx.Done() } @@ -164,25 +183,33 @@ func setupTestEnv(t *testing.T, ctx context.Context) *testEnv { sfflDeploymentRaw := readSfflDeploymentRaw() - configRaw := buildConfigRaw(mainnetAnvil) + configRaw := buildConfigRaw(mainnetAnvil, rollupAnvils) logger, err := sdklogging.NewZapLogger(configRaw.Environment) if err != nil { t.Fatalf("Failed to create logger: %s", err.Error()) } - nodeConfig := genOperatorConfig(t, ctx, mainnetAnvil, rollupAnvils, rabbitMq) + nodeConfig, keyPair, _ := genOperatorConfig(t, ctx, mainnetAnvil, rollupAnvils, rabbitMq) + rollupInitialOperatorSet := []registryrollup.OperatorsOperator{ + { + Pubkey: registryrollup.BN254G1Point{ + X: keyPair.PubKey.X.BigInt(big.NewInt(0)), + Y: keyPair.PubKey.Y.BigInt(big.NewInt(0)), + }, + Weight: big.NewInt(1000), + }, + } + addresses, registryRollups, registryRollupAuths := deployRegistryRollups(t, rollupInitialOperatorSet, 1, rollupAnvils) operator := startOperator(t, ctx, nodeConfig) - config := buildConfig(t, sfflDeploymentRaw, configRaw, logger) + config := buildConfig(t, sfflDeploymentRaw, addresses, rollupAnvils, configRaw) aggregator := startAggregator(t, ctx, config, logger) - avsReader, err := chainio.BuildAvsReaderFromConfig(config, mainnetAnvil.HttpClient, logger) + avsReader, err := chainio.BuildAvsReader(common.HexToAddress(sfflDeploymentRaw.Addresses.RegistryCoordinatorAddr), common.HexToAddress(sfflDeploymentRaw.Addresses.OperatorStateRetrieverAddr), mainnetAnvil.HttpClient, logger) if err != nil { t.Fatalf("Cannot create AVS Reader: %s", err.Error()) } - registryRollups, registryRollupAuths := deployRegistryRollups(t, ctx, avsReader, rollupAnvils) - cleanup := func() { if err := os.RemoveAll(TEST_DATA_DIR); err != nil { t.Fatalf("Error cleaning test data dir: %s", err.Error()) @@ -282,7 +309,7 @@ func readSfflDeploymentRaw() config.SFFLDeploymentRaw { return sfflDeploymentRaw } -func genOperatorConfig(t *testing.T, ctx context.Context, mainnetAnvil *AnvilInstance, rollupAnvils []*AnvilInstance, rabbitMq *rabbitmq.RabbitMQContainer) types.NodeConfig { +func genOperatorConfig(t *testing.T, ctx context.Context, mainnetAnvil *AnvilInstance, rollupAnvils []*AnvilInstance, rabbitMq *rabbitmq.RabbitMQContainer) (types.NodeConfig, *bls.KeyPair, *ecdsa.PrivateKey) { nodeConfig := types.NodeConfig{} nodeConfigFilePath := "../../config-files/operator.anvil.yaml" err := sdkutils.ReadYamlConfig(nodeConfigFilePath, &nodeConfig) @@ -355,10 +382,10 @@ func genOperatorConfig(t *testing.T, ctx context.Context, mainnetAnvil *AnvilIns mainnetAnvil.setBalance(address, big.NewInt(1e18)) - return nodeConfig + return nodeConfig, keyPair, ecdsaKey } -func buildConfigRaw(mainnetAnvil *AnvilInstance) config.ConfigRaw { +func buildConfigRaw(mainnetAnvil *AnvilInstance, rollupAnvils []*AnvilInstance) config.ConfigRaw { var configRaw config.ConfigRaw aggConfigFilePath := "../../config-files/aggregator.yaml" sdkutils.ReadYamlConfig(aggConfigFilePath, &configRaw) @@ -366,10 +393,16 @@ func buildConfigRaw(mainnetAnvil *AnvilInstance) config.ConfigRaw { configRaw.EthWsUrl = mainnetAnvil.WsUrl configRaw.AggregatorDatabasePath = "" + configRaw.RollupIdsToRpcUrls = map[uint32]string{} + for _, el := range rollupAnvils { + cleanedUrl := strings.TrimPrefix(el.HttpUrl, "http://") + configRaw.RollupIdsToRpcUrls[uint32(el.ChainID.Uint64())] = cleanedUrl + } + return configRaw } -func buildConfig(t *testing.T, sfflDeploymentRaw config.SFFLDeploymentRaw, aggConfigRaw config.ConfigRaw, logeer sdklogging.Logger) *config.Config { +func buildConfig(t *testing.T, sfflDeploymentRaw config.SFFLDeploymentRaw, addresses []common.Address, rollupAnvils []*AnvilInstance, aggConfigRaw config.ConfigRaw) *config.Config { aggregatorEcdsaPrivateKeyString := "0x2a871d0798f97d79848a013d4936a73bf4cc922c825d33c1cf7073dff6d409c6" if aggregatorEcdsaPrivateKeyString[:2] == "0x" { aggregatorEcdsaPrivateKeyString = aggregatorEcdsaPrivateKeyString[2:] @@ -383,6 +416,11 @@ func buildConfig(t *testing.T, sfflDeploymentRaw config.SFFLDeploymentRaw, aggCo t.Fatalf("Cannot get operator address: %s", err.Error()) } + rollupsInfo := make(map[uint32]config.RollupInfo) + for i, addr := range addresses { + rollupsInfo[uint32(rollupAnvils[i].ChainID.Int64())] = config.RollupInfo{SFFLRegistryRollupAddr: addr, RpcUrl: rollupAnvils[i].WsUrl} + } + return &config.Config{ EcdsaPrivateKey: aggregatorEcdsaPrivateKey, EthHttpRpcUrl: aggConfigRaw.EthRpcUrl, @@ -394,6 +432,7 @@ func buildConfig(t *testing.T, sfflDeploymentRaw config.SFFLDeploymentRaw, aggCo AggregatorDatabasePath: aggConfigRaw.AggregatorDatabasePath, RegisterOperatorOnStartup: aggConfigRaw.RegisterOperatorOnStartup, AggregatorAddress: aggregatorAddr, + RollupsInfo: rollupsInfo, } } @@ -489,21 +528,23 @@ func startAnvilTestContainer(t *testing.T, ctx context.Context, name, exposedPor return anvil } -func deployRegistryRollups(t *testing.T, ctx context.Context, avsReader chainio.AvsReaderer, anvils []*AnvilInstance) ([]*registryrollup.ContractSFFLRegistryRollup, []*bind.TransactOpts) { +func deployRegistryRollups(t *testing.T, initialOperatorSet []registryrollup.OperatorsOperator, nextOperatorSetUpdateId uint64, anvils []*AnvilInstance) ([]common.Address, []*registryrollup.ContractSFFLRegistryRollup, []*bind.TransactOpts) { var registryRollups []*registryrollup.ContractSFFLRegistryRollup var auths []*bind.TransactOpts + var addresses []common.Address for _, anvil := range anvils { - registryRollup, auth := deployRegistryRollup(t, ctx, avsReader, anvil) + addr, registryRollup, auth := deployRegistryRollup(t, initialOperatorSet, nextOperatorSetUpdateId, anvil) registryRollups = append(registryRollups, registryRollup) auths = append(auths, auth) + addresses = append(addresses, addr) } - return registryRollups, auths + return addresses, registryRollups, auths } -func deployRegistryRollup(t *testing.T, ctx context.Context, avsReader chainio.AvsReaderer, anvil *AnvilInstance) (*registryrollup.ContractSFFLRegistryRollup, *bind.TransactOpts) { +func deployRegistryRollup(t *testing.T, initialOperatorSet []registryrollup.OperatorsOperator, nextOperatorSetUpdateId uint64, anvil *AnvilInstance) (common.Address, *registryrollup.ContractSFFLRegistryRollup, *bind.TransactOpts) { t.Logf("Deploying RegistryRollup to chain %s", anvil.ChainID.String()) privateKeyString := "ac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80" @@ -517,33 +558,18 @@ func deployRegistryRollup(t *testing.T, ctx context.Context, avsReader chainio.A t.Fatalf("Error generating transactor: %s", err.Error()) } - operatorSetUpdateId := uint64(0) - - operatorsDelta, err := avsReader.GetOperatorSetUpdateDelta(ctx, operatorSetUpdateId) - if err != nil { - t.Fatalf("Error getting operator set: %s", err.Error()) - } - - var operators []registryrollup.OperatorsOperator - for _, operator := range operatorsDelta { - operators = append(operators, registryrollup.OperatorsOperator{ - Pubkey: registryrollup.BN254G1Point(operator.Pubkey), - Weight: operator.Weight, - }) - } - - if len(operators) == 0 { + if len(initialOperatorSet) == 0 { t.Fatal("Operator set is empty") } - t.Logf("RegistryRollup deployed with operators: %v", operators) + t.Logf("RegistryRollup deployed with operators: %v", initialOperatorSet) - _, _, registryRollup, err := registryrollup.DeployContractSFFLRegistryRollup(auth, anvil.WsClient, operators, big.NewInt(66), operatorSetUpdateId+1) + addr, _, registryRollup, err := registryrollup.DeployContractSFFLRegistryRollup(auth, anvil.WsClient, initialOperatorSet, big.NewInt(66), nextOperatorSetUpdateId) if err != nil { t.Fatalf("Error deploying RegistryRollup: %s", err.Error()) } - return registryRollup, auth + return addr, registryRollup, auth } func startRollupIndexing(t *testing.T, ctx context.Context, rollupAnvils []*AnvilInstance, indexerContainer testcontainers.Container) { @@ -765,33 +791,6 @@ func getOperatorSetUpdateAggregation(addr string, id uint64) (*aggregator.GetOpe return &response, err } -func formatBlsAggregationRollup(t *testing.T, agg *aggtypes.MessageBlsAggregationServiceResponse) registryrollup.OperatorsSignatureInfo { - var nonSignerPubkeys []registryrollup.BN254G1Point - - for _, pubkey := range agg.NonSignersPubkeysG1 { - nonSignerPubkeys = append(nonSignerPubkeys, registryrollup.BN254G1Point{ - X: pubkey.X.BigInt(big.NewInt(0)), - Y: pubkey.Y.BigInt(big.NewInt(0)), - }) - } - - apkG2 := registryrollup.BN254G2Point{ - X: [2]*big.Int{agg.SignersApkG2.X.A1.BigInt(big.NewInt(0)), agg.SignersApkG2.X.A0.BigInt(big.NewInt(0))}, - Y: [2]*big.Int{agg.SignersApkG2.Y.A1.BigInt(big.NewInt(0)), agg.SignersApkG2.Y.A0.BigInt(big.NewInt(0))}, - } - - sigma := registryrollup.BN254G1Point{ - X: agg.SignersAggSigG1.X.BigInt(big.NewInt(0)), - Y: agg.SignersAggSigG1.Y.BigInt(big.NewInt(0)), - } - - return registryrollup.OperatorsSignatureInfo{ - NonSignerPubkeys: nonSignerPubkeys, - ApkG2: apkG2, - Sigma: sigma, - } -} - func copyFileFromContainer(ctx context.Context, container testcontainers.Container, sourcePath, destinationPath string, destinationPermissions fs.FileMode) error { reader, err := container.CopyFileFromContainer(ctx, sourcePath) if err != nil {