Skip to content

Commit

Permalink
Add additional checkpoint checks and timestamps logic (#74)
Browse files Browse the repository at this point in the history
* feat: Store last checkpoint toTimestamp and add more timestamp checks to task creation

* feat: Remove OperatorStateRetriever from SFFLTaskManager

* feat: Update contract bindings

* feat: Update deployed AVS state

* docs: Fix docs for lastCheckpointToTimestamp

* feat: Add checkpoint timestamp logic to aggregator

* feat: Set checkpoint interval in aggregator config
  • Loading branch information
Hyodar authored Apr 17, 2024
1 parent 3188295 commit 5c136f7
Show file tree
Hide file tree
Showing 25 changed files with 787 additions and 188 deletions.
49 changes: 35 additions & 14 deletions aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package aggregator

import (
"context"
"math/big"
"sync"
"time"

Expand Down Expand Up @@ -72,11 +73,12 @@ type Aggregator struct {
logger logging.Logger
serverIpPortAddr string
restServerIpPortAddr string
checkpointInterval time.Duration
avsWriter chainio.AvsWriterer
avsReader chainio.AvsReaderer
rollupBroadcaster RollupBroadcasterer
client eth.Client

// aggregation related fields
taskBlsAggregationService blsagg.BlsAggregationService
stateRootUpdateBlsAggregationService MessageBlsAggregationService
operatorSetUpdateBlsAggregationService MessageBlsAggregationService
Expand Down Expand Up @@ -175,9 +177,11 @@ func NewAggregator(ctx context.Context, config *config.Config, logger logging.Lo
logger: logger,
serverIpPortAddr: config.AggregatorServerIpPortAddr,
restServerIpPortAddr: config.AggregatorRestServerIpPortAddr,
checkpointInterval: config.AggregatorCheckpointInterval,
avsWriter: avsWriter,
avsReader: avsReader,
rollupBroadcaster: rollupBroadcaster,
client: ethHttpClient,
taskBlsAggregationService: taskBlsAggregationService,
stateRootUpdateBlsAggregationService: stateRootUpdateBlsAggregationService,
operatorSetUpdateBlsAggregationService: operatorSetUpdateBlsAggregationService,
Expand All @@ -198,17 +202,10 @@ func (agg *Aggregator) Start(ctx context.Context) error {
agg.logger.Infof("Starting aggregator REST API.")
go agg.startRestServer()

// TODO(soubhik): refactor task generation/sending into a separate function that we can run as goroutine
ticker := time.NewTicker(40 * time.Second)
agg.logger.Infof("Aggregator set to send new task every 40 seconds...")
ticker := time.NewTicker(agg.checkpointInterval)
agg.logger.Infof("Aggregator set to send new task every %s...", agg.checkpointInterval.String())
defer ticker.Stop()

// ticker doesn't tick immediately, so we send the first task here
// see https://github.com/golang/go/issues/17601

// TODO: make this based on the actual timestamps
timestamp := uint64(0)

broadcasterErrorChan := agg.rollupBroadcaster.GetErrorChan()
for {
select {
Expand All @@ -224,10 +221,9 @@ func (agg *Aggregator) Start(ctx context.Context) error {
agg.logger.Info("Received response from operatorSetUpdateBlsAggregationService", "blsAggServiceResp", blsAggServiceResp)
agg.handleOperatorSetUpdateReachedQuorum(ctx, blsAggServiceResp)
case <-ticker.C:
err := agg.sendNewCheckpointTask(timestamp, timestamp)
timestamp++
err := agg.sendNewCheckpointTask()
if err != nil {
// we log the errors inside sendNewCheckpointTask() so here we just continue to the next task
agg.logger.Error("Failed to send new checkpoint task", "err", err)
continue
}

Expand Down Expand Up @@ -278,7 +274,32 @@ func (agg *Aggregator) sendAggregatedResponseToContract(blsAggServiceResp blsagg

// sendNewCheckpointTask sends a new task to the task manager contract, and updates the Task dict struct
// with the information of operators opted into quorum 0 at the block of task creation.
func (agg *Aggregator) sendNewCheckpointTask(fromTimestamp uint64, toTimestamp uint64) error {
func (agg *Aggregator) sendNewCheckpointTask() error {
blockNumber, err := agg.client.BlockNumber(context.Background())
if err != nil {
agg.logger.Error("Failed to get block number", "err", err)
return err
}

block, err := agg.client.BlockByNumber(context.Background(), big.NewInt(0).SetUint64(blockNumber))
if err != nil {
agg.logger.Error("Failed to get block", "err", err)
return err
}

lastCheckpointToTimestamp, err := agg.avsReader.GetLastCheckpointToTimestamp(context.Background())
if err != nil {
agg.logger.Error("Failed to get last checkpoint toTimestamp", "err", err)
return err
}

fromTimestamp := lastCheckpointToTimestamp + 1
if lastCheckpointToTimestamp == 0 {
fromTimestamp = 0
}

toTimestamp := block.Time()

agg.logger.Info("Aggregator sending new task", "fromTimestamp", fromTimestamp, "toTimestamp", toTimestamp)
// Send checkpoint to the task manager contract
newTask, taskIndex, err := agg.avsWriter.SendNewCheckpointTask(context.Background(), fromTimestamp, toTimestamp, types.QUORUM_THRESHOLD_NUMERATOR, coretypes.QUORUM_NUMBERS)
Expand Down
41 changes: 20 additions & 21 deletions aggregator/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,15 @@ import (
"testing"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind/backends"
"github.com/ethereum/go-ethereum/common"
gethcore "github.com/ethereum/go-ethereum/core"
"github.com/stretchr/testify/assert"
"go.uber.org/mock/gomock"

"github.com/Layr-Labs/eigensdk-go/crypto/bls"
sdklogging "github.com/Layr-Labs/eigensdk-go/logging"
blsaggservmock "github.com/Layr-Labs/eigensdk-go/services/mocks/blsagg"
sdktypes "github.com/Layr-Labs/eigensdk-go/types"
gethtypes "github.com/ethereum/go-ethereum/core/types"

dbmocks "github.com/NethermindEth/near-sffl/aggregator/database/mocks"
aggmocks "github.com/NethermindEth/near-sffl/aggregator/mocks"
Expand Down Expand Up @@ -54,17 +53,24 @@ func TestSendNewTask(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

aggregator, _, mockAvsWriterer, mockTaskBlsAggService, _, _, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
aggregator, mockAvsReaderer, mockAvsWriterer, mockTaskBlsAggService, _, _, _, _, mockClient, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
assert.Nil(t, err)

var TASK_INDEX = uint32(0)
var BLOCK_NUMBER = uint32(100)
var FROM_NEAR_BLOCK = uint64(3)
var TO_NEAR_BLOCK = uint64(4)
var FROM_TIMESTAMP = uint64(3)
var TO_TIMESTAMP = uint64(4)

mockClient.EXPECT().BlockNumber(context.Background()).Return(uint64(BLOCK_NUMBER), nil)
mockClient.EXPECT().BlockByNumber(context.Background(), big.NewInt(int64(BLOCK_NUMBER))).Return(
gethtypes.NewBlockWithHeader(&gethtypes.Header{Time: TO_TIMESTAMP}),
nil,
)

mockAvsWriterer.EXPECT().SendNewCheckpointTask(
context.Background(), FROM_NEAR_BLOCK, TO_NEAR_BLOCK, types.QUORUM_THRESHOLD_NUMERATOR, coretypes.QUORUM_NUMBERS,
).Return(aggmocks.MockSendNewCheckpointTask(BLOCK_NUMBER, TASK_INDEX, FROM_NEAR_BLOCK, TO_NEAR_BLOCK))
context.Background(), FROM_TIMESTAMP, TO_TIMESTAMP, types.QUORUM_THRESHOLD_NUMERATOR, coretypes.QUORUM_NUMBERS,
).Return(aggmocks.MockSendNewCheckpointTask(BLOCK_NUMBER, TASK_INDEX, FROM_TIMESTAMP, TO_TIMESTAMP))
mockAvsReaderer.EXPECT().GetLastCheckpointToTimestamp(context.Background()).Return(FROM_TIMESTAMP-1, nil)

// 100 blocks, each takes 12 seconds. We hardcode for now since aggregator also hardcodes this value
taskTimeToExpiry := 100 * 12 * time.Second
Expand All @@ -73,15 +79,15 @@ func TestSendNewTask(t *testing.T) {
// see https://hynek.me/articles/what-to-mock-in-5-mins/
mockTaskBlsAggService.EXPECT().InitializeNewTask(TASK_INDEX, BLOCK_NUMBER, coretypes.QUORUM_NUMBERS, []uint32{types.QUORUM_THRESHOLD_NUMERATOR}, taskTimeToExpiry)

err = aggregator.sendNewCheckpointTask(FROM_NEAR_BLOCK, TO_NEAR_BLOCK)
err = aggregator.sendNewCheckpointTask()
assert.Nil(t, err)
}

func TestHandleStateRootUpdateAggregationReachedQuorum(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

aggregator, _, _, _, _, _, mockMsgDb, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
aggregator, _, _, _, _, _, mockMsgDb, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
assert.Nil(t, err)

msg := messages.StateRootUpdateMessage{}
Expand Down Expand Up @@ -110,7 +116,7 @@ func TestHandleOperatorSetUpdateAggregationReachedQuorum(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

aggregator, _, _, _, _, _, mockMsgDb, mockRollupBroadcaster, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
aggregator, _, _, _, _, _, mockMsgDb, mockRollupBroadcaster, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
assert.Nil(t, err)

msg := messages.OperatorSetUpdateMessage{}
Expand Down Expand Up @@ -143,7 +149,7 @@ func TestHandleOperatorSetUpdateAggregationReachedQuorum(t *testing.T) {

func createMockAggregator(
mockCtrl *gomock.Controller, operatorPubkeyDict map[bls.OperatorId]types.OperatorInfo,
) (*Aggregator, *chainiomocks.MockAvsReaderer, *chainiomocks.MockAvsWriterer, *blsaggservmock.MockBlsAggregationService, *aggmocks.MockMessageBlsAggregationService, *aggmocks.MockMessageBlsAggregationService, *dbmocks.MockDatabaser, *aggmocks.MockRollupBroadcasterer, error) {
) (*Aggregator, *chainiomocks.MockAvsReaderer, *chainiomocks.MockAvsWriterer, *blsaggservmock.MockBlsAggregationService, *aggmocks.MockMessageBlsAggregationService, *aggmocks.MockMessageBlsAggregationService, *dbmocks.MockDatabaser, *aggmocks.MockRollupBroadcasterer, *aggmocks.MockClient, error) {
logger := sdklogging.NewNoopLogger()
mockAvsWriter := chainiomocks.NewMockAvsWriterer(mockCtrl)
mockAvsReader := chainiomocks.NewMockAvsReaderer(mockCtrl)
Expand All @@ -152,6 +158,7 @@ func createMockAggregator(
mockOperatorSetUpdateBlsAggregationService := aggmocks.NewMockMessageBlsAggregationService(mockCtrl)
mockMsgDb := dbmocks.NewMockDatabaser(mockCtrl)
mockRollupBroadcaster := aggmocks.NewMockRollupBroadcasterer(mockCtrl)
mockClient := aggmocks.NewMockClient(mockCtrl)

aggregator := &Aggregator{
logger: logger,
Expand All @@ -166,15 +173,7 @@ func createMockAggregator(
stateRootUpdates: make(map[coretypes.MessageDigest]messages.StateRootUpdateMessage),
operatorSetUpdates: make(map[coretypes.MessageDigest]messages.OperatorSetUpdateMessage),
rollupBroadcaster: mockRollupBroadcaster,
client: mockClient,
}
return aggregator, mockAvsReader, mockAvsWriter, mockTaskBlsAggregationService, mockStateRootUpdateBlsAggregationService, mockOperatorSetUpdateBlsAggregationService, mockMsgDb, mockRollupBroadcaster, nil
}

// just a mock ethclient to pass to bindings
// so that we can access abi methods
func createMockEthClient() *backends.SimulatedBackend {
genesisAlloc := map[common.Address]gethcore.GenesisAccount{}
blockGasLimit := uint64(1000000)
client := backends.NewSimulatedBackend(genesisAlloc, blockGasLimit)
return client
return aggregator, mockAvsReader, mockAvsWriter, mockTaskBlsAggregationService, mockStateRootUpdateBlsAggregationService, mockOperatorSetUpdateBlsAggregationService, mockMsgDb, mockRollupBroadcaster, mockClient, nil
}
1 change: 1 addition & 0 deletions aggregator/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ package aggregator

//go:generate mockgen -destination=./mocks/message_blsagg.go -package=mocks github.com/NethermindEth/near-sffl/aggregator MessageBlsAggregationService
//go:generate mockgen -destination=./mocks/rollup_broadcaster.go -package=mocks github.com/NethermindEth/near-sffl/aggregator RollupBroadcasterer
//go:generate mockgen -destination=./mocks/eth_client.go -package=mocks github.com/Layr-Labs/eigensdk-go/chainio/clients/eth Client
Loading

0 comments on commit 5c136f7

Please sign in to comment.