Skip to content

Commit

Permalink
refactor: Standardize mutexes naming to *Lock
Browse files Browse the repository at this point in the history
  • Loading branch information
Hyodar committed Feb 6, 2024
1 parent 8fdbe21 commit 8217a48
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 28 deletions.
26 changes: 13 additions & 13 deletions aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ type Aggregator struct {
taskBlsAggregationService blsagg.BlsAggregationService
messageBlsAggregationService MessageBlsAggregationService
tasks map[types.TaskIndex]taskmanager.CheckpointTask
tasksMu sync.RWMutex
tasksLock sync.RWMutex
taskResponses map[types.TaskIndex]map[sdktypes.TaskResponseDigest]taskmanager.CheckpointTaskResponse
taskResponsesMu sync.RWMutex
taskResponsesLock sync.RWMutex
msgDb *MessageDatabase
stateRootUpdates map[types.MessageDigest]servicemanager.StateRootUpdateMessage
stateRootUpdatesMu sync.RWMutex
stateRootUpdatesLock sync.RWMutex
}

// NewAggregator creates a new Aggregator with the provided config.
Expand Down Expand Up @@ -214,12 +214,12 @@ func (agg *Aggregator) sendAggregatedResponseToContract(blsAggServiceResp blsagg
agg.logger.Info("Threshold reached. Sending aggregated response onchain.",
"taskIndex", blsAggServiceResp.TaskIndex,
)
agg.tasksMu.RLock()
agg.tasksLock.RLock()
task := agg.tasks[blsAggServiceResp.TaskIndex]
agg.tasksMu.RUnlock()
agg.taskResponsesMu.RLock()
agg.tasksLock.RUnlock()
agg.taskResponsesLock.RLock()
taskResponse := agg.taskResponses[blsAggServiceResp.TaskIndex][blsAggServiceResp.TaskResponseDigest]
agg.taskResponsesMu.RUnlock()
agg.taskResponsesLock.RUnlock()
_, err := agg.avsWriter.SendAggregatedResponse(context.Background(), task, taskResponse, nonSignerStakesAndSignature)
if err != nil {
agg.logger.Error("Aggregator failed to respond to task", "err", err)
Expand All @@ -237,9 +237,9 @@ func (agg *Aggregator) sendNewCheckpointTask(fromNearBlock uint64, toNearBlock u
return err
}

agg.tasksMu.Lock()
agg.tasksLock.Lock()
agg.tasks[taskIndex] = newTask
agg.tasksMu.Unlock()
agg.tasksLock.Unlock()

quorumThresholds := make([]uint32, len(newTask.QuorumNumbers))
for i, _ := range newTask.QuorumNumbers {
Expand All @@ -253,14 +253,14 @@ func (agg *Aggregator) sendNewCheckpointTask(fromNearBlock uint64, toNearBlock u
}

func (agg *Aggregator) handleStateRootUpdateReachedQuorum(blsAggServiceResp types.MessageBlsAggregationServiceResponse) {
agg.stateRootUpdatesMu.RLock()
agg.stateRootUpdatesLock.RLock()
msg, ok := agg.stateRootUpdates[blsAggServiceResp.MessageDigest]
agg.stateRootUpdatesMu.RUnlock()
agg.stateRootUpdatesLock.RUnlock()

defer func() {
agg.stateRootUpdatesMu.RLock()
agg.stateRootUpdatesLock.RLock()
delete(agg.stateRootUpdates, blsAggServiceResp.MessageDigest)
agg.stateRootUpdatesMu.RUnlock()
agg.stateRootUpdatesLock.RUnlock()
}()

if !ok {
Expand Down
16 changes: 8 additions & 8 deletions aggregator/message_blsagg.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type MessageBlsAggregationService interface {
type MessageBlsAggregatorService struct {
aggregatedResponsesC chan aggtypes.MessageBlsAggregationServiceResponse
signedMessageDigestsCs map[aggtypes.MessageDigest]chan SignedMessageDigest
messageChansMutex sync.RWMutex
messageChansLock sync.RWMutex
avsRegistryService avsregistry.AvsRegistryService
ethClient eth.EthClient
logger logging.Logger
Expand All @@ -95,7 +95,7 @@ func NewMessageBlsAggregatorService(avsRegistryService avsregistry.AvsRegistrySe
return &MessageBlsAggregatorService{
aggregatedResponsesC: make(chan aggtypes.MessageBlsAggregationServiceResponse),
signedMessageDigestsCs: make(map[aggtypes.MessageDigest]chan SignedMessageDigest),
messageChansMutex: sync.RWMutex{},
messageChansLock: sync.RWMutex{},
avsRegistryService: avsRegistryService,
ethClient: ethClient,
logger: logger,
Expand All @@ -117,9 +117,9 @@ func (mbas *MessageBlsAggregatorService) InitializeMessageIfNotExists(
}

signedMessageDigestsC := make(chan SignedMessageDigest)
mbas.messageChansMutex.Lock()
mbas.messageChansLock.Lock()
mbas.signedMessageDigestsCs[messageDigest] = signedMessageDigestsC
mbas.messageChansMutex.Unlock()
mbas.messageChansLock.Unlock()
go mbas.singleMessageAggregatorGoroutineFunc(messageDigest, quorumNumbers, quorumThresholdPercentages, timeToExpiry, signedMessageDigestsC)
return nil
}
Expand All @@ -130,9 +130,9 @@ func (mbas *MessageBlsAggregatorService) ProcessNewSignature(
blsSignature *bls.Signature,
operatorId bls.OperatorId,
) error {
mbas.messageChansMutex.Lock()
mbas.messageChansLock.Lock()
messageC, taskInitialized := mbas.signedMessageDigestsCs[messageDigest]
mbas.messageChansMutex.Unlock()
mbas.messageChansLock.Unlock()
if !taskInitialized {
return MessageNotFoundErrorFn(messageDigest)
}
Expand Down Expand Up @@ -293,9 +293,9 @@ func (mbas *MessageBlsAggregatorService) handleSignedMessageDigest(signedMessage
}

func (mbas *MessageBlsAggregatorService) closeMessageGoroutine(messageDigest aggtypes.MessageDigest) {
mbas.messageChansMutex.Lock()
mbas.messageChansLock.Lock()
delete(mbas.signedMessageDigestsCs, messageDigest)
mbas.messageChansMutex.Unlock()
mbas.messageChansLock.Unlock()
}

func (mbas *MessageBlsAggregatorService) verifySignature(
Expand Down
6 changes: 3 additions & 3 deletions aggregator/message_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
type MessageDatabase struct {
db *badger.DB
dbPath string
mu sync.RWMutex
lock sync.RWMutex
}

func NewMessageDatabase(dbPath string) (*MessageDatabase, error) {
Expand All @@ -40,8 +40,8 @@ func (md *MessageDatabase) Close() error {
}

func (md *MessageDatabase) Store(prefix string, key string, value any) error {
md.mu.Lock()
defer md.mu.Unlock()
md.lock.Lock()
defer md.lock.Unlock()

fullKey := prefix + key

Expand Down
8 changes: 4 additions & 4 deletions aggregator/rpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,14 @@ func (agg *Aggregator) ProcessSignedCheckpointTaskResponse(signedCheckpointTaskR
return err
}

agg.taskResponsesMu.Lock()
agg.taskResponsesLock.Lock()
if _, ok := agg.taskResponses[taskIndex]; !ok {
agg.taskResponses[taskIndex] = make(map[sdktypes.TaskResponseDigest]taskmanager.CheckpointTaskResponse)
}
if _, ok := agg.taskResponses[taskIndex][taskResponseDigest]; !ok {
agg.taskResponses[taskIndex][taskResponseDigest] = signedCheckpointTaskResponse.TaskResponse
}
agg.taskResponsesMu.Unlock()
agg.taskResponsesLock.Unlock()

return nil
}
Expand Down Expand Up @@ -102,9 +102,9 @@ func (agg *Aggregator) ProcessSignedStateRootUpdateMessage(signedStateRootUpdate
return err
}

agg.stateRootUpdatesMu.Lock()
agg.stateRootUpdatesLock.Lock()
agg.stateRootUpdates[messageDigest] = signedStateRootUpdateMessage.Message
agg.stateRootUpdatesMu.Unlock()
agg.stateRootUpdatesLock.Unlock()

return nil
}

0 comments on commit 8217a48

Please sign in to comment.