Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: custom pubkeys inmemory service #194

Merged
merged 14 commits into from
Jul 24, 2024
117 changes: 93 additions & 24 deletions aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,15 @@ import (
"sync"
"time"

chainioavsregistry "github.com/Layr-Labs/eigensdk-go/chainio/clients/avsregistry"
"github.com/Layr-Labs/eigensdk-go/chainio/clients/wallet"
"github.com/Layr-Labs/eigensdk-go/chainio/txmgr"
"github.com/Layr-Labs/eigensdk-go/crypto/bls"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/Layr-Labs/eigensdk-go/metrics"
"github.com/Layr-Labs/eigensdk-go/services/avsregistry"
blsagg "github.com/Layr-Labs/eigensdk-go/services/bls_aggregation"
opinfoserv "github.com/Layr-Labs/eigensdk-go/services/operatorsinfo"
"github.com/Layr-Labs/eigensdk-go/signerv2"
eigentypes "github.com/Layr-Labs/eigensdk-go/types"
"github.com/ethereum/go-ethereum/common"
"github.com/prometheus/client_golang/prometheus"

"github.com/NethermindEth/near-sffl/aggregator/database"
Expand Down Expand Up @@ -46,6 +44,9 @@ var (
DigestError = errors.New("Failed to get message digest")
TaskResponseDigestError = errors.New("Failed to get task response digest")
GetOperatorSetUpdateBlockError = errors.New("Failed to get operator set update block")
OperatorNotFoundError = errors.New("Operator not found")
InvalidSignatureError = errors.New("Invalid signature")
UnsupportedMessageTypeError = errors.New("Unsupported message type")

// REST errors
StateRootUpdateNotFoundError = errors.New("StateRootUpdate not found")
Expand All @@ -61,6 +62,7 @@ type RpcAggregatorer interface {
ProcessSignedOperatorSetUpdateMessage(signedOperatorSetUpdateMessage *messages.SignedOperatorSetUpdateMessage) error
GetAggregatedCheckpointMessages(fromTimestamp, toTimestamp uint64) (*messages.CheckpointMessages, error)
GetRegistryCoordinatorAddress(reply *string) error
GetOperatorInfoById(ctx context.Context, operatorId eigentypes.OperatorId) (eigentypes.OperatorInfo, bool)
}

type RestAggregatorer interface {
Expand Down Expand Up @@ -120,6 +122,7 @@ type Aggregator struct {
metrics metrics.Metrics
aggregatorListener AggregatorEventListener

operatorRegistrationsService OperatorRegistrationsService
taskBlsAggregationService blsagg.BlsAggregationService
stateRootUpdateBlsAggregationService MessageBlsAggregationService
operatorSetUpdateBlsAggregationService MessageBlsAggregationService
Expand Down Expand Up @@ -184,13 +187,6 @@ func NewAggregator(

txMgr := txmgr.NewSimpleTxManager(txSender, ethHttpClient, logger, config.AggregatorAddress).WithGasLimitMultiplier(1.5)

// note that the subscriber needs a ws connection instead of http
avsRegistryChainSubscriber, err := chainioavsregistry.BuildAvsRegistryChainSubscriber(common.HexToAddress(config.SFFLRegistryCoordinatorAddr.String()), ethWsClient, logger)
if err != nil {
logger.Error("Cannot create AvsRegistryChainSubscriber", "err", err)
return nil, err
}

avsWriter, err := chainio.BuildAvsWriterFromConfig(txMgr, config, ethHttpClient, logger)
if err != nil {
logger.Error("Cannot create avsWriter", "err", err)
Expand All @@ -215,8 +211,12 @@ func NewAggregator(
return nil, err
}

operatorPubkeysService := opinfoserv.NewOperatorsInfoServiceInMemory(ctx, avsRegistryChainSubscriber, avsReader, logger)
avsRegistryService := avsregistry.NewAvsRegistryServiceChainCaller(avsReader, operatorPubkeysService, logger)
operatorRegistrationsService, err := NewOperatorRegistrationsServiceInMemory(ctx, avsSubscriber, avsReader, logger)
if err != nil {
return nil, err
}

avsRegistryService := avsregistry.NewAvsRegistryServiceChainCaller(avsReader, operatorRegistrationsService, logger)
taskBlsAggregationService := blsagg.NewBlsAggregatorService(avsRegistryService, logger)
stateRootUpdateBlsAggregationService := NewMessageBlsAggregatorService(avsRegistryService, ethHttpClient, logger)
operatorSetUpdateBlsAggregationService := NewMessageBlsAggregatorService(avsRegistryService, ethHttpClient, logger)
Expand All @@ -232,6 +232,7 @@ func NewAggregator(
rollupBroadcaster: rollupBroadcaster,
httpClient: ethHttpClient,
wsClient: ethWsClient,
operatorRegistrationsService: operatorRegistrationsService,
clock: core.SystemClock,
taskBlsAggregationService: taskBlsAggregationService,
stateRootUpdateBlsAggregationService: stateRootUpdateBlsAggregationService,
Expand Down Expand Up @@ -522,6 +523,11 @@ func (agg *Aggregator) handleOperatorSetUpdateReachedQuorum(ctx context.Context,
}

func (agg *Aggregator) ProcessSignedCheckpointTaskResponse(signedCheckpointTaskResponse *messages.SignedCheckpointTaskResponse) error {
err := agg.verifySignature(signedCheckpointTaskResponse)
if err != nil {
return err
}

taskIndex := signedCheckpointTaskResponse.TaskResponse.ReferenceTaskIndex
taskResponseDigest, err := signedCheckpointTaskResponse.TaskResponse.Digest()
if err != nil {
Expand Down Expand Up @@ -551,19 +557,24 @@ func (agg *Aggregator) ProcessSignedCheckpointTaskResponse(signedCheckpointTaskR

// Rpc request handlers
func (agg *Aggregator) ProcessSignedStateRootUpdateMessage(signedStateRootUpdateMessage *messages.SignedStateRootUpdateMessage) error {
messageDigest, err := signedStateRootUpdateMessage.Message.Digest()
timestamp := signedStateRootUpdateMessage.Message.Timestamp
err := agg.validateMessageTimestamp(timestamp)
if err != nil {
agg.logger.Error("Failed to get message digest", "err", err)
return DigestError
agg.logger.Error("Failed to validate message timestamp", "err", err, "timestamp", timestamp)
return err
}

timestamp := signedStateRootUpdateMessage.Message.Timestamp
err = agg.validateMessageTimestamp(timestamp)
err = agg.verifySignature(signedStateRootUpdateMessage)
if err != nil {
agg.logger.Error("Failed to validate message timestamp", "err", err, "timestamp", timestamp)
return err
}

messageDigest, err := signedStateRootUpdateMessage.Message.Digest()
if err != nil {
agg.logger.Error("Failed to get message digest", "err", err)
return DigestError
}

err = agg.stateRootUpdateBlsAggregationService.InitializeMessageIfNotExists(
messageDigest,
coretypes.QUORUM_NUMBERS,
Expand All @@ -588,19 +599,24 @@ func (agg *Aggregator) ProcessSignedStateRootUpdateMessage(signedStateRootUpdate
}

func (agg *Aggregator) ProcessSignedOperatorSetUpdateMessage(signedOperatorSetUpdateMessage *messages.SignedOperatorSetUpdateMessage) error {
messageDigest, err := signedOperatorSetUpdateMessage.Message.Digest()
timestamp := signedOperatorSetUpdateMessage.Message.Timestamp
err := agg.validateMessageTimestamp(timestamp)
if err != nil {
agg.logger.Error("Failed to get message digest", "err", err)
return DigestError
agg.logger.Error("Failed to validate message timestamp", "err", err, "timestamp", timestamp)
return err
}

timestamp := signedOperatorSetUpdateMessage.Message.Timestamp
err = agg.validateMessageTimestamp(timestamp)
err = agg.verifySignature(signedOperatorSetUpdateMessage)
if err != nil {
agg.logger.Error("Failed to validate message timestamp", "err", err, "timestamp", timestamp)
return err
}

messageDigest, err := signedOperatorSetUpdateMessage.Message.Digest()
if err != nil {
agg.logger.Error("Failed to get message digest", "err", err)
return DigestError
}

blockNumber, err := agg.avsReader.GetOperatorSetUpdateBlock(context.Background(), signedOperatorSetUpdateMessage.Message.Id)
if err != nil {
agg.logger.Error("Failed to get operator set update block", "err", err)
Expand Down Expand Up @@ -691,6 +707,59 @@ func (agg *Aggregator) GetCheckpointMessages(fromTimestamp, toTimestamp uint64)
}, nil
}

func (agg *Aggregator) GetOperatorInfoById(ctx context.Context, operatorId eigentypes.OperatorId) (eigentypes.OperatorInfo, bool) {
operatorInfo, ok := agg.operatorRegistrationsService.GetOperatorInfoById(ctx, operatorId)
return operatorInfo, ok
}

func (agg *Aggregator) verifySignature(signedMessage interface{}) error {
var operatorId eigentypes.OperatorId
var signature bls.Signature
var digest [32]byte
var err error

switch signedMessage := signedMessage.(type) {
case *messages.SignedCheckpointTaskResponse:
operatorId = signedMessage.OperatorId
signature = signedMessage.BlsSignature
digest, err = signedMessage.TaskResponse.Digest()
if err != nil {
return TaskResponseDigestError
}
case *messages.SignedStateRootUpdateMessage:
operatorId = signedMessage.OperatorId
signature = signedMessage.BlsSignature
digest, err = signedMessage.Message.Digest()
if err != nil {
return DigestError
}
case *messages.SignedOperatorSetUpdateMessage:
operatorId = signedMessage.OperatorId
signature = signedMessage.BlsSignature
digest, err = signedMessage.Message.Digest()
if err != nil {
return DigestError
}
default:
return UnsupportedMessageTypeError
}

operatorInfo, ok := agg.GetOperatorInfoById(context.Background(), operatorId)
if !ok {
return OperatorNotFoundError
}

ok, err = signature.Verify(operatorInfo.Pubkeys.G2Pubkey, digest)
if err != nil {
return InvalidSignatureError
}
if !ok {
return InvalidSignatureError
}

return nil
}

func (agg *Aggregator) validateMessageTimestamp(messageTimestamp uint64) error {
now := agg.clock.Now().Unix()
timeoutInSeconds := types.MESSAGE_SUBMISSION_TIMEOUT.Seconds()
Expand Down
27 changes: 15 additions & 12 deletions aggregator/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,14 @@ var MOCK_OPERATOR_BLS_PRIVATE_KEY, _ = bls.NewPrivateKey(MOCK_OPERATOR_BLS_PRIVA
var MOCK_OPERATOR_KEYPAIR = bls.NewKeyPair(MOCK_OPERATOR_BLS_PRIVATE_KEY)
var MOCK_OPERATOR_G1PUBKEY = MOCK_OPERATOR_KEYPAIR.GetPubKeyG1()
var MOCK_OPERATOR_G2PUBKEY = MOCK_OPERATOR_KEYPAIR.GetPubKeyG2()
var MOCK_OPERATOR_PUBKEYS = eigentypes.OperatorPubkeys{
G1Pubkey: MOCK_OPERATOR_G1PUBKEY,
G2Pubkey: MOCK_OPERATOR_G2PUBKEY,
}
var MOCK_OPERATOR_PUBKEY_DICT = map[eigentypes.OperatorId]types.OperatorInfo{
MOCK_OPERATOR_ID: {
OperatorPubkeys: eigentypes.OperatorPubkeys{
G1Pubkey: MOCK_OPERATOR_G1PUBKEY,
G2Pubkey: MOCK_OPERATOR_G2PUBKEY,
},
OperatorAddr: common.Address{},
OperatorPubkeys: MOCK_OPERATOR_PUBKEYS,
OperatorAddr: common.Address{},
},
}

Expand All @@ -55,7 +56,7 @@ func TestSendNewTask(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

aggregator, mockAvsReaderer, mockAvsWriterer, mockTaskBlsAggService, _, _, _, _, mockClient, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
aggregator, mockAvsReaderer, mockAvsWriterer, mockTaskBlsAggService, _, _, _, _, _, mockClient, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
assert.Nil(t, err)

var TASK_INDEX = uint32(0)
Expand Down Expand Up @@ -92,7 +93,7 @@ func TestHandleStateRootUpdateAggregationReachedQuorum(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

aggregator, _, _, _, _, _, mockMsgDb, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
aggregator, _, _, _, _, _, _, mockMsgDb, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
assert.Nil(t, err)

msg := messages.StateRootUpdateMessage{}
Expand Down Expand Up @@ -122,7 +123,7 @@ func TestHandleOperatorSetUpdateAggregationReachedQuorum(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

aggregator, _, _, _, _, _, mockMsgDb, mockRollupBroadcaster, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
aggregator, _, _, _, _, _, _, mockMsgDb, mockRollupBroadcaster, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
assert.Nil(t, err)

msg := messages.OperatorSetUpdateMessage{}
Expand Down Expand Up @@ -158,7 +159,7 @@ func TestExpiredStateRootUpdateMessage(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

aggregator, _, _, _, _, _, _, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
aggregator, _, _, _, _, _, _, _, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
assert.NoError(t, err)

nowTimestamp := uint64(6000)
Expand All @@ -178,7 +179,7 @@ func TestExpiredOperatorSetUpdate(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

aggregator, _, _, _, _, _, _, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
aggregator, _, _, _, _, _, _, _, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
assert.NoError(t, err)

nowTimestamp := uint64(8000)
Expand All @@ -196,7 +197,7 @@ func TestExpiredOperatorSetUpdate(t *testing.T) {

func createMockAggregator(
mockCtrl *gomock.Controller, operatorPubkeyDict map[eigentypes.OperatorId]types.OperatorInfo,
) (*Aggregator, *chainiomocks.MockAvsReaderer, *chainiomocks.MockAvsWriterer, *blsaggservmock.MockBlsAggregationService, *aggmocks.MockMessageBlsAggregationService, *aggmocks.MockMessageBlsAggregationService, *dbmocks.MockDatabaser, *aggmocks.MockRollupBroadcasterer, *safeclientmocks.MockSafeClient, error) {
) (*Aggregator, *chainiomocks.MockAvsReaderer, *chainiomocks.MockAvsWriterer, *blsaggservmock.MockBlsAggregationService, *aggmocks.MockMessageBlsAggregationService, *aggmocks.MockMessageBlsAggregationService, *aggmocks.MockOperatorRegistrationsService, *dbmocks.MockDatabaser, *aggmocks.MockRollupBroadcasterer, *safeclientmocks.MockSafeClient, error) {
logger := sdklogging.NewNoopLogger()
mockAvsWriter := chainiomocks.NewMockAvsWriterer(mockCtrl)
mockAvsReader := chainiomocks.NewMockAvsReaderer(mockCtrl)
Expand All @@ -206,6 +207,7 @@ func createMockAggregator(
mockMsgDb := dbmocks.NewMockDatabaser(mockCtrl)
mockRollupBroadcaster := aggmocks.NewMockRollupBroadcasterer(mockCtrl)
mockClient := safeclientmocks.NewMockSafeClient(mockCtrl)
mockOperatorRegistrationsService := aggmocks.NewMockOperatorRegistrationsService(mockCtrl)

aggregator := &Aggregator{
logger: logger,
Expand All @@ -214,6 +216,7 @@ func createMockAggregator(
taskBlsAggregationService: mockTaskBlsAggregationService,
stateRootUpdateBlsAggregationService: mockStateRootUpdateBlsAggregationService,
operatorSetUpdateBlsAggregationService: mockOperatorSetUpdateBlsAggregationService,
operatorRegistrationsService: mockOperatorRegistrationsService,
msgDb: mockMsgDb,
tasks: make(map[coretypes.TaskIndex]taskmanager.CheckpointTask),
taskResponses: make(map[coretypes.TaskIndex]map[eigentypes.TaskResponseDigest]messages.CheckpointTaskResponse),
Expand All @@ -225,5 +228,5 @@ func createMockAggregator(
aggregatorListener: &SelectiveAggregatorListener{},
clock: core.SystemClock,
}
return aggregator, mockAvsReader, mockAvsWriter, mockTaskBlsAggregationService, mockStateRootUpdateBlsAggregationService, mockOperatorSetUpdateBlsAggregationService, mockMsgDb, mockRollupBroadcaster, mockClient, nil
return aggregator, mockAvsReader, mockAvsWriter, mockTaskBlsAggregationService, mockStateRootUpdateBlsAggregationService, mockOperatorSetUpdateBlsAggregationService, mockOperatorRegistrationsService, mockMsgDb, mockRollupBroadcaster, mockClient, nil
}
1 change: 1 addition & 0 deletions aggregator/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ package aggregator
//go:generate mockgen -destination=./mocks/rpc_aggregator.go -package=mocks github.com/NethermindEth/near-sffl/aggregator RpcAggregatorer
//go:generate mockgen -destination=./mocks/message_blsagg.go -package=mocks github.com/NethermindEth/near-sffl/aggregator MessageBlsAggregationService
//go:generate mockgen -destination=./mocks/rollup_broadcaster.go -package=mocks github.com/NethermindEth/near-sffl/aggregator RollupBroadcasterer
//go:generate mockgen -destination=./mocks/operator_registrations_inmemory.go -package=mocks github.com/NethermindEth/near-sffl/aggregator OperatorRegistrationsService
//go:generate mockgen -destination=./mocks/eth_client.go -package=mocks github.com/Layr-Labs/eigensdk-go/chainio/clients/eth Client
1 change: 0 additions & 1 deletion aggregator/mocks/eth_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion aggregator/mocks/message_blsagg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading