From 880c5465f5f5bf66b9c89089196325058f63fde7 Mon Sep 17 00:00:00 2001 From: edwin Date: Tue, 28 May 2024 18:46:45 +0700 Subject: [PATCH 01/12] feat: custom pubkeys inmemory service --- aggregator/aggregator.go | 7 +- aggregator/operator_registrations_inmemory.go | 190 ++++++++++++++++++ aggregator/rpc_server.go | 62 +++++- 3 files changed, 248 insertions(+), 11 deletions(-) create mode 100644 aggregator/operator_registrations_inmemory.go diff --git a/aggregator/aggregator.go b/aggregator/aggregator.go index e865e0f7..598c3fac 100644 --- a/aggregator/aggregator.go +++ b/aggregator/aggregator.go @@ -14,7 +14,6 @@ import ( "github.com/Layr-Labs/eigensdk-go/metrics" "github.com/Layr-Labs/eigensdk-go/services/avsregistry" blsagg "github.com/Layr-Labs/eigensdk-go/services/bls_aggregation" - oppubkeysserv "github.com/Layr-Labs/eigensdk-go/services/operatorpubkeys" "github.com/Layr-Labs/eigensdk-go/signerv2" eigentypes "github.com/Layr-Labs/eigensdk-go/types" "github.com/prometheus/client_golang/prometheus" @@ -91,6 +90,7 @@ type Aggregator struct { restListener RestEventListener aggregatorListener AggregatorEventListener + operatorRegistrationsService OperatorRegistrationsService taskBlsAggregationService blsagg.BlsAggregationService stateRootUpdateBlsAggregationService MessageBlsAggregationService operatorSetUpdateBlsAggregationService MessageBlsAggregationService @@ -200,8 +200,8 @@ func NewAggregator(ctx context.Context, config *config.Config, logger logging.Lo return nil, err } - operatorPubkeysService := oppubkeysserv.NewOperatorPubkeysServiceInMemory(ctx, clients.AvsRegistryChainSubscriber, clients.AvsRegistryChainReader, logger) - avsRegistryService := avsregistry.NewAvsRegistryServiceChainCaller(avsReader, operatorPubkeysService, logger) + operatorRegistrationsService := NewOperatorRegistrationsServiceInMemory(ctx, clients.AvsRegistryChainSubscriber, clients.AvsRegistryChainReader, logger) + avsRegistryService := avsregistry.NewAvsRegistryServiceChainCaller(avsReader, operatorRegistrationsService, logger) taskBlsAggregationService := blsagg.NewBlsAggregatorService(avsRegistryService, logger) stateRootUpdateBlsAggregationService := NewMessageBlsAggregatorService(avsRegistryService, clients.EthHttpClient, logger) operatorSetUpdateBlsAggregationService := NewMessageBlsAggregatorService(avsRegistryService, clients.EthHttpClient, logger) @@ -217,6 +217,7 @@ func NewAggregator(ctx context.Context, config *config.Config, logger logging.Lo rollupBroadcaster: rollupBroadcaster, httpClient: ethHttpClient, wsClient: ethWsClient, + operatorRegistrationsService: operatorRegistrationsService, taskBlsAggregationService: taskBlsAggregationService, stateRootUpdateBlsAggregationService: stateRootUpdateBlsAggregationService, operatorSetUpdateBlsAggregationService: operatorSetUpdateBlsAggregationService, diff --git a/aggregator/operator_registrations_inmemory.go b/aggregator/operator_registrations_inmemory.go new file mode 100644 index 00000000..799e0f37 --- /dev/null +++ b/aggregator/operator_registrations_inmemory.go @@ -0,0 +1,190 @@ +package aggregator + +import ( + "context" + "sync" + + "github.com/Layr-Labs/eigensdk-go/chainio/clients/avsregistry" + "github.com/Layr-Labs/eigensdk-go/crypto/bls" + "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/Layr-Labs/eigensdk-go/services/operatorpubkeys" + "github.com/Layr-Labs/eigensdk-go/types" + "github.com/ethereum/go-ethereum/common" +) + +type OperatorRegistrationsService interface { + operatorpubkeys.OperatorPubkeysService + + GetOperatorPubkeysById(ctx context.Context, operatorId types.OperatorId) (operatorPubkeys types.OperatorPubkeys, operatorFound bool) +} + +type OperatorRegistrationsServiceInMemory struct { + avsRegistrySubscriber avsregistry.AvsRegistrySubscriber + avsRegistryReader avsregistry.AvsRegistryReader + logger logging.Logger + queryByAddrC chan<- queryByAddr + queryByIdC chan<- queryById +} + +type queryByAddr struct { + operatorAddr common.Address + // channel through which to receive the response (operator pubkeys) + respC chan<- resp +} +type queryById struct { + operatorId types.OperatorId + // channel through which to receive the response (operator pubkeys) + respC chan<- resp +} + +type resp struct { + operatorPubkeys types.OperatorPubkeys + // false if operators were not present in the pubkey dict + operatorExists bool +} + +var _ operatorpubkeys.OperatorPubkeysService = (*OperatorRegistrationsServiceInMemory)(nil) + +// NewOperatorRegistrationsServiceInMemory constructs a OperatorRegistrationsServiceInMemory and starts it in a goroutine. +// It takes a context as argument because the "backfilling" of the database is done inside this constructor, +// so we wait for all past NewPubkeyRegistration events to be queried and the db to be filled before returning the service. +// The constructor is thus following a RAII-like pattern, of initializing the serving during construction. +// Using a separate initialize() function might lead to some users forgetting to call it and the service not behaving properly. +func NewOperatorRegistrationsServiceInMemory( + ctx context.Context, + avsRegistrySubscriber avsregistry.AvsRegistrySubscriber, + avsRegistryReader avsregistry.AvsRegistryReader, + logger logging.Logger, +) *OperatorRegistrationsServiceInMemory { + queryByAddrC := make(chan queryByAddr) + queryByIdC := make(chan queryById) + + pkcs := &OperatorRegistrationsServiceInMemory{ + avsRegistrySubscriber: avsRegistrySubscriber, + avsRegistryReader: avsRegistryReader, + logger: logger, + queryByAddrC: queryByAddrC, + queryByIdC: queryByIdC, + } + + // We use this waitgroup to wait on the initialization of the inmemory pubkey dict, + // which requires querying the past events of the pubkey registration contract + wg := sync.WaitGroup{} + wg.Add(1) + pkcs.startServiceInGoroutine(ctx, queryByAddrC, queryByIdC, &wg) + wg.Wait() + + return pkcs +} + +func (ors *OperatorRegistrationsServiceInMemory) startServiceInGoroutine(ctx context.Context, queryByAddrC <-chan queryByAddr, queryByIdC <-chan queryById, wg *sync.WaitGroup) { + go func() { + pubkeyByAddrDict := make(map[common.Address]types.OperatorPubkeys) + pubkeyByIdDict := make(map[types.OperatorId]types.OperatorPubkeys) + + ors.logger.Debug("Subscribing to new pubkey registration events on blsApkRegistry contract", "service", "OperatorPubkeysServiceInMemory") + newPubkeyRegistrationC, newPubkeyRegistrationSub, err := ors.avsRegistrySubscriber.SubscribeToNewPubkeyRegistrations() + if err != nil { + ors.logger.Error("Fatal error opening websocket subscription for new pubkey registrations", "err", err, "service", "OperatorPubkeysServiceInMemory") + // see the warning above the struct definition to understand why we panic here + panic(err) + } + + ors.queryPastRegisteredOperatorEventsAndFillDb(ctx, pubkeyByAddrDict, pubkeyByIdDict) + + // The constructor can return after we have backfilled the db by querying the events of operators that have registered with the blsApkRegistry + // before the block at which we started the ws subscription above + wg.Done() + + for { + select { + case <-ctx.Done(): + ors.logger.Infof("OperatorPubkeysServiceInMemory: Context cancelled, exiting") + return + + case err := <-newPubkeyRegistrationSub.Err(): + ors.logger.Error("Error in websocket subscription for new pubkey registration events. Attempting to reconnect...", "err", err, "service", "OperatorPubkeysServiceInMemory") + newPubkeyRegistrationSub.Unsubscribe() + newPubkeyRegistrationC, newPubkeyRegistrationSub, err = ors.avsRegistrySubscriber.SubscribeToNewPubkeyRegistrations() + if err != nil { + ors.logger.Error("Error opening websocket subscription for new pubkey registrations", "err", err, "service", "OperatorPubkeysServiceInMemory") + // see the warning above the struct definition to understand why we panic here + panic(err) + } + + case newPubkeyRegistrationEvent := <-newPubkeyRegistrationC: + pubkeys := types.OperatorPubkeys{ + G1Pubkey: bls.NewG1Point(newPubkeyRegistrationEvent.PubkeyG1.X, newPubkeyRegistrationEvent.PubkeyG1.Y), + G2Pubkey: bls.NewG2Point(newPubkeyRegistrationEvent.PubkeyG2.X, newPubkeyRegistrationEvent.PubkeyG2.Y), + } + operatorId := types.OperatorIdFromPubkey(pubkeys.G1Pubkey) + operatorAddr := newPubkeyRegistrationEvent.Operator + + pubkeyByAddrDict[operatorAddr] = pubkeys + pubkeyByIdDict[operatorId] = pubkeys + + ors.logger.Debug("Added operator pubkeys to pubkey dict", + "service", "OperatorPubkeysServiceInMemory", + "block", newPubkeyRegistrationEvent.Raw.BlockNumber, + "operatorAddr", operatorAddr, + "operatorId", operatorId, + "G1pubkey", pubkeyByAddrDict[operatorAddr].G1Pubkey, + "G2pubkey", pubkeyByAddrDict[operatorAddr].G2Pubkey, + ) + + // Receive a queryByAddr from GetOperatorPubkeys + case operatorPubkeyQuery := <-queryByAddrC: + pubkeys, ok := pubkeyByAddrDict[operatorPubkeyQuery.operatorAddr] + operatorPubkeyQuery.respC <- resp{pubkeys, ok} + + // Receive a queryById from GetOperatorPubkeysById + case operatorPubkeyQuery := <-queryByIdC: + pubkeys, ok := pubkeyByIdDict[operatorPubkeyQuery.operatorId] + operatorPubkeyQuery.respC <- resp{pubkeys, ok} + } + } + }() +} + +func (ors *OperatorRegistrationsServiceInMemory) queryPastRegisteredOperatorEventsAndFillDb(ctx context.Context, pubkeyByAddrDict map[common.Address]types.OperatorPubkeys, pubkeyByIdDict map[types.OperatorId]types.OperatorPubkeys) { + // Querying with nil startBlock and stopBlock will return all events. It doesn't matter if we queryByAddr some events that we will receive again in the websocket, + // since we will just overwrite the pubkey dict with the same values. + alreadyRegisteredOperatorAddrs, alreadyRegisteredOperatorPubkeys, err := ors.avsRegistryReader.QueryExistingRegisteredOperatorPubKeys(ctx, nil, nil) + if err != nil { + ors.logger.Error("Fatal error querying existing registered operators", "err", err, "service", "OperatorPubkeysServiceInMemory") + panic(err) + } + + ors.logger.Debug("List of queried operator registration events in blsApkRegistry", "alreadyRegisteredOperatorAddr", alreadyRegisteredOperatorAddrs, "service", "OperatorPubkeysServiceInMemory") + for i, operatorAddr := range alreadyRegisteredOperatorAddrs { + operatorPubkeys := alreadyRegisteredOperatorPubkeys[i] + pubkeyByAddrDict[operatorAddr] = operatorPubkeys + + operatorId := types.OperatorIdFromPubkey(operatorPubkeys.G1Pubkey) + pubkeyByIdDict[operatorId] = operatorPubkeys + } +} + +func (ors *OperatorRegistrationsServiceInMemory) GetOperatorPubkeys(ctx context.Context, operator common.Address) (types.OperatorPubkeys, bool) { + respC := make(chan resp) + ors.queryByAddrC <- queryByAddr{operator, respC} + + select { + case <-ctx.Done(): + return types.OperatorPubkeys{}, false + case resp := <-respC: + return resp.operatorPubkeys, resp.operatorExists + } +} + +func (ors *OperatorRegistrationsServiceInMemory) GetOperatorPubkeysById(ctx context.Context, operatorId types.OperatorId) (types.OperatorPubkeys, bool) { + respC := make(chan resp) + ors.queryByIdC <- queryById{operatorId, respC} + + select { + case <-ctx.Done(): + return types.OperatorPubkeys{}, false + case resp := <-respC: + return resp.operatorPubkeys, resp.operatorExists + } +} diff --git a/aggregator/rpc_server.go b/aggregator/rpc_server.go index 9c2f4963..ed9f905b 100644 --- a/aggregator/rpc_server.go +++ b/aggregator/rpc_server.go @@ -8,6 +8,7 @@ import ( "net/rpc" "strings" + "github.com/Layr-Labs/eigensdk-go/crypto/bls" eigentypes "github.com/Layr-Labs/eigensdk-go/types" "github.com/NethermindEth/near-sffl/aggregator/types" @@ -40,25 +41,50 @@ func (agg *Aggregator) startServer() error { return nil } +func (agg *Aggregator) verifyOperatorIdRegistered(operatorId eigentypes.OperatorId, messageDigest [32]byte, signature bls.Signature) (bool, error) { + operatorPubkeys, ok := agg.operatorRegistrationsService.GetOperatorPubkeysById(context.Background(), operatorId) + if !ok { + return false, OperatorNotPartOfTaskQuorum400 + } + + ok, err := signature.Verify(operatorPubkeys.G2Pubkey, messageDigest) + if err != nil { + return false, UnknownErrorWhileVerifyingSignature400 + } + + return ok, nil +} + // rpc endpoint which is called by operator // reply doesn't need to be checked. If there are no errors, the task response is accepted // rpc framework forces a reply type to exist, so we put bool as a placeholder func (agg *Aggregator) ProcessSignedCheckpointTaskResponse(signedCheckpointTaskResponse *messages.SignedCheckpointTaskResponse, reply *bool) error { agg.logger.Info("Received signed task response", "response", fmt.Sprintf("%#v", signedCheckpointTaskResponse)) - - taskIndex := signedCheckpointTaskResponse.TaskResponse.ReferenceTaskIndex taskResponseDigest, err := signedCheckpointTaskResponse.TaskResponse.Digest() if err != nil { agg.logger.Error("Failed to get task response digest", "err", err) return TaskResponseDigestNotFoundError500 } + operatorId := signedCheckpointTaskResponse.OperatorId + signature := signedCheckpointTaskResponse.BlsSignature + ok, err := agg.verifyOperatorIdRegistered(operatorId, taskResponseDigest, signature) + if err != nil { + agg.logger.Error("Failed to verify message", "err", err) + return err + } + if !ok { + agg.logger.Info("Invalid operator signature", "err", err) + return SignatureVerificationFailed400 + } + agg.rpcListener.IncTotalSignedCheckpointTaskResponse() agg.rpcListener.ObserveLastMessageReceivedTime(signedCheckpointTaskResponse.OperatorId, CheckpointTaskResponseLabel) + taskIndex := signedCheckpointTaskResponse.TaskResponse.ReferenceTaskIndex err = agg.taskBlsAggregationService.ProcessNewSignature( context.Background(), taskIndex, taskResponseDigest, - &signedCheckpointTaskResponse.BlsSignature, signedCheckpointTaskResponse.OperatorId, + &signature, signedCheckpointTaskResponse.OperatorId, ) if err != nil { agg.rpcListener.IncSignedCheckpointTaskResponse( @@ -78,7 +104,7 @@ func (agg *Aggregator) ProcessSignedCheckpointTaskResponse(signedCheckpointTaskR } agg.taskResponsesLock.Unlock() - agg.rpcListener.IncSignedCheckpointTaskResponse(signedCheckpointTaskResponse.OperatorId, false, false) + agg.rpcListener.IncSignedCheckpointTaskResponse(operatorId, false, false) return nil } @@ -90,15 +116,25 @@ func (agg *Aggregator) ProcessSignedStateRootUpdateMessage(signedStateRootUpdate return TaskResponseDigestNotFoundError500 } - hasNearDaCommitment := signedStateRootUpdateMessage.Message.HasNearDaCommitment() operatorId := signedStateRootUpdateMessage.OperatorId - rollupId := signedStateRootUpdateMessage.Message.RollupId + signature := signedStateRootUpdateMessage.BlsSignature + ok, err := agg.verifyOperatorIdRegistered(operatorId, messageDigest, signature) + if err != nil { + agg.logger.Error("Failed to verify message", "err", err) + return err + } + if !ok { + agg.logger.Info("Invalid operator signature", "err", err) + return SignatureVerificationFailed400 + } agg.logger.Info("Received signed state root update message", "updateMessage", fmt.Sprintf("%#v", signedStateRootUpdateMessage) , "messageDigest", fmt.Sprintf("%#v", messageDigest)) agg.rpcListener.IncTotalSignedCheckpointTaskResponse() agg.rpcListener.ObserveLastMessageReceivedTime(operatorId, StateRootUpdateMessageLabel) + rollupId := signedStateRootUpdateMessage.Message.RollupId + hasNearDaCommitment := signedStateRootUpdateMessage.Message.HasNearDaCommitment() err = agg.stateRootUpdateBlsAggregationService.InitializeMessageIfNotExists( messageDigest, coretypes.QUORUM_NUMBERS, @@ -118,7 +154,7 @@ func (agg *Aggregator) ProcessSignedStateRootUpdateMessage(signedStateRootUpdate err = agg.stateRootUpdateBlsAggregationService.ProcessNewSignature( context.Background(), messageDigest, - &signedStateRootUpdateMessage.BlsSignature, signedStateRootUpdateMessage.OperatorId, + &signature, signedStateRootUpdateMessage.OperatorId, ) if err != nil { agg.rpcListener.IncSignedStateRootUpdateMessage(operatorId, rollupId, true, hasNearDaCommitment) @@ -138,6 +174,16 @@ func (agg *Aggregator) ProcessSignedOperatorSetUpdateMessage(signedOperatorSetUp } operatorId := signedOperatorSetUpdateMessage.OperatorId + signature := signedOperatorSetUpdateMessage.BlsSignature + ok, err := agg.verifyOperatorIdRegistered(operatorId, messageDigest, signature) + if err != nil { + agg.logger.Error("Failed to verify message", "err", err) + return err + } + if !ok { + agg.logger.Info("Invalid operator signature", "err", err) + return SignatureVerificationFailed400 + } agg.logger.Info("Received signed operator set update message", "message", fmt.Sprintf("%#v", signedOperatorSetUpdateMessage)) @@ -171,7 +217,7 @@ func (agg *Aggregator) ProcessSignedOperatorSetUpdateMessage(signedOperatorSetUp err = agg.operatorSetUpdateBlsAggregationService.ProcessNewSignature( context.Background(), messageDigest, - &signedOperatorSetUpdateMessage.BlsSignature, signedOperatorSetUpdateMessage.OperatorId, + &signature, signedOperatorSetUpdateMessage.OperatorId, ) if err != nil { agg.rpcListener.IncSignedOperatorSetUpdateMessage(operatorId, true) From 05bbd37f1e81e800a4de8c4372755f04b77e9e00 Mon Sep 17 00:00:00 2001 From: edwin Date: Wed, 29 May 2024 16:02:40 +0700 Subject: [PATCH 02/12] fix: tests --- aggregator/aggregator_test.go | 23 +++--- aggregator/gen.go | 1 + .../mocks/operator_registrations_inmemory.go | 71 +++++++++++++++++++ aggregator/rest_server_test.go | 6 +- aggregator/rpc_server_test.go | 51 +++++++++++-- 5 files changed, 132 insertions(+), 20 deletions(-) create mode 100644 aggregator/mocks/operator_registrations_inmemory.go diff --git a/aggregator/aggregator_test.go b/aggregator/aggregator_test.go index 0d9d9cf2..12676b05 100644 --- a/aggregator/aggregator_test.go +++ b/aggregator/aggregator_test.go @@ -33,13 +33,14 @@ var MOCK_OPERATOR_BLS_PRIVATE_KEY, _ = bls.NewPrivateKey(MOCK_OPERATOR_BLS_PRIVA var MOCK_OPERATOR_KEYPAIR = bls.NewKeyPair(MOCK_OPERATOR_BLS_PRIVATE_KEY) var MOCK_OPERATOR_G1PUBKEY = MOCK_OPERATOR_KEYPAIR.GetPubKeyG1() var MOCK_OPERATOR_G2PUBKEY = MOCK_OPERATOR_KEYPAIR.GetPubKeyG2() +var MOCK_OPERATOR_PUBKEYS = eigentypes.OperatorPubkeys{ + G1Pubkey: MOCK_OPERATOR_G1PUBKEY, + G2Pubkey: MOCK_OPERATOR_G2PUBKEY, +} var MOCK_OPERATOR_PUBKEY_DICT = map[eigentypes.OperatorId]types.OperatorInfo{ MOCK_OPERATOR_ID: { - OperatorPubkeys: eigentypes.OperatorPubkeys{ - G1Pubkey: MOCK_OPERATOR_G1PUBKEY, - G2Pubkey: MOCK_OPERATOR_G2PUBKEY, - }, - OperatorAddr: common.Address{}, + OperatorPubkeys: MOCK_OPERATOR_PUBKEYS, + OperatorAddr: common.Address{}, }, } @@ -54,7 +55,7 @@ func TestSendNewTask(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - aggregator, mockAvsReaderer, mockAvsWriterer, mockTaskBlsAggService, _, _, _, _, mockClient, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT) + aggregator, mockAvsReaderer, mockAvsWriterer, mockTaskBlsAggService, _, _, _, _, _, mockClient, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT) assert.Nil(t, err) var TASK_INDEX = uint32(0) @@ -87,7 +88,7 @@ func TestHandleStateRootUpdateAggregationReachedQuorum(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - aggregator, _, _, _, _, _, mockMsgDb, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT) + aggregator, _, _, _, _, _, _, mockMsgDb, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT) assert.Nil(t, err) msg := messages.StateRootUpdateMessage{} @@ -117,7 +118,7 @@ func TestHandleOperatorSetUpdateAggregationReachedQuorum(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - aggregator, _, _, _, _, _, mockMsgDb, mockRollupBroadcaster, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT) + aggregator, _, _, _, _, _, _, mockMsgDb, mockRollupBroadcaster, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT) assert.Nil(t, err) msg := messages.OperatorSetUpdateMessage{} @@ -151,7 +152,7 @@ func TestHandleOperatorSetUpdateAggregationReachedQuorum(t *testing.T) { func createMockAggregator( mockCtrl *gomock.Controller, operatorPubkeyDict map[eigentypes.OperatorId]types.OperatorInfo, -) (*Aggregator, *chainiomocks.MockAvsReaderer, *chainiomocks.MockAvsWriterer, *blsaggservmock.MockBlsAggregationService, *aggmocks.MockMessageBlsAggregationService, *aggmocks.MockMessageBlsAggregationService, *dbmocks.MockDatabaser, *aggmocks.MockRollupBroadcasterer, *safeclientmocks.MockSafeClient, error) { +) (*Aggregator, *chainiomocks.MockAvsReaderer, *chainiomocks.MockAvsWriterer, *blsaggservmock.MockBlsAggregationService, *aggmocks.MockMessageBlsAggregationService, *aggmocks.MockMessageBlsAggregationService, *aggmocks.MockOperatorRegistrationsService, *dbmocks.MockDatabaser, *aggmocks.MockRollupBroadcasterer, *safeclientmocks.MockSafeClient, error) { logger := sdklogging.NewNoopLogger() mockAvsWriter := chainiomocks.NewMockAvsWriterer(mockCtrl) mockAvsReader := chainiomocks.NewMockAvsReaderer(mockCtrl) @@ -161,6 +162,7 @@ func createMockAggregator( mockMsgDb := dbmocks.NewMockDatabaser(mockCtrl) mockRollupBroadcaster := aggmocks.NewMockRollupBroadcasterer(mockCtrl) mockClient := safeclientmocks.NewMockSafeClient(mockCtrl) + mockOperatorRegistrationsService := aggmocks.NewMockOperatorRegistrationsService(mockCtrl) aggregator := &Aggregator{ logger: logger, @@ -169,6 +171,7 @@ func createMockAggregator( taskBlsAggregationService: mockTaskBlsAggregationService, stateRootUpdateBlsAggregationService: mockStateRootUpdateBlsAggregationService, operatorSetUpdateBlsAggregationService: mockOperatorSetUpdateBlsAggregationService, + operatorRegistrationsService: mockOperatorRegistrationsService, msgDb: mockMsgDb, tasks: make(map[coretypes.TaskIndex]taskmanager.CheckpointTask), taskResponses: make(map[coretypes.TaskIndex]map[eigentypes.TaskResponseDigest]messages.CheckpointTaskResponse), @@ -181,5 +184,5 @@ func createMockAggregator( restListener: &SelectiveRestListener{}, aggregatorListener: &SelectiveAggregatorListener{}, } - return aggregator, mockAvsReader, mockAvsWriter, mockTaskBlsAggregationService, mockStateRootUpdateBlsAggregationService, mockOperatorSetUpdateBlsAggregationService, mockMsgDb, mockRollupBroadcaster, mockClient, nil + return aggregator, mockAvsReader, mockAvsWriter, mockTaskBlsAggregationService, mockStateRootUpdateBlsAggregationService, mockOperatorSetUpdateBlsAggregationService, mockOperatorRegistrationsService, mockMsgDb, mockRollupBroadcaster, mockClient, nil } diff --git a/aggregator/gen.go b/aggregator/gen.go index c6efd554..24bc3fc5 100644 --- a/aggregator/gen.go +++ b/aggregator/gen.go @@ -2,4 +2,5 @@ package aggregator //go:generate mockgen -destination=./mocks/message_blsagg.go -package=mocks github.com/NethermindEth/near-sffl/aggregator MessageBlsAggregationService //go:generate mockgen -destination=./mocks/rollup_broadcaster.go -package=mocks github.com/NethermindEth/near-sffl/aggregator RollupBroadcasterer +//go:generate mockgen -destination=./mocks/operator_registrations_inmemory.go -package=mocks github.com/NethermindEth/near-sffl/aggregator OperatorRegistrationsService //go:generate mockgen -destination=./mocks/eth_client.go -package=mocks github.com/Layr-Labs/eigensdk-go/chainio/clients/eth Client diff --git a/aggregator/mocks/operator_registrations_inmemory.go b/aggregator/mocks/operator_registrations_inmemory.go new file mode 100644 index 00000000..ed7642c3 --- /dev/null +++ b/aggregator/mocks/operator_registrations_inmemory.go @@ -0,0 +1,71 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/NethermindEth/near-sffl/aggregator (interfaces: OperatorRegistrationsService) +// +// Generated by this command: +// +// mockgen -destination=./mocks/operator_registrations_inmemory.go -package=mocks github.com/NethermindEth/near-sffl/aggregator OperatorRegistrationsService +// +// Package mocks is a generated GoMock package. +package mocks + +import ( + context "context" + reflect "reflect" + + types "github.com/Layr-Labs/eigensdk-go/types" + common "github.com/ethereum/go-ethereum/common" + gomock "go.uber.org/mock/gomock" +) + +// MockOperatorRegistrationsService is a mock of OperatorRegistrationsService interface. +type MockOperatorRegistrationsService struct { + ctrl *gomock.Controller + recorder *MockOperatorRegistrationsServiceMockRecorder +} + +// MockOperatorRegistrationsServiceMockRecorder is the mock recorder for MockOperatorRegistrationsService. +type MockOperatorRegistrationsServiceMockRecorder struct { + mock *MockOperatorRegistrationsService +} + +// NewMockOperatorRegistrationsService creates a new mock instance. +func NewMockOperatorRegistrationsService(ctrl *gomock.Controller) *MockOperatorRegistrationsService { + mock := &MockOperatorRegistrationsService{ctrl: ctrl} + mock.recorder = &MockOperatorRegistrationsServiceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockOperatorRegistrationsService) EXPECT() *MockOperatorRegistrationsServiceMockRecorder { + return m.recorder +} + +// GetOperatorPubkeys mocks base method. +func (m *MockOperatorRegistrationsService) GetOperatorPubkeys(arg0 context.Context, arg1 common.Address) (types.OperatorPubkeys, bool) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetOperatorPubkeys", arg0, arg1) + ret0, _ := ret[0].(types.OperatorPubkeys) + ret1, _ := ret[1].(bool) + return ret0, ret1 +} + +// GetOperatorPubkeys indicates an expected call of GetOperatorPubkeys. +func (mr *MockOperatorRegistrationsServiceMockRecorder) GetOperatorPubkeys(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOperatorPubkeys", reflect.TypeOf((*MockOperatorRegistrationsService)(nil).GetOperatorPubkeys), arg0, arg1) +} + +// GetOperatorPubkeysById mocks base method. +func (m *MockOperatorRegistrationsService) GetOperatorPubkeysById(arg0 context.Context, arg1 types.Bytes32) (types.OperatorPubkeys, bool) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetOperatorPubkeysById", arg0, arg1) + ret0, _ := ret[0].(types.OperatorPubkeys) + ret1, _ := ret[1].(bool) + return ret0, ret1 +} + +// GetOperatorPubkeysById indicates an expected call of GetOperatorPubkeysById. +func (mr *MockOperatorRegistrationsServiceMockRecorder) GetOperatorPubkeysById(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOperatorPubkeysById", reflect.TypeOf((*MockOperatorRegistrationsService)(nil).GetOperatorPubkeysById), arg0, arg1) +} diff --git a/aggregator/rest_server_test.go b/aggregator/rest_server_test.go index 606ce7cc..ebc95d39 100644 --- a/aggregator/rest_server_test.go +++ b/aggregator/rest_server_test.go @@ -18,7 +18,7 @@ func TestGetStateRootUpdateAggregation(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - aggregator, _, _, _, _, _, mockDb, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT) + aggregator, _, _, _, _, _, _, mockDb, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT) assert.Nil(t, err) go aggregator.startRestServer() @@ -77,7 +77,7 @@ func TestGetOperatorSetUpdateAggregation(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - aggregator, _, _, _, _, _, mockDb, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT) + aggregator, _, _, _, _, _, _, mockDb, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT) assert.Nil(t, err) go aggregator.startRestServer() @@ -130,7 +130,7 @@ func TestGetCheckpointMessages(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - aggregator, _, _, _, _, _, mockDb, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT) + aggregator, _, _, _, _, _, _, mockDb, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT) assert.Nil(t, err) go aggregator.startRestServer() diff --git a/aggregator/rpc_server_test.go b/aggregator/rpc_server_test.go index 914abf8c..d7120c68 100644 --- a/aggregator/rpc_server_test.go +++ b/aggregator/rpc_server_test.go @@ -27,7 +27,7 @@ func TestProcessSignedCheckpointTaskResponse(t *testing.T) { var FROM_NEAR_BLOCK = uint64(3) var TO_NEAR_BLOCK = uint64(4) - aggregator, _, _, mockBlsAggServ, _, _, _, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT) + aggregator, _, _, mockBlsAggServ, _, _, mockOperatorRegistrationsServ, _, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT) assert.Nil(t, err) signedCheckpointTaskResponse, err := createMockSignedCheckpointTaskResponse(MockTask{ @@ -43,8 +43,11 @@ func TestProcessSignedCheckpointTaskResponse(t *testing.T) { // TODO(samlaf): is this the right way to test writing to external service? // or is there some wisdom to "don't mock 3rd party code"? // see https://hynek.me/articles/what-to-mock-in-5-mins/ - mockBlsAggServ.EXPECT().ProcessNewSignature(context.Background(), TASK_INDEX, signedCheckpointTaskResponseDigest, + ctx := context.Background() + mockBlsAggServ.EXPECT().ProcessNewSignature(ctx, TASK_INDEX, signedCheckpointTaskResponseDigest, &signedCheckpointTaskResponse.BlsSignature, signedCheckpointTaskResponse.OperatorId) + mockOperatorRegistrationsServ.EXPECT().GetOperatorPubkeysById(ctx, signedCheckpointTaskResponse.OperatorId).Return(MOCK_OPERATOR_PUBKEYS, true) + err = aggregator.ProcessSignedCheckpointTaskResponse(signedCheckpointTaskResponse, nil) assert.Nil(t, err) } @@ -53,7 +56,7 @@ func TestProcessSignedStateRootUpdateMessage(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - aggregator, _, _, _, mockMessageBlsAggServ, _, _, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT) + aggregator, _, _, _, mockMessageBlsAggServ, _, mockOperatorRegistrationsServ, _, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT) assert.Nil(t, err) message := messages.StateRootUpdateMessage{ @@ -73,15 +76,42 @@ func TestProcessSignedStateRootUpdateMessage(t *testing.T) { mockMessageBlsAggServ.EXPECT().ProcessNewSignature(context.Background(), messageDigest, &signedMessage.BlsSignature, signedMessage.OperatorId) mockMessageBlsAggServ.EXPECT().InitializeMessageIfNotExists(messageDigest, coretypes.QUORUM_NUMBERS, []eigentypes.QuorumThresholdPercentage{types.MESSAGE_AGGREGATION_QUORUM_THRESHOLD}, types.MESSAGE_TTL, types.MESSAGE_BLS_AGGREGATION_TIMEOUT, uint64(0)) + mockOperatorRegistrationsServ.EXPECT().GetOperatorPubkeysById(context.Background(), signedMessage.OperatorId).Return(MOCK_OPERATOR_PUBKEYS, true) + err = aggregator.ProcessSignedStateRootUpdateMessage(signedMessage, nil) assert.Nil(t, err) } +func TestProcessInvalidSignedStateRootUpdateMessage(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + aggregator, _, _, _, _, _, mockOperatorRegistrationsServ, _, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT) + assert.Nil(t, err) + + message := messages.StateRootUpdateMessage{ + RollupId: 1, + BlockHeight: 2, + Timestamp: 3, + NearDaCommitment: keccak256(4), + NearDaTransactionId: keccak256(5), + StateRoot: keccak256(6), + } + + signedMessage, err := createMockSignedStateRootUpdateMessage(message, *MOCK_OPERATOR_KEYPAIR) + assert.Nil(t, err) + invalidateSignature(&signedMessage.BlsSignature) + + mockOperatorRegistrationsServ.EXPECT().GetOperatorPubkeysById(context.Background(), signedMessage.OperatorId).Return(MOCK_OPERATOR_PUBKEYS, true) + err = aggregator.ProcessSignedStateRootUpdateMessage(signedMessage, nil) + assert.Equal(t, err.Error(), "400. Signature verification failed") +} + func TestProcessOperatorSetUpdateMessage(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - aggregator, mockAvsReader, _, _, _, mockMessageBlsAggServ, _, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT) + aggregator, mockAvsReader, _, _, _, mockMessageBlsAggServ, mockOperatorRegistrationsServ, _, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT) assert.Nil(t, err) message := messages.OperatorSetUpdateMessage{ @@ -97,11 +127,14 @@ func TestProcessOperatorSetUpdateMessage(t *testing.T) { messageDigest, err := signedMessage.Message.Digest() assert.Nil(t, err) - mockAvsReader.EXPECT().GetOperatorSetUpdateBlock(context.Background(), uint64(1)).Return(uint32(10), nil) + ctx := context.Background() + mockAvsReader.EXPECT().GetOperatorSetUpdateBlock(ctx, uint64(1)).Return(uint32(10), nil) - mockMessageBlsAggServ.EXPECT().ProcessNewSignature(context.Background(), messageDigest, + mockMessageBlsAggServ.EXPECT().ProcessNewSignature(ctx, messageDigest, &signedMessage.BlsSignature, signedMessage.OperatorId) mockMessageBlsAggServ.EXPECT().InitializeMessageIfNotExists(messageDigest, coretypes.QUORUM_NUMBERS, []eigentypes.QuorumThresholdPercentage{types.MESSAGE_AGGREGATION_QUORUM_THRESHOLD}, types.MESSAGE_TTL, types.MESSAGE_BLS_AGGREGATION_TIMEOUT, uint64(9)) + mockOperatorRegistrationsServ.EXPECT().GetOperatorPubkeysById(ctx, signedMessage.OperatorId).Return(MOCK_OPERATOR_PUBKEYS, true) + err = aggregator.ProcessSignedOperatorSetUpdateMessage(signedMessage, nil) assert.Nil(t, err) } @@ -110,7 +143,7 @@ func TestGetAggregatedCheckpointMessages(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - aggregator, _, _, _, _, _, mockDb, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT) + aggregator, _, _, _, _, _, _, mockDb, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT) assert.Nil(t, err) var checkpointMessages messages.CheckpointMessages @@ -175,3 +208,7 @@ func createMockSignedOperatorSetUpdateMessage(mockMessage messages.OperatorSetUp } return signedOperatorSetUpdateMessage, nil } + +func invalidateSignature(signature *bls.Signature) { + signature.G1Affine.Neg(signature.G1Affine) +} From 0700200d7a9b86c9156d96fffd4df492b5f54bc7 Mon Sep 17 00:00:00 2001 From: edwin Date: Wed, 29 May 2024 16:12:30 +0700 Subject: [PATCH 03/12] refactor: log update --- aggregator/operator_registrations_inmemory.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/aggregator/operator_registrations_inmemory.go b/aggregator/operator_registrations_inmemory.go index 799e0f37..5b39be50 100644 --- a/aggregator/operator_registrations_inmemory.go +++ b/aggregator/operator_registrations_inmemory.go @@ -82,10 +82,10 @@ func (ors *OperatorRegistrationsServiceInMemory) startServiceInGoroutine(ctx con pubkeyByAddrDict := make(map[common.Address]types.OperatorPubkeys) pubkeyByIdDict := make(map[types.OperatorId]types.OperatorPubkeys) - ors.logger.Debug("Subscribing to new pubkey registration events on blsApkRegistry contract", "service", "OperatorPubkeysServiceInMemory") + ors.logger.Debug("Subscribing to new pubkey registration events on blsApkRegistry contract", "service", "OperatorRegistrationsServiceInMemory") newPubkeyRegistrationC, newPubkeyRegistrationSub, err := ors.avsRegistrySubscriber.SubscribeToNewPubkeyRegistrations() if err != nil { - ors.logger.Error("Fatal error opening websocket subscription for new pubkey registrations", "err", err, "service", "OperatorPubkeysServiceInMemory") + ors.logger.Error("Fatal error opening websocket subscription for new pubkey registrations", "err", err, "service", "OperatorRegistrationsServiceInMemory") // see the warning above the struct definition to understand why we panic here panic(err) } @@ -99,15 +99,15 @@ func (ors *OperatorRegistrationsServiceInMemory) startServiceInGoroutine(ctx con for { select { case <-ctx.Done(): - ors.logger.Infof("OperatorPubkeysServiceInMemory: Context cancelled, exiting") + ors.logger.Infof("OperatorRegistrationsServiceInMemory: Context cancelled, exiting") return case err := <-newPubkeyRegistrationSub.Err(): - ors.logger.Error("Error in websocket subscription for new pubkey registration events. Attempting to reconnect...", "err", err, "service", "OperatorPubkeysServiceInMemory") + ors.logger.Error("Error in websocket subscription for new pubkey registration events. Attempting to reconnect...", "err", err, "service", "OperatorRegistrationsServiceInMemory") newPubkeyRegistrationSub.Unsubscribe() newPubkeyRegistrationC, newPubkeyRegistrationSub, err = ors.avsRegistrySubscriber.SubscribeToNewPubkeyRegistrations() if err != nil { - ors.logger.Error("Error opening websocket subscription for new pubkey registrations", "err", err, "service", "OperatorPubkeysServiceInMemory") + ors.logger.Error("Error opening websocket subscription for new pubkey registrations", "err", err, "service", "OperatorRegistrationsServiceInMemory") // see the warning above the struct definition to understand why we panic here panic(err) } @@ -124,7 +124,7 @@ func (ors *OperatorRegistrationsServiceInMemory) startServiceInGoroutine(ctx con pubkeyByIdDict[operatorId] = pubkeys ors.logger.Debug("Added operator pubkeys to pubkey dict", - "service", "OperatorPubkeysServiceInMemory", + "service", "OperatorRegistrationsServiceInMemory", "block", newPubkeyRegistrationEvent.Raw.BlockNumber, "operatorAddr", operatorAddr, "operatorId", operatorId, @@ -151,11 +151,11 @@ func (ors *OperatorRegistrationsServiceInMemory) queryPastRegisteredOperatorEven // since we will just overwrite the pubkey dict with the same values. alreadyRegisteredOperatorAddrs, alreadyRegisteredOperatorPubkeys, err := ors.avsRegistryReader.QueryExistingRegisteredOperatorPubKeys(ctx, nil, nil) if err != nil { - ors.logger.Error("Fatal error querying existing registered operators", "err", err, "service", "OperatorPubkeysServiceInMemory") + ors.logger.Error("Fatal error querying existing registered operators", "err", err, "service", "OperatorRegistrationsServiceInMemory") panic(err) } - ors.logger.Debug("List of queried operator registration events in blsApkRegistry", "alreadyRegisteredOperatorAddr", alreadyRegisteredOperatorAddrs, "service", "OperatorPubkeysServiceInMemory") + ors.logger.Debug("List of queried operator registration events in blsApkRegistry", "alreadyRegisteredOperatorAddr", alreadyRegisteredOperatorAddrs, "service", "OperatorRegistrationsServiceInMemory") for i, operatorAddr := range alreadyRegisteredOperatorAddrs { operatorPubkeys := alreadyRegisteredOperatorPubkeys[i] pubkeyByAddrDict[operatorAddr] = operatorPubkeys From 33019e9ff595174a1b7492e1c49999096bb5817d Mon Sep 17 00:00:00 2001 From: edwin Date: Thu, 30 May 2024 17:00:16 +0700 Subject: [PATCH 04/12] refactor: api change in queryPastRegisteredOperators --- aggregator/operator_registrations_inmemory.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/aggregator/operator_registrations_inmemory.go b/aggregator/operator_registrations_inmemory.go index 5b39be50..7d028dd3 100644 --- a/aggregator/operator_registrations_inmemory.go +++ b/aggregator/operator_registrations_inmemory.go @@ -79,9 +79,6 @@ func NewOperatorRegistrationsServiceInMemory( func (ors *OperatorRegistrationsServiceInMemory) startServiceInGoroutine(ctx context.Context, queryByAddrC <-chan queryByAddr, queryByIdC <-chan queryById, wg *sync.WaitGroup) { go func() { - pubkeyByAddrDict := make(map[common.Address]types.OperatorPubkeys) - pubkeyByIdDict := make(map[types.OperatorId]types.OperatorPubkeys) - ors.logger.Debug("Subscribing to new pubkey registration events on blsApkRegistry contract", "service", "OperatorRegistrationsServiceInMemory") newPubkeyRegistrationC, newPubkeyRegistrationSub, err := ors.avsRegistrySubscriber.SubscribeToNewPubkeyRegistrations() if err != nil { @@ -90,7 +87,7 @@ func (ors *OperatorRegistrationsServiceInMemory) startServiceInGoroutine(ctx con panic(err) } - ors.queryPastRegisteredOperatorEventsAndFillDb(ctx, pubkeyByAddrDict, pubkeyByIdDict) + pubkeyByAddrDict, pubkeyByIdDict := ors.queryPastRegisteredOperators(ctx) // The constructor can return after we have backfilled the db by querying the events of operators that have registered with the blsApkRegistry // before the block at which we started the ws subscription above @@ -146,7 +143,7 @@ func (ors *OperatorRegistrationsServiceInMemory) startServiceInGoroutine(ctx con }() } -func (ors *OperatorRegistrationsServiceInMemory) queryPastRegisteredOperatorEventsAndFillDb(ctx context.Context, pubkeyByAddrDict map[common.Address]types.OperatorPubkeys, pubkeyByIdDict map[types.OperatorId]types.OperatorPubkeys) { +func (ors *OperatorRegistrationsServiceInMemory) queryPastRegisteredOperators(ctx context.Context) (map[common.Address]types.OperatorPubkeys, map[types.OperatorId]types.OperatorPubkeys) { // Querying with nil startBlock and stopBlock will return all events. It doesn't matter if we queryByAddr some events that we will receive again in the websocket, // since we will just overwrite the pubkey dict with the same values. alreadyRegisteredOperatorAddrs, alreadyRegisteredOperatorPubkeys, err := ors.avsRegistryReader.QueryExistingRegisteredOperatorPubKeys(ctx, nil, nil) @@ -154,8 +151,10 @@ func (ors *OperatorRegistrationsServiceInMemory) queryPastRegisteredOperatorEven ors.logger.Error("Fatal error querying existing registered operators", "err", err, "service", "OperatorRegistrationsServiceInMemory") panic(err) } - ors.logger.Debug("List of queried operator registration events in blsApkRegistry", "alreadyRegisteredOperatorAddr", alreadyRegisteredOperatorAddrs, "service", "OperatorRegistrationsServiceInMemory") + + pubkeyByAddrDict := make(map[common.Address]types.OperatorPubkeys) + pubkeyByIdDict := make(map[types.OperatorId]types.OperatorPubkeys) for i, operatorAddr := range alreadyRegisteredOperatorAddrs { operatorPubkeys := alreadyRegisteredOperatorPubkeys[i] pubkeyByAddrDict[operatorAddr] = operatorPubkeys @@ -163,6 +162,8 @@ func (ors *OperatorRegistrationsServiceInMemory) queryPastRegisteredOperatorEven operatorId := types.OperatorIdFromPubkey(operatorPubkeys.G1Pubkey) pubkeyByIdDict[operatorId] = operatorPubkeys } + + return pubkeyByAddrDict, pubkeyByIdDict } func (ors *OperatorRegistrationsServiceInMemory) GetOperatorPubkeys(ctx context.Context, operator common.Address) (types.OperatorPubkeys, bool) { From a4d1beef054338f5e07fea8a4de4cb212534d447 Mon Sep 17 00:00:00 2001 From: edwin Date: Fri, 7 Jun 2024 12:42:26 +0300 Subject: [PATCH 05/12] feat: switched to safe client use. Handle init errors --- aggregator/aggregator.go | 6 +- aggregator/operator_registrations_inmemory.go | 66 +++++++++++-------- core/chainio/avs_subscriber.go | 26 ++++++-- core/chainio/bindings.go | 14 +++- 4 files changed, 77 insertions(+), 35 deletions(-) diff --git a/aggregator/aggregator.go b/aggregator/aggregator.go index 598c3fac..6aa946c5 100644 --- a/aggregator/aggregator.go +++ b/aggregator/aggregator.go @@ -200,7 +200,11 @@ func NewAggregator(ctx context.Context, config *config.Config, logger logging.Lo return nil, err } - operatorRegistrationsService := NewOperatorRegistrationsServiceInMemory(ctx, clients.AvsRegistryChainSubscriber, clients.AvsRegistryChainReader, logger) + operatorRegistrationsService, err := NewOperatorRegistrationsServiceInMemory(ctx, avsSubscriber, avsReader, logger) + if err != nil { + return nil, err + } + avsRegistryService := avsregistry.NewAvsRegistryServiceChainCaller(avsReader, operatorRegistrationsService, logger) taskBlsAggregationService := blsagg.NewBlsAggregatorService(avsRegistryService, logger) stateRootUpdateBlsAggregationService := NewMessageBlsAggregatorService(avsRegistryService, clients.EthHttpClient, logger) diff --git a/aggregator/operator_registrations_inmemory.go b/aggregator/operator_registrations_inmemory.go index 7d028dd3..32fbbe99 100644 --- a/aggregator/operator_registrations_inmemory.go +++ b/aggregator/operator_registrations_inmemory.go @@ -55,59 +55,72 @@ func NewOperatorRegistrationsServiceInMemory( avsRegistrySubscriber avsregistry.AvsRegistrySubscriber, avsRegistryReader avsregistry.AvsRegistryReader, logger logging.Logger, -) *OperatorRegistrationsServiceInMemory { +) (*OperatorRegistrationsServiceInMemory, error) { queryByAddrC := make(chan queryByAddr) queryByIdC := make(chan queryById) - pkcs := &OperatorRegistrationsServiceInMemory{ + ors := &OperatorRegistrationsServiceInMemory{ avsRegistrySubscriber: avsRegistrySubscriber, avsRegistryReader: avsRegistryReader, logger: logger, queryByAddrC: queryByAddrC, queryByIdC: queryByIdC, } + err := ors.asyncInit(ctx, queryByAddrC, queryByIdC) + if err != nil { + return nil, err + } + + return ors, nil +} - // We use this waitgroup to wait on the initialization of the inmemory pubkey dict, - // which requires querying the past events of the pubkey registration contract +// asyncInit parks caller & schedules initialization of the inmemory pubkey dict, +// which requires querying the past events of the pubkey registration contract +func (ors *OperatorRegistrationsServiceInMemory) asyncInit(ctx context.Context, queryByAddrC chan queryByAddr, queryByIdC chan queryById) error { wg := sync.WaitGroup{} - wg.Add(1) - pkcs.startServiceInGoroutine(ctx, queryByAddrC, queryByIdC, &wg) - wg.Wait() + defer wg.Wait() + + initErrC := make(chan error) + ors.startServiceInGoroutine(ctx, queryByAddrC, queryByIdC, &wg, initErrC) - return pkcs + return <-initErrC } -func (ors *OperatorRegistrationsServiceInMemory) startServiceInGoroutine(ctx context.Context, queryByAddrC <-chan queryByAddr, queryByIdC <-chan queryById, wg *sync.WaitGroup) { +func (ors *OperatorRegistrationsServiceInMemory) startServiceInGoroutine(ctx context.Context, queryByAddrC <-chan queryByAddr, queryByIdC <-chan queryById, wg *sync.WaitGroup, initErrC chan<- error) { + wg.Add(1) + go func() { ors.logger.Debug("Subscribing to new pubkey registration events on blsApkRegistry contract", "service", "OperatorRegistrationsServiceInMemory") newPubkeyRegistrationC, newPubkeyRegistrationSub, err := ors.avsRegistrySubscriber.SubscribeToNewPubkeyRegistrations() if err != nil { - ors.logger.Error("Fatal error opening websocket subscription for new pubkey registrations", "err", err, "service", "OperatorRegistrationsServiceInMemory") - // see the warning above the struct definition to understand why we panic here - panic(err) + ors.logger.Error("Error opening websocket subscription for new pubkey registrations", "err", err, "service", "OperatorRegistrationsServiceInMemory") + wg.Done() + initErrC <- err + return } - pubkeyByAddrDict, pubkeyByIdDict := ors.queryPastRegisteredOperators(ctx) - + pubkeyByAddrDict, pubkeyByIdDict, err := ors.queryPastRegisteredOperators(ctx) + if err != nil { + wg.Done() + initErrC <- err + return + } // The constructor can return after we have backfilled the db by querying the events of operators that have registered with the blsApkRegistry // before the block at which we started the ws subscription above wg.Done() + close(initErrC) for { select { case <-ctx.Done(): - ors.logger.Infof("OperatorRegistrationsServiceInMemory: Context cancelled, exiting") + ors.logger.Info("OperatorRegistrationsServiceInMemory: Context cancelled, exiting") return + // This shall not really happen unless ctx was canceled & this came first. case err := <-newPubkeyRegistrationSub.Err(): - ors.logger.Error("Error in websocket subscription for new pubkey registration events. Attempting to reconnect...", "err", err, "service", "OperatorRegistrationsServiceInMemory") + // Just report newPubkeyRegistrationSub.Unsubscribe() - newPubkeyRegistrationC, newPubkeyRegistrationSub, err = ors.avsRegistrySubscriber.SubscribeToNewPubkeyRegistrations() - if err != nil { - ors.logger.Error("Error opening websocket subscription for new pubkey registrations", "err", err, "service", "OperatorRegistrationsServiceInMemory") - // see the warning above the struct definition to understand why we panic here - panic(err) - } + ors.logger.Error("Error in safe client subscription", "err", err, "service", "OperatorRegistrationsServiceInMemory") case newPubkeyRegistrationEvent := <-newPubkeyRegistrationC: pubkeys := types.OperatorPubkeys{ @@ -143,14 +156,15 @@ func (ors *OperatorRegistrationsServiceInMemory) startServiceInGoroutine(ctx con }() } -func (ors *OperatorRegistrationsServiceInMemory) queryPastRegisteredOperators(ctx context.Context) (map[common.Address]types.OperatorPubkeys, map[types.OperatorId]types.OperatorPubkeys) { +func (ors *OperatorRegistrationsServiceInMemory) queryPastRegisteredOperators(ctx context.Context) (map[common.Address]types.OperatorPubkeys, map[types.OperatorId]types.OperatorPubkeys, error) { // Querying with nil startBlock and stopBlock will return all events. It doesn't matter if we queryByAddr some events that we will receive again in the websocket, // since we will just overwrite the pubkey dict with the same values. alreadyRegisteredOperatorAddrs, alreadyRegisteredOperatorPubkeys, err := ors.avsRegistryReader.QueryExistingRegisteredOperatorPubKeys(ctx, nil, nil) if err != nil { - ors.logger.Error("Fatal error querying existing registered operators", "err", err, "service", "OperatorRegistrationsServiceInMemory") - panic(err) + ors.logger.Error("Error querying existing registered operators", "err", err, "service", "OperatorRegistrationsServiceInMemory") + return nil, nil, err } + ors.logger.Debug("List of queried operator registration events in blsApkRegistry", "alreadyRegisteredOperatorAddr", alreadyRegisteredOperatorAddrs, "service", "OperatorRegistrationsServiceInMemory") pubkeyByAddrDict := make(map[common.Address]types.OperatorPubkeys) @@ -163,7 +177,7 @@ func (ors *OperatorRegistrationsServiceInMemory) queryPastRegisteredOperators(ct pubkeyByIdDict[operatorId] = operatorPubkeys } - return pubkeyByAddrDict, pubkeyByIdDict + return pubkeyByAddrDict, pubkeyByIdDict, nil } func (ors *OperatorRegistrationsServiceInMemory) GetOperatorPubkeys(ctx context.Context, operator common.Address) (types.OperatorPubkeys, bool) { diff --git a/core/chainio/avs_subscriber.go b/core/chainio/avs_subscriber.go index 7b3237e1..e1dd5c99 100644 --- a/core/chainio/avs_subscriber.go +++ b/core/chainio/avs_subscriber.go @@ -1,19 +1,20 @@ package chainio import ( + "github.com/Layr-Labs/eigensdk-go/chainio/clients/avsregistry" + "github.com/Layr-Labs/eigensdk-go/chainio/clients/eth" + sdklogging "github.com/Layr-Labs/eigensdk-go/logging" "github.com/ethereum/go-ethereum/accounts/abi/bind" gethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/event" - "github.com/Layr-Labs/eigensdk-go/chainio/clients/eth" - sdklogging "github.com/Layr-Labs/eigensdk-go/logging" - opsetupdatereg "github.com/NethermindEth/near-sffl/contracts/bindings/SFFLOperatorSetUpdateRegistry" taskmanager "github.com/NethermindEth/near-sffl/contracts/bindings/SFFLTaskManager" ) type AvsSubscriberer interface { + avsregistry.AvsRegistrySubscriber SubscribeToNewTasks(checkpointTaskCreatedChan chan *taskmanager.ContractSFFLTaskManagerCheckpointTaskCreated) (event.Subscription, error) SubscribeToTaskResponses(taskResponseLogs chan *taskmanager.ContractSFFLTaskManagerCheckpointTaskResponded) (event.Subscription, error) SubscribeToOperatorSetUpdates(operatorSetUpdateChan chan *opsetupdatereg.ContractSFFLOperatorSetUpdateRegistryOperatorSetUpdatedAtBlock) (event.Subscription, error) @@ -25,23 +26,34 @@ type AvsSubscriberer interface { // it takes a single url, so the bindings, even though they have watcher functions, those can't be used // with the http connection... seems very very stupid. Am I missing something? type AvsSubscriber struct { + avsregistry.AvsRegistrySubscriber AvsContractBindings *AvsManagersBindings logger sdklogging.Logger } +var _ (AvsSubscriberer) = (*AvsSubscriber)(nil) + func BuildAvsSubscriber(registryCoordinatorAddr, blsOperatorStateRetrieverAddr gethcommon.Address, ethclient eth.Client, logger sdklogging.Logger) (*AvsSubscriber, error) { avsContractBindings, err := NewAvsManagersBindings(registryCoordinatorAddr, blsOperatorStateRetrieverAddr, ethclient, logger) if err != nil { logger.Error("Failed to create contract bindings", "err", err) return nil, err } - return NewAvsSubscriber(avsContractBindings, logger), nil + + avsRegistrySubscriber, err := avsregistry.NewAvsRegistryChainSubscriber(avsContractBindings.BlsApkRegistry, logger) + if err != nil { + logger.Error("Failed to create chain registry subscriber", "err", err) + return nil, err + } + + return NewAvsSubscriber(avsContractBindings, avsRegistrySubscriber, logger), nil } -func NewAvsSubscriber(avsContractBindings *AvsManagersBindings, logger sdklogging.Logger) *AvsSubscriber { +func NewAvsSubscriber(avsContractBindings *AvsManagersBindings, avsRegistrySubscriber avsregistry.AvsRegistrySubscriber, logger sdklogging.Logger) *AvsSubscriber { return &AvsSubscriber{ - AvsContractBindings: avsContractBindings, - logger: logger, + AvsRegistrySubscriber: avsRegistrySubscriber, + AvsContractBindings: avsContractBindings, + logger: logger, } } diff --git a/core/chainio/bindings.go b/core/chainio/bindings.go index d82b3975..8d1e5cb4 100644 --- a/core/chainio/bindings.go +++ b/core/chainio/bindings.go @@ -2,8 +2,8 @@ package chainio import ( "github.com/Layr-Labs/eigensdk-go/chainio/clients/eth" + blsapkreg "github.com/Layr-Labs/eigensdk-go/contracts/bindings/BLSApkRegistry" "github.com/Layr-Labs/eigensdk-go/logging" - "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" @@ -19,6 +19,7 @@ type AvsManagersBindings struct { OperatorSetUpdateRegistry *opsetupdatereg.ContractSFFLOperatorSetUpdateRegistry TaskManager *taskmanager.ContractSFFLTaskManager ServiceManager *csservicemanager.ContractSFFLServiceManager + BlsApkRegistry blsapkreg.ContractBLSApkRegistryFilters ethClient eth.Client logger logging.Logger } @@ -62,11 +63,22 @@ func NewAvsManagersBindings(registryCoordinatorAddr, operatorStateRetrieverAddr return nil, err } + blsApkRegistryAddr, err := contractRegistryCoordinator.BlsApkRegistry(&bind.CallOpts{}) + if err != nil { + return nil, err + } + + blsApkRegistry, err := blsapkreg.NewContractBLSApkRegistry(blsApkRegistryAddr, ethclient) + if err != nil { + return nil, err + } + return &AvsManagersBindings{ RegistryCoordinator: contractRegistryCoordinator, OperatorSetUpdateRegistry: contractOperatorSetUpdateRegistry, ServiceManager: contractServiceManager, TaskManager: contractTaskManager, + BlsApkRegistry: blsApkRegistry, ethClient: ethclient, logger: logger, }, nil From fb247f2d1937c87bc4838f014066b3b642227b78 Mon Sep 17 00:00:00 2001 From: Franco Barpp Gomes Date: Wed, 3 Jul 2024 11:06:21 +0100 Subject: [PATCH 06/12] test: Set log level to debug for int test --- config-files/aggregator.yaml | 2 +- config-files/operator.anvil.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/config-files/aggregator.yaml b/config-files/aggregator.yaml index 5d68b015..f4535fb0 100644 --- a/config-files/aggregator.yaml +++ b/config-files/aggregator.yaml @@ -1,5 +1,5 @@ # 'production' only prints info and above. 'development' also prints debug -environment: production +environment: development eth_rpc_url: http://localhost:8545 eth_ws_url: ws://localhost:8545 # address which the aggregator listens on for operator signed messages diff --git a/config-files/operator.anvil.yaml b/config-files/operator.anvil.yaml index 4349686c..9dfc56e4 100644 --- a/config-files/operator.anvil.yaml +++ b/config-files/operator.anvil.yaml @@ -1,5 +1,5 @@ # this sets the logger level (true = info, false = debug) -production: true +production: false operator_address: 0x0000000000000000000000000000000000000000 From 27286ae73563188c41a5e1c1f7241bc46445caaa Mon Sep 17 00:00:00 2001 From: Franco Barpp Gomes Date: Wed, 3 Jul 2024 11:14:10 +0100 Subject: [PATCH 07/12] fix: Fix socket indexing --- aggregator/operator_registrations_inmemory.go | 36 +++++++++---------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/aggregator/operator_registrations_inmemory.go b/aggregator/operator_registrations_inmemory.go index f6b687ec..58c220be 100644 --- a/aggregator/operator_registrations_inmemory.go +++ b/aggregator/operator_registrations_inmemory.go @@ -25,10 +25,10 @@ type OperatorRegistrationsServiceInMemory struct { queryByAddrC chan<- queryByAddr queryByIdC chan<- queryById - idToAddr map[types.OperatorId]common.Address - addrToId map[common.Address]types.OperatorId - pubkeysByAddr map[common.Address]types.OperatorPubkeys - socketsByAddr map[common.Address]types.Socket + idToAddr map[types.OperatorId]common.Address + addrToId map[common.Address]types.OperatorId + pubkeysById map[types.OperatorId]types.OperatorPubkeys + socketById map[types.OperatorId]types.Socket } type queryByAddr struct { @@ -65,8 +65,8 @@ func NewOperatorRegistrationsServiceInMemory( queryByIdC: queryByIdC, idToAddr: make(map[types.OperatorId]common.Address), addrToId: make(map[common.Address]types.OperatorId), - pubkeysByAddr: make(map[common.Address]types.OperatorPubkeys), - socketsByAddr: make(map[common.Address]types.Socket), + pubkeysById: make(map[types.OperatorId]types.OperatorPubkeys), + socketById: make(map[types.OperatorId]types.Socket), } err := ors.asyncInit(ctx, queryByAddrC, queryByIdC) if err != nil { @@ -140,7 +140,7 @@ func (ors *OperatorRegistrationsServiceInMemory) startServiceInGoroutine(ctx con ors.idToAddr[operatorId] = operatorAddr ors.addrToId[operatorAddr] = operatorId - ors.pubkeysByAddr[operatorAddr] = pubkeys + ors.pubkeysById[operatorId] = pubkeys ors.logger.Debug("Added operator info to dict", "service", "OperatorRegistrationsServiceInMemory", @@ -156,30 +156,28 @@ func (ors *OperatorRegistrationsServiceInMemory) startServiceInGoroutine(ctx con socket := types.Socket(newSocketRegistrationEvent.Socket) ors.logger.Debug("Received new socket registration event", "service", "OperatorRegistrationsServiceInMemory", "operatorId", operatorId, "socket", socket) - if addr, exists := ors.idToAddr[operatorId]; exists { - ors.socketsByAddr[addr] = socket - } + ors.socketById[operatorId] = socket case q := <-queryByAddrC: - pubkeys, pubkeysExist := ors.pubkeysByAddr[q.operatorAddr] - socket, socketExists := ors.socketsByAddr[q.operatorAddr] + operatorId, idExists := ors.addrToId[q.operatorAddr] + pubkeys, pubkeysExist := ors.pubkeysById[operatorId] + socket, socketExists := ors.socketById[operatorId] operatorInfo := types.OperatorInfo{ Pubkeys: pubkeys, Socket: socket, } - q.respC <- resp{operatorInfo, pubkeysExist && socketExists} + q.respC <- resp{operatorInfo, idExists && pubkeysExist && socketExists} case q := <-queryByIdC: - addr, exists := ors.idToAddr[q.operatorId] - pubkeys, pubkeysExist := ors.pubkeysByAddr[addr] - socket, socketExists := ors.socketsByAddr[addr] + pubkeys, pubkeysExist := ors.pubkeysById[q.operatorId] + socket, socketExists := ors.socketById[q.operatorId] operatorInfo := types.OperatorInfo{ Pubkeys: pubkeys, Socket: socket, } - q.respC <- resp{operatorInfo, exists && pubkeysExist && socketExists} + q.respC <- resp{operatorInfo, pubkeysExist && socketExists} } } }() @@ -204,8 +202,8 @@ func (ors *OperatorRegistrationsServiceInMemory) queryPastRegisteredOperators(ct ors.idToAddr[operatorId] = operatorAddr ors.addrToId[operatorAddr] = operatorId - ors.pubkeysByAddr[operatorAddr] = operatorPubkeys - ors.socketsByAddr[operatorAddr] = socketById[operatorId] + ors.pubkeysById[operatorId] = operatorPubkeys + ors.socketById[operatorId] = socketById[operatorId] } return nil From bc679685e81a3f20ebf09d443395b20dfb5def13 Mon Sep 17 00:00:00 2001 From: Franco Barpp Gomes Date: Wed, 3 Jul 2024 12:23:23 +0100 Subject: [PATCH 08/12] refactor: Refactor signature invalidation --- aggregator/rpc_server_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/aggregator/rpc_server_test.go b/aggregator/rpc_server_test.go index 55aeafee..a609375f 100644 --- a/aggregator/rpc_server_test.go +++ b/aggregator/rpc_server_test.go @@ -100,7 +100,7 @@ func TestProcessInvalidSignedStateRootUpdateMessage(t *testing.T) { signedMessage, err := createMockSignedStateRootUpdateMessage(message, *MOCK_OPERATOR_KEYPAIR) assert.Nil(t, err) - invalidateSignature(&signedMessage.BlsSignature) + signedMessage.BlsSignature = *newInvalidSignature() mockOperatorRegistrationsServ.EXPECT().GetOperatorInfoById(context.Background(), signedMessage.OperatorId).Return(eigentypes.OperatorInfo{Pubkeys: MOCK_OPERATOR_PUBKEYS}, true) err = aggregator.ProcessSignedStateRootUpdateMessage(signedMessage) @@ -208,6 +208,6 @@ func createMockSignedOperatorSetUpdateMessage(mockMessage messages.OperatorSetUp return signedOperatorSetUpdateMessage, nil } -func invalidateSignature(signature *bls.Signature) { - signature.G1Affine.Neg(signature.G1Affine) +func newInvalidSignature() *bls.Signature { + return bls.NewZeroSignature() } From ee805a2d1a1a9b1a44a73a87b779127aee0ae688 Mon Sep 17 00:00:00 2001 From: Franco Barpp Gomes Date: Tue, 23 Jul 2024 16:32:36 -0300 Subject: [PATCH 09/12] feat: Add UnsupportedMessageTypeError --- aggregator/aggregator.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/aggregator/aggregator.go b/aggregator/aggregator.go index f2865cd5..07195015 100644 --- a/aggregator/aggregator.go +++ b/aggregator/aggregator.go @@ -46,6 +46,7 @@ var ( GetOperatorSetUpdateBlockError = errors.New("Failed to get operator set update block") OperatorNotFoundError = errors.New("Operator not found") InvalidSignatureError = errors.New("Invalid signature") + UnsupportedMessageTypeError = errors.New("Unsupported message type") // REST errors StateRootUpdateNotFoundError = errors.New("StateRootUpdate not found") @@ -739,6 +740,8 @@ func (agg *Aggregator) verifySignature(signedMessage interface{}) error { if err != nil { return DigestError } + default: + return UnsupportedMessageTypeError } operatorInfo, ok := agg.GetOperatorInfoById(context.Background(), operatorId) From fdf489df7d926f9e39131ee224457ff8d94d0e1e Mon Sep 17 00:00:00 2001 From: Franco Barpp Gomes Date: Tue, 23 Jul 2024 16:39:50 -0300 Subject: [PATCH 10/12] test: Generate mocks and fix assignment --- aggregator/aggregator_test.go | 4 ++-- aggregator/mocks/eth_client.go | 1 - aggregator/mocks/message_blsagg.go | 1 - aggregator/mocks/rest_aggregator.go | 1 - aggregator/mocks/rollup_broadcaster.go | 1 - aggregator/mocks/rpc_aggregator.go | 18 +++++++++++++++++- 6 files changed, 19 insertions(+), 7 deletions(-) diff --git a/aggregator/aggregator_test.go b/aggregator/aggregator_test.go index bbde6e27..0ffe4914 100644 --- a/aggregator/aggregator_test.go +++ b/aggregator/aggregator_test.go @@ -159,7 +159,7 @@ func TestExpiredStateRootUpdateMessage(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - aggregator, _, _, _, _, _, _, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT) + aggregator, _, _, _, _, _, _, _, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT) assert.NoError(t, err) nowTimestamp := uint64(6000) @@ -179,7 +179,7 @@ func TestExpiredOperatorSetUpdate(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - aggregator, _, _, _, _, _, _, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT) + aggregator, _, _, _, _, _, _, _, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT) assert.NoError(t, err) nowTimestamp := uint64(8000) diff --git a/aggregator/mocks/eth_client.go b/aggregator/mocks/eth_client.go index 081e963c..e7a5f5d7 100644 --- a/aggregator/mocks/eth_client.go +++ b/aggregator/mocks/eth_client.go @@ -5,7 +5,6 @@ // // mockgen -destination=./mocks/eth_client.go -package=mocks github.com/Layr-Labs/eigensdk-go/chainio/clients/eth Client // - // Package mocks is a generated GoMock package. package mocks diff --git a/aggregator/mocks/message_blsagg.go b/aggregator/mocks/message_blsagg.go index 14223ecf..8abfd2fb 100644 --- a/aggregator/mocks/message_blsagg.go +++ b/aggregator/mocks/message_blsagg.go @@ -5,7 +5,6 @@ // // mockgen -destination=./mocks/message_blsagg.go -package=mocks github.com/NethermindEth/near-sffl/aggregator MessageBlsAggregationService // - // Package mocks is a generated GoMock package. package mocks diff --git a/aggregator/mocks/rest_aggregator.go b/aggregator/mocks/rest_aggregator.go index 524ec199..95c9b132 100644 --- a/aggregator/mocks/rest_aggregator.go +++ b/aggregator/mocks/rest_aggregator.go @@ -5,7 +5,6 @@ // // mockgen -destination=./mocks/rest_aggregator.go -package=mocks github.com/NethermindEth/near-sffl/aggregator RestAggregatorer // - // Package mocks is a generated GoMock package. package mocks diff --git a/aggregator/mocks/rollup_broadcaster.go b/aggregator/mocks/rollup_broadcaster.go index 639c6cb0..b048ec4b 100644 --- a/aggregator/mocks/rollup_broadcaster.go +++ b/aggregator/mocks/rollup_broadcaster.go @@ -5,7 +5,6 @@ // // mockgen -destination=./mocks/rollup_broadcaster.go -package=mocks github.com/NethermindEth/near-sffl/aggregator RollupBroadcasterer // - // Package mocks is a generated GoMock package. package mocks diff --git a/aggregator/mocks/rpc_aggregator.go b/aggregator/mocks/rpc_aggregator.go index b0016a50..2768a7ed 100644 --- a/aggregator/mocks/rpc_aggregator.go +++ b/aggregator/mocks/rpc_aggregator.go @@ -5,13 +5,14 @@ // // mockgen -destination=./mocks/rpc_aggregator.go -package=mocks github.com/NethermindEth/near-sffl/aggregator RpcAggregatorer // - // Package mocks is a generated GoMock package. package mocks import ( + context "context" reflect "reflect" + types "github.com/Layr-Labs/eigensdk-go/types" messages "github.com/NethermindEth/near-sffl/core/types/messages" gomock "go.uber.org/mock/gomock" ) @@ -54,6 +55,21 @@ func (mr *MockRpcAggregatorerMockRecorder) GetAggregatedCheckpointMessages(arg0, return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAggregatedCheckpointMessages", reflect.TypeOf((*MockRpcAggregatorer)(nil).GetAggregatedCheckpointMessages), arg0, arg1) } +// GetOperatorInfoById mocks base method. +func (m *MockRpcAggregatorer) GetOperatorInfoById(arg0 context.Context, arg1 types.Bytes32) (types.OperatorInfo, bool) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetOperatorInfoById", arg0, arg1) + ret0, _ := ret[0].(types.OperatorInfo) + ret1, _ := ret[1].(bool) + return ret0, ret1 +} + +// GetOperatorInfoById indicates an expected call of GetOperatorInfoById. +func (mr *MockRpcAggregatorerMockRecorder) GetOperatorInfoById(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOperatorInfoById", reflect.TypeOf((*MockRpcAggregatorer)(nil).GetOperatorInfoById), arg0, arg1) +} + // GetRegistryCoordinatorAddress mocks base method. func (m *MockRpcAggregatorer) GetRegistryCoordinatorAddress(arg0 *string) error { m.ctrl.T.Helper() From 8d8577091159b110fccefeeed01fbeae26a85494 Mon Sep 17 00:00:00 2001 From: Franco Barpp Gomes Date: Tue, 23 Jul 2024 18:24:37 -0300 Subject: [PATCH 11/12] feat: Check message timestamp before signature --- aggregator/aggregator.go | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/aggregator/aggregator.go b/aggregator/aggregator.go index 07195015..b8e75291 100644 --- a/aggregator/aggregator.go +++ b/aggregator/aggregator.go @@ -557,22 +557,22 @@ func (agg *Aggregator) ProcessSignedCheckpointTaskResponse(signedCheckpointTaskR // Rpc request handlers func (agg *Aggregator) ProcessSignedStateRootUpdateMessage(signedStateRootUpdateMessage *messages.SignedStateRootUpdateMessage) error { - err := agg.verifySignature(signedStateRootUpdateMessage) + timestamp := signedStateRootUpdateMessage.Message.Timestamp + err := agg.validateMessageTimestamp(timestamp) if err != nil { + agg.logger.Error("Failed to validate message timestamp", "err", err, "timestamp", timestamp) return err } - messageDigest, err := signedStateRootUpdateMessage.Message.Digest() + err = agg.verifySignature(signedStateRootUpdateMessage) if err != nil { - agg.logger.Error("Failed to get message digest", "err", err) - return DigestError + return err } - timestamp := signedStateRootUpdateMessage.Message.Timestamp - err = agg.validateMessageTimestamp(timestamp) + messageDigest, err := signedStateRootUpdateMessage.Message.Digest() if err != nil { - agg.logger.Error("Failed to validate message timestamp", "err", err, "timestamp", timestamp) - return err + agg.logger.Error("Failed to get message digest", "err", err) + return DigestError } err = agg.stateRootUpdateBlsAggregationService.InitializeMessageIfNotExists( @@ -599,22 +599,22 @@ func (agg *Aggregator) ProcessSignedStateRootUpdateMessage(signedStateRootUpdate } func (agg *Aggregator) ProcessSignedOperatorSetUpdateMessage(signedOperatorSetUpdateMessage *messages.SignedOperatorSetUpdateMessage) error { - err := agg.verifySignature(signedOperatorSetUpdateMessage) + timestamp := signedOperatorSetUpdateMessage.Message.Timestamp + err := agg.validateMessageTimestamp(timestamp) if err != nil { + agg.logger.Error("Failed to validate message timestamp", "err", err, "timestamp", timestamp) return err } - messageDigest, err := signedOperatorSetUpdateMessage.Message.Digest() + err = agg.verifySignature(signedOperatorSetUpdateMessage) if err != nil { - agg.logger.Error("Failed to get message digest", "err", err) - return DigestError + return err } - timestamp := signedOperatorSetUpdateMessage.Message.Timestamp - err = agg.validateMessageTimestamp(timestamp) + messageDigest, err := signedOperatorSetUpdateMessage.Message.Digest() if err != nil { - agg.logger.Error("Failed to validate message timestamp", "err", err, "timestamp", timestamp) - return err + agg.logger.Error("Failed to get message digest", "err", err) + return DigestError } blockNumber, err := agg.avsReader.GetOperatorSetUpdateBlock(context.Background(), signedOperatorSetUpdateMessage.Message.Id) From 330cb9622962b079ffe187e843e7e9bdc0248995 Mon Sep 17 00:00:00 2001 From: Franco Barpp Gomes Date: Tue, 23 Jul 2024 18:28:00 -0300 Subject: [PATCH 12/12] test: Fix clock mismatch on test --- aggregator/rpc_server_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/aggregator/rpc_server_test.go b/aggregator/rpc_server_test.go index d0ce6cd1..2297ceff 100644 --- a/aggregator/rpc_server_test.go +++ b/aggregator/rpc_server_test.go @@ -91,10 +91,11 @@ func TestProcessInvalidSignedStateRootUpdateMessage(t *testing.T) { aggregator, _, _, _, _, _, mockOperatorRegistrationsServ, _, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT) assert.Nil(t, err) + aggregator.clock = core.Clock{Now: func() time.Time { return time.Unix(10_000, 0) }} message := messages.StateRootUpdateMessage{ RollupId: 1, BlockHeight: 2, - Timestamp: 3, + Timestamp: 9_995, NearDaCommitment: keccak256(4), NearDaTransactionId: keccak256(5), StateRoot: keccak256(6),