Skip to content

Commit

Permalink
Custom pubkeys inmemory service (#194)
Browse files Browse the repository at this point in the history
* feat: custom pubkeys inmemory service

* fix: tests

* refactor: log update

* refactor: api change in queryPastRegisteredOperators

* feat: switched to safe client use. Handle init errors

* test: Set log level to debug for int test

* fix: Fix socket indexing

* refactor: Refactor signature invalidation

* feat: Add UnsupportedMessageTypeError

* test: Generate mocks and fix assignment

* feat: Check message timestamp before signature

* test: Fix clock mismatch on test

---------

Co-authored-by: Franco Barpp Gomes <[email protected]>
  • Loading branch information
taco-paco and Hyodar authored Jul 24, 2024
1 parent 2379ad7 commit 3543752
Show file tree
Hide file tree
Showing 17 changed files with 560 additions and 63 deletions.
117 changes: 93 additions & 24 deletions aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,15 @@ import (
"sync"
"time"

chainioavsregistry "github.com/Layr-Labs/eigensdk-go/chainio/clients/avsregistry"
"github.com/Layr-Labs/eigensdk-go/chainio/clients/wallet"
"github.com/Layr-Labs/eigensdk-go/chainio/txmgr"
"github.com/Layr-Labs/eigensdk-go/crypto/bls"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/Layr-Labs/eigensdk-go/metrics"
"github.com/Layr-Labs/eigensdk-go/services/avsregistry"
blsagg "github.com/Layr-Labs/eigensdk-go/services/bls_aggregation"
opinfoserv "github.com/Layr-Labs/eigensdk-go/services/operatorsinfo"
"github.com/Layr-Labs/eigensdk-go/signerv2"
eigentypes "github.com/Layr-Labs/eigensdk-go/types"
"github.com/ethereum/go-ethereum/common"
"github.com/prometheus/client_golang/prometheus"

"github.com/NethermindEth/near-sffl/aggregator/database"
Expand Down Expand Up @@ -46,6 +44,9 @@ var (
DigestError = errors.New("Failed to get message digest")
TaskResponseDigestError = errors.New("Failed to get task response digest")
GetOperatorSetUpdateBlockError = errors.New("Failed to get operator set update block")
OperatorNotFoundError = errors.New("Operator not found")
InvalidSignatureError = errors.New("Invalid signature")
UnsupportedMessageTypeError = errors.New("Unsupported message type")

// REST errors
StateRootUpdateNotFoundError = errors.New("StateRootUpdate not found")
Expand All @@ -61,6 +62,7 @@ type RpcAggregatorer interface {
ProcessSignedOperatorSetUpdateMessage(signedOperatorSetUpdateMessage *messages.SignedOperatorSetUpdateMessage) error
GetAggregatedCheckpointMessages(fromTimestamp, toTimestamp uint64) (*messages.CheckpointMessages, error)
GetRegistryCoordinatorAddress(reply *string) error
GetOperatorInfoById(ctx context.Context, operatorId eigentypes.OperatorId) (eigentypes.OperatorInfo, bool)
}

type RestAggregatorer interface {
Expand Down Expand Up @@ -120,6 +122,7 @@ type Aggregator struct {
metrics metrics.Metrics
aggregatorListener AggregatorEventListener

operatorRegistrationsService OperatorRegistrationsService
taskBlsAggregationService blsagg.BlsAggregationService
stateRootUpdateBlsAggregationService MessageBlsAggregationService
operatorSetUpdateBlsAggregationService MessageBlsAggregationService
Expand Down Expand Up @@ -184,13 +187,6 @@ func NewAggregator(

txMgr := txmgr.NewSimpleTxManager(txSender, ethHttpClient, logger, config.AggregatorAddress).WithGasLimitMultiplier(1.5)

// note that the subscriber needs a ws connection instead of http
avsRegistryChainSubscriber, err := chainioavsregistry.BuildAvsRegistryChainSubscriber(common.HexToAddress(config.SFFLRegistryCoordinatorAddr.String()), ethWsClient, logger)
if err != nil {
logger.Error("Cannot create AvsRegistryChainSubscriber", "err", err)
return nil, err
}

avsWriter, err := chainio.BuildAvsWriterFromConfig(txMgr, config, ethHttpClient, logger)
if err != nil {
logger.Error("Cannot create avsWriter", "err", err)
Expand All @@ -215,8 +211,12 @@ func NewAggregator(
return nil, err
}

operatorPubkeysService := opinfoserv.NewOperatorsInfoServiceInMemory(ctx, avsRegistryChainSubscriber, avsReader, logger)
avsRegistryService := avsregistry.NewAvsRegistryServiceChainCaller(avsReader, operatorPubkeysService, logger)
operatorRegistrationsService, err := NewOperatorRegistrationsServiceInMemory(ctx, avsSubscriber, avsReader, logger)
if err != nil {
return nil, err
}

avsRegistryService := avsregistry.NewAvsRegistryServiceChainCaller(avsReader, operatorRegistrationsService, logger)
taskBlsAggregationService := blsagg.NewBlsAggregatorService(avsRegistryService, logger)
stateRootUpdateBlsAggregationService := NewMessageBlsAggregatorService(avsRegistryService, ethHttpClient, logger)
operatorSetUpdateBlsAggregationService := NewMessageBlsAggregatorService(avsRegistryService, ethHttpClient, logger)
Expand All @@ -232,6 +232,7 @@ func NewAggregator(
rollupBroadcaster: rollupBroadcaster,
httpClient: ethHttpClient,
wsClient: ethWsClient,
operatorRegistrationsService: operatorRegistrationsService,
clock: core.SystemClock,
taskBlsAggregationService: taskBlsAggregationService,
stateRootUpdateBlsAggregationService: stateRootUpdateBlsAggregationService,
Expand Down Expand Up @@ -522,6 +523,11 @@ func (agg *Aggregator) handleOperatorSetUpdateReachedQuorum(ctx context.Context,
}

func (agg *Aggregator) ProcessSignedCheckpointTaskResponse(signedCheckpointTaskResponse *messages.SignedCheckpointTaskResponse) error {
err := agg.verifySignature(signedCheckpointTaskResponse)
if err != nil {
return err
}

taskIndex := signedCheckpointTaskResponse.TaskResponse.ReferenceTaskIndex
taskResponseDigest, err := signedCheckpointTaskResponse.TaskResponse.Digest()
if err != nil {
Expand Down Expand Up @@ -551,19 +557,24 @@ func (agg *Aggregator) ProcessSignedCheckpointTaskResponse(signedCheckpointTaskR

// Rpc request handlers
func (agg *Aggregator) ProcessSignedStateRootUpdateMessage(signedStateRootUpdateMessage *messages.SignedStateRootUpdateMessage) error {
messageDigest, err := signedStateRootUpdateMessage.Message.Digest()
timestamp := signedStateRootUpdateMessage.Message.Timestamp
err := agg.validateMessageTimestamp(timestamp)
if err != nil {
agg.logger.Error("Failed to get message digest", "err", err)
return DigestError
agg.logger.Error("Failed to validate message timestamp", "err", err, "timestamp", timestamp)
return err
}

timestamp := signedStateRootUpdateMessage.Message.Timestamp
err = agg.validateMessageTimestamp(timestamp)
err = agg.verifySignature(signedStateRootUpdateMessage)
if err != nil {
agg.logger.Error("Failed to validate message timestamp", "err", err, "timestamp", timestamp)
return err
}

messageDigest, err := signedStateRootUpdateMessage.Message.Digest()
if err != nil {
agg.logger.Error("Failed to get message digest", "err", err)
return DigestError
}

err = agg.stateRootUpdateBlsAggregationService.InitializeMessageIfNotExists(
messageDigest,
coretypes.QUORUM_NUMBERS,
Expand All @@ -588,19 +599,24 @@ func (agg *Aggregator) ProcessSignedStateRootUpdateMessage(signedStateRootUpdate
}

func (agg *Aggregator) ProcessSignedOperatorSetUpdateMessage(signedOperatorSetUpdateMessage *messages.SignedOperatorSetUpdateMessage) error {
messageDigest, err := signedOperatorSetUpdateMessage.Message.Digest()
timestamp := signedOperatorSetUpdateMessage.Message.Timestamp
err := agg.validateMessageTimestamp(timestamp)
if err != nil {
agg.logger.Error("Failed to get message digest", "err", err)
return DigestError
agg.logger.Error("Failed to validate message timestamp", "err", err, "timestamp", timestamp)
return err
}

timestamp := signedOperatorSetUpdateMessage.Message.Timestamp
err = agg.validateMessageTimestamp(timestamp)
err = agg.verifySignature(signedOperatorSetUpdateMessage)
if err != nil {
agg.logger.Error("Failed to validate message timestamp", "err", err, "timestamp", timestamp)
return err
}

messageDigest, err := signedOperatorSetUpdateMessage.Message.Digest()
if err != nil {
agg.logger.Error("Failed to get message digest", "err", err)
return DigestError
}

blockNumber, err := agg.avsReader.GetOperatorSetUpdateBlock(context.Background(), signedOperatorSetUpdateMessage.Message.Id)
if err != nil {
agg.logger.Error("Failed to get operator set update block", "err", err)
Expand Down Expand Up @@ -691,6 +707,59 @@ func (agg *Aggregator) GetCheckpointMessages(fromTimestamp, toTimestamp uint64)
}, nil
}

func (agg *Aggregator) GetOperatorInfoById(ctx context.Context, operatorId eigentypes.OperatorId) (eigentypes.OperatorInfo, bool) {
operatorInfo, ok := agg.operatorRegistrationsService.GetOperatorInfoById(ctx, operatorId)
return operatorInfo, ok
}

func (agg *Aggregator) verifySignature(signedMessage interface{}) error {
var operatorId eigentypes.OperatorId
var signature bls.Signature
var digest [32]byte
var err error

switch signedMessage := signedMessage.(type) {
case *messages.SignedCheckpointTaskResponse:
operatorId = signedMessage.OperatorId
signature = signedMessage.BlsSignature
digest, err = signedMessage.TaskResponse.Digest()
if err != nil {
return TaskResponseDigestError
}
case *messages.SignedStateRootUpdateMessage:
operatorId = signedMessage.OperatorId
signature = signedMessage.BlsSignature
digest, err = signedMessage.Message.Digest()
if err != nil {
return DigestError
}
case *messages.SignedOperatorSetUpdateMessage:
operatorId = signedMessage.OperatorId
signature = signedMessage.BlsSignature
digest, err = signedMessage.Message.Digest()
if err != nil {
return DigestError
}
default:
return UnsupportedMessageTypeError
}

operatorInfo, ok := agg.GetOperatorInfoById(context.Background(), operatorId)
if !ok {
return OperatorNotFoundError
}

ok, err = signature.Verify(operatorInfo.Pubkeys.G2Pubkey, digest)
if err != nil {
return InvalidSignatureError
}
if !ok {
return InvalidSignatureError
}

return nil
}

func (agg *Aggregator) validateMessageTimestamp(messageTimestamp uint64) error {
now := agg.clock.Now().Unix()
timeoutInSeconds := types.MESSAGE_SUBMISSION_TIMEOUT.Seconds()
Expand Down
27 changes: 15 additions & 12 deletions aggregator/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,14 @@ var MOCK_OPERATOR_BLS_PRIVATE_KEY, _ = bls.NewPrivateKey(MOCK_OPERATOR_BLS_PRIVA
var MOCK_OPERATOR_KEYPAIR = bls.NewKeyPair(MOCK_OPERATOR_BLS_PRIVATE_KEY)
var MOCK_OPERATOR_G1PUBKEY = MOCK_OPERATOR_KEYPAIR.GetPubKeyG1()
var MOCK_OPERATOR_G2PUBKEY = MOCK_OPERATOR_KEYPAIR.GetPubKeyG2()
var MOCK_OPERATOR_PUBKEYS = eigentypes.OperatorPubkeys{
G1Pubkey: MOCK_OPERATOR_G1PUBKEY,
G2Pubkey: MOCK_OPERATOR_G2PUBKEY,
}
var MOCK_OPERATOR_PUBKEY_DICT = map[eigentypes.OperatorId]types.OperatorInfo{
MOCK_OPERATOR_ID: {
OperatorPubkeys: eigentypes.OperatorPubkeys{
G1Pubkey: MOCK_OPERATOR_G1PUBKEY,
G2Pubkey: MOCK_OPERATOR_G2PUBKEY,
},
OperatorAddr: common.Address{},
OperatorPubkeys: MOCK_OPERATOR_PUBKEYS,
OperatorAddr: common.Address{},
},
}

Expand All @@ -55,7 +56,7 @@ func TestSendNewTask(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

aggregator, mockAvsReaderer, mockAvsWriterer, mockTaskBlsAggService, _, _, _, _, mockClient, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
aggregator, mockAvsReaderer, mockAvsWriterer, mockTaskBlsAggService, _, _, _, _, _, mockClient, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
assert.Nil(t, err)

var TASK_INDEX = uint32(0)
Expand Down Expand Up @@ -92,7 +93,7 @@ func TestHandleStateRootUpdateAggregationReachedQuorum(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

aggregator, _, _, _, _, _, mockMsgDb, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
aggregator, _, _, _, _, _, _, mockMsgDb, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
assert.Nil(t, err)

msg := messages.StateRootUpdateMessage{}
Expand Down Expand Up @@ -122,7 +123,7 @@ func TestHandleOperatorSetUpdateAggregationReachedQuorum(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

aggregator, _, _, _, _, _, mockMsgDb, mockRollupBroadcaster, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
aggregator, _, _, _, _, _, _, mockMsgDb, mockRollupBroadcaster, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
assert.Nil(t, err)

msg := messages.OperatorSetUpdateMessage{}
Expand Down Expand Up @@ -158,7 +159,7 @@ func TestExpiredStateRootUpdateMessage(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

aggregator, _, _, _, _, _, _, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
aggregator, _, _, _, _, _, _, _, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
assert.NoError(t, err)

nowTimestamp := uint64(6000)
Expand All @@ -178,7 +179,7 @@ func TestExpiredOperatorSetUpdate(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

aggregator, _, _, _, _, _, _, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
aggregator, _, _, _, _, _, _, _, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
assert.NoError(t, err)

nowTimestamp := uint64(8000)
Expand All @@ -196,7 +197,7 @@ func TestExpiredOperatorSetUpdate(t *testing.T) {

func createMockAggregator(
mockCtrl *gomock.Controller, operatorPubkeyDict map[eigentypes.OperatorId]types.OperatorInfo,
) (*Aggregator, *chainiomocks.MockAvsReaderer, *chainiomocks.MockAvsWriterer, *blsaggservmock.MockBlsAggregationService, *aggmocks.MockMessageBlsAggregationService, *aggmocks.MockMessageBlsAggregationService, *dbmocks.MockDatabaser, *aggmocks.MockRollupBroadcasterer, *safeclientmocks.MockSafeClient, error) {
) (*Aggregator, *chainiomocks.MockAvsReaderer, *chainiomocks.MockAvsWriterer, *blsaggservmock.MockBlsAggregationService, *aggmocks.MockMessageBlsAggregationService, *aggmocks.MockMessageBlsAggregationService, *aggmocks.MockOperatorRegistrationsService, *dbmocks.MockDatabaser, *aggmocks.MockRollupBroadcasterer, *safeclientmocks.MockSafeClient, error) {
logger := sdklogging.NewNoopLogger()
mockAvsWriter := chainiomocks.NewMockAvsWriterer(mockCtrl)
mockAvsReader := chainiomocks.NewMockAvsReaderer(mockCtrl)
Expand All @@ -206,6 +207,7 @@ func createMockAggregator(
mockMsgDb := dbmocks.NewMockDatabaser(mockCtrl)
mockRollupBroadcaster := aggmocks.NewMockRollupBroadcasterer(mockCtrl)
mockClient := safeclientmocks.NewMockSafeClient(mockCtrl)
mockOperatorRegistrationsService := aggmocks.NewMockOperatorRegistrationsService(mockCtrl)

aggregator := &Aggregator{
logger: logger,
Expand All @@ -214,6 +216,7 @@ func createMockAggregator(
taskBlsAggregationService: mockTaskBlsAggregationService,
stateRootUpdateBlsAggregationService: mockStateRootUpdateBlsAggregationService,
operatorSetUpdateBlsAggregationService: mockOperatorSetUpdateBlsAggregationService,
operatorRegistrationsService: mockOperatorRegistrationsService,
msgDb: mockMsgDb,
tasks: make(map[coretypes.TaskIndex]taskmanager.CheckpointTask),
taskResponses: make(map[coretypes.TaskIndex]map[eigentypes.TaskResponseDigest]messages.CheckpointTaskResponse),
Expand All @@ -225,5 +228,5 @@ func createMockAggregator(
aggregatorListener: &SelectiveAggregatorListener{},
clock: core.SystemClock,
}
return aggregator, mockAvsReader, mockAvsWriter, mockTaskBlsAggregationService, mockStateRootUpdateBlsAggregationService, mockOperatorSetUpdateBlsAggregationService, mockMsgDb, mockRollupBroadcaster, mockClient, nil
return aggregator, mockAvsReader, mockAvsWriter, mockTaskBlsAggregationService, mockStateRootUpdateBlsAggregationService, mockOperatorSetUpdateBlsAggregationService, mockOperatorRegistrationsService, mockMsgDb, mockRollupBroadcaster, mockClient, nil
}
1 change: 1 addition & 0 deletions aggregator/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ package aggregator
//go:generate mockgen -destination=./mocks/rpc_aggregator.go -package=mocks github.com/NethermindEth/near-sffl/aggregator RpcAggregatorer
//go:generate mockgen -destination=./mocks/message_blsagg.go -package=mocks github.com/NethermindEth/near-sffl/aggregator MessageBlsAggregationService
//go:generate mockgen -destination=./mocks/rollup_broadcaster.go -package=mocks github.com/NethermindEth/near-sffl/aggregator RollupBroadcasterer
//go:generate mockgen -destination=./mocks/operator_registrations_inmemory.go -package=mocks github.com/NethermindEth/near-sffl/aggregator OperatorRegistrationsService
//go:generate mockgen -destination=./mocks/eth_client.go -package=mocks github.com/Layr-Labs/eigensdk-go/chainio/clients/eth Client
1 change: 0 additions & 1 deletion aggregator/mocks/eth_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion aggregator/mocks/message_blsagg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 3543752

Please sign in to comment.