diff --git a/aggregator/aggregator.go b/aggregator/aggregator.go index 93ecfce9..e7a7217d 100644 --- a/aggregator/aggregator.go +++ b/aggregator/aggregator.go @@ -73,12 +73,12 @@ type Aggregator struct { taskBlsAggregationService blsagg.BlsAggregationService messageBlsAggregationService MessageBlsAggregationService tasks map[types.TaskIndex]taskmanager.CheckpointTask - tasksMu sync.RWMutex + tasksLock sync.RWMutex taskResponses map[types.TaskIndex]map[sdktypes.TaskResponseDigest]taskmanager.CheckpointTaskResponse - taskResponsesMu sync.RWMutex + taskResponsesLock sync.RWMutex msgDb *MessageDatabase stateRootUpdates map[types.MessageDigest]servicemanager.StateRootUpdateMessage - stateRootUpdatesMu sync.RWMutex + stateRootUpdatesLock sync.RWMutex } // NewAggregator creates a new Aggregator with the provided config. @@ -214,12 +214,12 @@ func (agg *Aggregator) sendAggregatedResponseToContract(blsAggServiceResp blsagg agg.logger.Info("Threshold reached. Sending aggregated response onchain.", "taskIndex", blsAggServiceResp.TaskIndex, ) - agg.tasksMu.RLock() + agg.tasksLock.RLock() task := agg.tasks[blsAggServiceResp.TaskIndex] - agg.tasksMu.RUnlock() - agg.taskResponsesMu.RLock() + agg.tasksLock.RUnlock() + agg.taskResponsesLock.RLock() taskResponse := agg.taskResponses[blsAggServiceResp.TaskIndex][blsAggServiceResp.TaskResponseDigest] - agg.taskResponsesMu.RUnlock() + agg.taskResponsesLock.RUnlock() _, err := agg.avsWriter.SendAggregatedResponse(context.Background(), task, taskResponse, nonSignerStakesAndSignature) if err != nil { agg.logger.Error("Aggregator failed to respond to task", "err", err) @@ -237,9 +237,9 @@ func (agg *Aggregator) sendNewCheckpointTask(fromNearBlock uint64, toNearBlock u return err } - agg.tasksMu.Lock() + agg.tasksLock.Lock() agg.tasks[taskIndex] = newTask - agg.tasksMu.Unlock() + agg.tasksLock.Unlock() quorumThresholds := make([]uint32, len(newTask.QuorumNumbers)) for i, _ := range newTask.QuorumNumbers { @@ -253,14 +253,14 @@ func (agg *Aggregator) sendNewCheckpointTask(fromNearBlock uint64, toNearBlock u } func (agg *Aggregator) handleStateRootUpdateReachedQuorum(blsAggServiceResp types.MessageBlsAggregationServiceResponse) { - agg.stateRootUpdatesMu.RLock() + agg.stateRootUpdatesLock.RLock() msg, ok := agg.stateRootUpdates[blsAggServiceResp.MessageDigest] - agg.stateRootUpdatesMu.RUnlock() + agg.stateRootUpdatesLock.RUnlock() defer func() { - agg.stateRootUpdatesMu.RLock() + agg.stateRootUpdatesLock.RLock() delete(agg.stateRootUpdates, blsAggServiceResp.MessageDigest) - agg.stateRootUpdatesMu.RUnlock() + agg.stateRootUpdatesLock.RUnlock() }() if !ok { diff --git a/aggregator/message_blsagg.go b/aggregator/message_blsagg.go index 5b1a03fc..00a2dfb1 100644 --- a/aggregator/message_blsagg.go +++ b/aggregator/message_blsagg.go @@ -83,7 +83,7 @@ type MessageBlsAggregationService interface { type MessageBlsAggregatorService struct { aggregatedResponsesC chan aggtypes.MessageBlsAggregationServiceResponse signedMessageDigestsCs map[aggtypes.MessageDigest]chan SignedMessageDigest - messageChansMutex sync.RWMutex + messageChansLock sync.RWMutex avsRegistryService avsregistry.AvsRegistryService ethClient eth.EthClient logger logging.Logger @@ -95,7 +95,7 @@ func NewMessageBlsAggregatorService(avsRegistryService avsregistry.AvsRegistrySe return &MessageBlsAggregatorService{ aggregatedResponsesC: make(chan aggtypes.MessageBlsAggregationServiceResponse), signedMessageDigestsCs: make(map[aggtypes.MessageDigest]chan SignedMessageDigest), - messageChansMutex: sync.RWMutex{}, + messageChansLock: sync.RWMutex{}, avsRegistryService: avsRegistryService, ethClient: ethClient, logger: logger, @@ -117,9 +117,9 @@ func (mbas *MessageBlsAggregatorService) InitializeMessageIfNotExists( } signedMessageDigestsC := make(chan SignedMessageDigest) - mbas.messageChansMutex.Lock() + mbas.messageChansLock.Lock() mbas.signedMessageDigestsCs[messageDigest] = signedMessageDigestsC - mbas.messageChansMutex.Unlock() + mbas.messageChansLock.Unlock() go mbas.singleMessageAggregatorGoroutineFunc(messageDigest, quorumNumbers, quorumThresholdPercentages, timeToExpiry, signedMessageDigestsC) return nil } @@ -130,9 +130,9 @@ func (mbas *MessageBlsAggregatorService) ProcessNewSignature( blsSignature *bls.Signature, operatorId bls.OperatorId, ) error { - mbas.messageChansMutex.Lock() + mbas.messageChansLock.Lock() messageC, taskInitialized := mbas.signedMessageDigestsCs[messageDigest] - mbas.messageChansMutex.Unlock() + mbas.messageChansLock.Unlock() if !taskInitialized { return MessageNotFoundErrorFn(messageDigest) } @@ -293,9 +293,9 @@ func (mbas *MessageBlsAggregatorService) handleSignedMessageDigest(signedMessage } func (mbas *MessageBlsAggregatorService) closeMessageGoroutine(messageDigest aggtypes.MessageDigest) { - mbas.messageChansMutex.Lock() + mbas.messageChansLock.Lock() delete(mbas.signedMessageDigestsCs, messageDigest) - mbas.messageChansMutex.Unlock() + mbas.messageChansLock.Unlock() } func (mbas *MessageBlsAggregatorService) verifySignature( diff --git a/aggregator/message_database.go b/aggregator/message_database.go index 72adf640..74cb424e 100644 --- a/aggregator/message_database.go +++ b/aggregator/message_database.go @@ -14,7 +14,7 @@ import ( type MessageDatabase struct { db *badger.DB dbPath string - mu sync.RWMutex + lock sync.RWMutex } func NewMessageDatabase(dbPath string) (*MessageDatabase, error) { @@ -40,8 +40,8 @@ func (md *MessageDatabase) Close() error { } func (md *MessageDatabase) Store(prefix string, key string, value any) error { - md.mu.Lock() - defer md.mu.Unlock() + md.lock.Lock() + defer md.lock.Unlock() fullKey := prefix + key diff --git a/aggregator/rpc_server.go b/aggregator/rpc_server.go index a749b029..dc0232b3 100644 --- a/aggregator/rpc_server.go +++ b/aggregator/rpc_server.go @@ -66,14 +66,14 @@ func (agg *Aggregator) ProcessSignedCheckpointTaskResponse(signedCheckpointTaskR return err } - agg.taskResponsesMu.Lock() + agg.taskResponsesLock.Lock() if _, ok := agg.taskResponses[taskIndex]; !ok { agg.taskResponses[taskIndex] = make(map[sdktypes.TaskResponseDigest]taskmanager.CheckpointTaskResponse) } if _, ok := agg.taskResponses[taskIndex][taskResponseDigest]; !ok { agg.taskResponses[taskIndex][taskResponseDigest] = signedCheckpointTaskResponse.TaskResponse } - agg.taskResponsesMu.Unlock() + agg.taskResponsesLock.Unlock() return nil } @@ -102,9 +102,9 @@ func (agg *Aggregator) ProcessSignedStateRootUpdateMessage(signedStateRootUpdate return err } - agg.stateRootUpdatesMu.Lock() + agg.stateRootUpdatesLock.Lock() agg.stateRootUpdates[messageDigest] = signedStateRootUpdateMessage.Message - agg.stateRootUpdatesMu.Unlock() + agg.stateRootUpdatesLock.Unlock() return nil }