Skip to content

Commit

Permalink
Send Operator set update to rollups (#34)
Browse files Browse the repository at this point in the history
* feat:  SFFLRegistryRollup deploy script. Script for state & data dump

* feat: Added ChainInfo to deployment output + config modifications

* feat: added RollupBroadcaster

* Update integration tests

* feat: applied changes to integration tests

* fix: integration test's compilation

* fix: paths

* fix: compilation

* refactor: Fix import order

* fix: Fix locking in message aggregation service

* fix: Fix message aggregation only handling one signed message

* fix: Fix attestor setup unsafe unordered loop operations

* fix: Fix bigint comparison

* test: Use deployed rollup anvil instances instead of predeployed

* Modify RollupDeployer

* fix: unit tests. Added mock of RollupBroadcaster

* fix: keys read from configs.

* refactor: renamings according to style guide + some minor edits

* fix: removed unnecessary arguments. log message fix. verifyCalldata zero check fix

* fix: for now skip on different operator update id

* fix: post rebase issues

* test: Increase integration test duration

* refactor: pr comments

* fix: change to localhost

* fix: compilation

---------

Co-authored-by: Franco Barpp Gomes <[email protected]>
  • Loading branch information
taco-paco and Hyodar authored Mar 11, 2024
1 parent 4006db1 commit 0a7e664
Show file tree
Hide file tree
Showing 19 changed files with 473 additions and 132 deletions.
26 changes: 22 additions & 4 deletions aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ type Aggregator struct {
restServerIpPortAddr string
avsWriter chainio.AvsWriterer
avsReader chainio.AvsReaderer
rollupBroadcaster RollupBroadcasterer

// aggregation related fields
taskBlsAggregationService blsagg.BlsAggregationService
stateRootUpdateBlsAggregationService MessageBlsAggregationService
Expand All @@ -99,7 +101,7 @@ func NewAggregator(ctx context.Context, config *config.Config, logger logging.Lo
return nil, err
}

avsReader, err := chainio.BuildAvsReader(config.SFFLRegistryCoordinatorAddr, config.OperatorStateRetrieverAddr, ethHttpClient, logger)
avsReader, err := chainio.BuildAvsReaderFromConfig(config, ethHttpClient, logger)
if err != nil {
logger.Error("Cannot create avsReader", "err", err)
return nil, err
Expand All @@ -111,7 +113,8 @@ func NewAggregator(ctx context.Context, config *config.Config, logger logging.Lo
return nil, err
}

signerV2, _, err := signerv2.SignerFromConfig(signerv2.Config{PrivateKey: config.EcdsaPrivateKey}, chainId)
signerConfig := signerv2.Config{PrivateKey: config.EcdsaPrivateKey}
signerV2, _, err := signerv2.SignerFromConfig(signerConfig, chainId)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -143,6 +146,12 @@ func NewAggregator(ctx context.Context, config *config.Config, logger logging.Lo
return nil, err
}

rollupBroadcaster, err := NewRollupBroadcaster(ctx, config.RollupsInfo, signerConfig, config.AggregatorAddress, logger)
if err != nil {
logger.Error("Cannot create rollup broadcaster", "err", err)
return nil, err
}

operatorPubkeysService := oppubkeysserv.NewOperatorPubkeysServiceInMemory(ctx, clients.AvsRegistryChainSubscriber, clients.AvsRegistryChainReader, logger)
avsRegistryService := avsregistry.NewAvsRegistryServiceChainCaller(avsReader, operatorPubkeysService, logger)
taskBlsAggregationService := blsagg.NewBlsAggregatorService(avsRegistryService, logger)
Expand All @@ -155,6 +164,7 @@ func NewAggregator(ctx context.Context, config *config.Config, logger logging.Lo
restServerIpPortAddr: config.AggregatorRestServerIpPortAddr,
avsWriter: avsWriter,
avsReader: avsReader,
rollupBroadcaster: rollupBroadcaster,
taskBlsAggregationService: taskBlsAggregationService,
stateRootUpdateBlsAggregationService: stateRootUpdateBlsAggregationService,
operatorSetUpdateBlsAggregationService: operatorSetUpdateBlsAggregationService,
Expand Down Expand Up @@ -186,6 +196,7 @@ func (agg *Aggregator) Start(ctx context.Context) error {
// TODO: make this based on the actual timestamps
timestamp := uint64(0)

broadcasterErrorChan := agg.rollupBroadcaster.GetErrorChan()
for {
select {
case <-ctx.Done():
Expand All @@ -198,14 +209,18 @@ func (agg *Aggregator) Start(ctx context.Context) error {
agg.handleStateRootUpdateReachedQuorum(blsAggServiceResp)
case blsAggServiceResp := <-agg.operatorSetUpdateBlsAggregationService.GetResponseChannel():
agg.logger.Info("Received response from operatorSetUpdateBlsAggregationService", "blsAggServiceResp", blsAggServiceResp)
agg.handleOperatorSetUpdateReachedQuorum(blsAggServiceResp)
agg.handleOperatorSetUpdateReachedQuorum(ctx, blsAggServiceResp)
case <-ticker.C:
err := agg.sendNewCheckpointTask(timestamp, timestamp)
timestamp++
if err != nil {
// we log the errors inside sendNewCheckpointTask() so here we just continue to the next task
continue
}

case err := <-broadcasterErrorChan:
// TODO: proper error handling in all class
agg.logger.Error("Received error from broadcaster", "err", err)
}
}
}
Expand Down Expand Up @@ -319,7 +334,7 @@ func (agg *Aggregator) handleStateRootUpdateReachedQuorum(blsAggServiceResp type

}

func (agg *Aggregator) handleOperatorSetUpdateReachedQuorum(blsAggServiceResp types.MessageBlsAggregationServiceResponse) {
func (agg *Aggregator) handleOperatorSetUpdateReachedQuorum(ctx context.Context, blsAggServiceResp types.MessageBlsAggregationServiceResponse) {
agg.operatorSetUpdatesLock.RLock()
msg, ok := agg.operatorSetUpdates[blsAggServiceResp.MessageDigest]
agg.operatorSetUpdatesLock.RUnlock()
Expand All @@ -340,6 +355,9 @@ func (agg *Aggregator) handleOperatorSetUpdateReachedQuorum(blsAggServiceResp ty
return
}

signatureInfo := core.FormatBlsAggregationRollup(&blsAggServiceResp)
agg.rollupBroadcaster.BroadcastOperatorSetUpdate(ctx, msg, signatureInfo)

err := agg.msgDb.StoreOperatorSetUpdate(msg)
if err != nil {
agg.logger.Error("Aggregator could not store message")
Expand Down
22 changes: 15 additions & 7 deletions aggregator/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestSendNewTask(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

aggregator, _, mockAvsWriterer, mockTaskBlsAggService, _, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
aggregator, _, mockAvsWriterer, mockTaskBlsAggService, _, _, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
assert.Nil(t, err)

var TASK_INDEX = uint32(0)
Expand All @@ -82,7 +82,7 @@ func TestHandleStateRootUpdateAggregationReachedQuorum(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

aggregator, _, _, _, _, _, mockMsgDb, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
aggregator, _, _, _, _, _, mockMsgDb, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
assert.Nil(t, err)

msg := servicemanager.StateRootUpdateMessage{}
Expand All @@ -109,39 +109,46 @@ func TestHandleOperatorSetUpdateAggregationReachedQuorum(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

aggregator, _, _, _, _, _, mockMsgDb, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
aggregator, _, _, _, _, _, mockMsgDb, mockRollupBroadcaster, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
assert.Nil(t, err)

msg := registryrollup.OperatorSetUpdateMessage{}
msgDigest, err := core.GetOperatorSetUpdateMessageDigest(&msg)
assert.Nil(t, err)

blsAggServiceResp := types.MessageBlsAggregationServiceResponse{
MessageDigest: msgDigest,
MessageDigest: msgDigest,
NonSignersPubkeysG1: make([]*bls.G1Point, 0),
SignersApkG2: bls.NewZeroG2Point(),
SignersAggSigG1: bls.NewZeroSignature(),
}

aggregator.operatorSetUpdates[msgDigest] = msg

mockMsgDb.EXPECT().StoreOperatorSetUpdate(msg)
mockMsgDb.EXPECT().StoreOperatorSetUpdateAggregation(msg, blsAggServiceResp)

signatureInfo := core.FormatBlsAggregationRollup(&blsAggServiceResp)
mockRollupBroadcaster.EXPECT().BroadcastOperatorSetUpdate(context.Background(), msg, signatureInfo)

assert.Contains(t, aggregator.operatorSetUpdates, msgDigest)

aggregator.handleOperatorSetUpdateReachedQuorum(blsAggServiceResp)
aggregator.handleOperatorSetUpdateReachedQuorum(context.Background(), blsAggServiceResp)

assert.NotContains(t, aggregator.operatorSetUpdates, msgDigest)
}

func createMockAggregator(
mockCtrl *gomock.Controller, operatorPubkeyDict map[bls.OperatorId]types.OperatorInfo,
) (*Aggregator, *chainiomocks.MockAvsReaderer, *chainiomocks.MockAvsWriterer, *blsaggservmock.MockBlsAggregationService, *mocks.MockMessageBlsAggregationService, *mocks.MockMessageBlsAggregationService, *mocks.MockMessageDatabaser, error) {
) (*Aggregator, *chainiomocks.MockAvsReaderer, *chainiomocks.MockAvsWriterer, *blsaggservmock.MockBlsAggregationService, *mocks.MockMessageBlsAggregationService, *mocks.MockMessageBlsAggregationService, *mocks.MockMessageDatabaser, *mocks.MockRollupBroadcasterer, error) {
logger := sdklogging.NewNoopLogger()
mockAvsWriter := chainiomocks.NewMockAvsWriterer(mockCtrl)
mockAvsReader := chainiomocks.NewMockAvsReaderer(mockCtrl)
mockTaskBlsAggregationService := blsaggservmock.NewMockBlsAggregationService(mockCtrl)
mockStateRootUpdateBlsAggregationService := mocks.NewMockMessageBlsAggregationService(mockCtrl)
mockOperatorSetUpdateBlsAggregationService := mocks.NewMockMessageBlsAggregationService(mockCtrl)
mockMsgDb := mocks.NewMockMessageDatabaser(mockCtrl)
mockRollupBroadcaster := mocks.NewMockRollupBroadcasterer(mockCtrl)

aggregator := &Aggregator{
logger: logger,
Expand All @@ -155,8 +162,9 @@ func createMockAggregator(
taskResponses: make(map[coretypes.TaskIndex]map[sdktypes.TaskResponseDigest]taskmanager.CheckpointTaskResponse),
stateRootUpdates: make(map[coretypes.MessageDigest]servicemanager.StateRootUpdateMessage),
operatorSetUpdates: make(map[coretypes.MessageDigest]registryrollup.OperatorSetUpdateMessage),
rollupBroadcaster: mockRollupBroadcaster,
}
return aggregator, mockAvsReader, mockAvsWriter, mockTaskBlsAggregationService, mockStateRootUpdateBlsAggregationService, mockOperatorSetUpdateBlsAggregationService, mockMsgDb, nil
return aggregator, mockAvsReader, mockAvsWriter, mockTaskBlsAggregationService, mockStateRootUpdateBlsAggregationService, mockOperatorSetUpdateBlsAggregationService, mockMsgDb, mockRollupBroadcaster, nil
}

// just a mock ethclient to pass to bindings
Expand Down
1 change: 1 addition & 0 deletions aggregator/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ package aggregator

//go:generate mockgen -destination=./mocks/message_blsagg.go -package=mocks github.com/NethermindEth/near-sffl/aggregator MessageBlsAggregationService
//go:generate mockgen -destination=./mocks/message_database.go -package=mocks github.com/NethermindEth/near-sffl/aggregator MessageDatabaser
//go:generate mockgen -destination=./mocks/rollup_broadcaster.go -package=mocks github.com/NethermindEth/near-sffl/aggregator RollupBroadcasterer
27 changes: 17 additions & 10 deletions aggregator/message_blsagg.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,15 +115,17 @@ func (mbas *MessageBlsAggregatorService) InitializeMessageIfNotExists(
timeToExpiry time.Duration,
ethBlockNumber uint64,
) error {
mbas.messageChansLock.Lock()
defer mbas.messageChansLock.Unlock()

if _, taskExists := mbas.signedMessageDigestsCs[messageDigest]; taskExists {
return nil
}

signedMessageDigestsC := make(chan SignedMessageDigest)
mbas.messageChansLock.Lock()
mbas.signedMessageDigestsCs[messageDigest] = signedMessageDigestsC
mbas.messageChansLock.Unlock()
go mbas.singleMessageAggregatorGoroutineFunc(messageDigest, quorumNumbers, quorumThresholdPercentages, timeToExpiry, signedMessageDigestsC, ethBlockNumber)

return nil
}

Expand All @@ -133,9 +135,10 @@ func (mbas *MessageBlsAggregatorService) ProcessNewSignature(
blsSignature *bls.Signature,
operatorId bls.OperatorId,
) error {
mbas.messageChansLock.Lock()
mbas.messageChansLock.RLock()
messageC, taskInitialized := mbas.signedMessageDigestsCs[messageDigest]
mbas.messageChansLock.Unlock()
mbas.messageChansLock.RUnlock()

if !taskInitialized {
return MessageNotFoundErrorFn(messageDigest)
}
Expand Down Expand Up @@ -171,8 +174,10 @@ func (mbas *MessageBlsAggregatorService) singleMessageAggregatorGoroutineFunc(
select {
case signedMessageDigest := <-signedMessageDigestsC:
mbas.logger.Debug("Message goroutine received new signed message digest", "messageDigest", messageDigest)
mbas.handleSignedMessageDigest(signedMessageDigest, validationInfo)
return

if mbas.handleSignedMessageDigest(signedMessageDigest, validationInfo) {
return
}
case <-messageExpiredTimer.C:
mbas.aggregatedResponsesC <- aggtypes.MessageBlsAggregationServiceResponse{
Err: MessageExpiredError,
Expand Down Expand Up @@ -230,12 +235,12 @@ func (mbas *MessageBlsAggregatorService) fetchValidationInfo(quorumNumbers []typ
}
}

func (mbas *MessageBlsAggregatorService) handleSignedMessageDigest(signedMessageDigest SignedMessageDigest, validationInfo signedMessageDigestValidationInfo) {
func (mbas *MessageBlsAggregatorService) handleSignedMessageDigest(signedMessageDigest SignedMessageDigest, validationInfo signedMessageDigestValidationInfo) bool {
err := mbas.verifySignature(signedMessageDigest, validationInfo.operatorsAvsStateDict)
signedMessageDigest.SignatureVerificationErrorC <- err

if err != nil {
return
return false
}

digestAggregatedOperators, ok := validationInfo.aggregatedOperatorsDict[signedMessageDigest.MessageDigest]
Expand Down Expand Up @@ -263,7 +268,7 @@ func (mbas *MessageBlsAggregatorService) handleSignedMessageDigest(signedMessage
validationInfo.aggregatedOperatorsDict[signedMessageDigest.MessageDigest] = digestAggregatedOperators

if !checkIfStakeThresholdsMet(digestAggregatedOperators.signersTotalStakePerQuorum, validationInfo.totalStakePerQuorum, validationInfo.quorumThresholdPercentagesMap) {
return
return false
}

nonSignersOperatorIds := []types.OperatorId{}
Expand All @@ -279,7 +284,7 @@ func (mbas *MessageBlsAggregatorService) handleSignedMessageDigest(signedMessage
mbas.aggregatedResponsesC <- aggtypes.MessageBlsAggregationServiceResponse{
Err: err,
}
return
return false
}

messageBlsAggregationServiceResponse := aggtypes.MessageBlsAggregationServiceResponse{
Expand All @@ -297,6 +302,8 @@ func (mbas *MessageBlsAggregatorService) handleSignedMessageDigest(signedMessage
}

mbas.aggregatedResponsesC <- messageBlsAggregationServiceResponse

return true
}

func (mbas *MessageBlsAggregatorService) closeMessageGoroutine(messageDigest coretypes.MessageDigest) {
Expand Down
66 changes: 66 additions & 0 deletions aggregator/mocks/rollup_broadcaster.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions aggregator/rest_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestGetStateRootUpdateAggregation(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

aggregator, _, _, _, _, _, mockDb, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
aggregator, _, _, _, _, _, mockDb, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
assert.Nil(t, err)

go aggregator.startRestServer()
Expand Down Expand Up @@ -97,7 +97,7 @@ func TestGetOperatorSetUpdateAggregation(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

aggregator, _, _, _, _, _, mockDb, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
aggregator, _, _, _, _, _, mockDb, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
assert.Nil(t, err)

go aggregator.startRestServer()
Expand Down
Loading

0 comments on commit 0a7e664

Please sign in to comment.