Skip to content
This repository has been archived by the owner on Nov 12, 2023. It is now read-only.

Commit

Permalink
fix concurrency of worker state (#22)
Browse files Browse the repository at this point in the history
* fix concurrency of worker state

* fix
  • Loading branch information
Fabio1988 authored Sep 26, 2023
1 parent fa046ff commit 64e3abc
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 9 deletions.
38 changes: 34 additions & 4 deletions worker/workerState.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type State struct {
Host string
LastSeen int64
requestCounter *RequestCounter
mu sync.Mutex
}

var requestLimits map[int]int
Expand All @@ -33,6 +34,7 @@ func GetWorkerState(workerId string) *State {
if s, found := states[workerId]; !found {
newState := &State{
Uuid: workerId,
LastSeen: time.Now().Unix(),
requestCounter: NewRequestCounter(),
}
newState.SetRequestLimits(requestLimits)
Expand Down Expand Up @@ -99,48 +101,76 @@ func GetWorkers() (results []*State) {
return results
}

func SetRequestLimits(limits map[int]int) {
requestLimits = limits
}

// Lock locks the mutex for the State
func (ws *State) Lock() {
ws.mu.Lock()
}

// Unlock unlocks the mutex for the State
func (ws *State) Unlock() {
ws.mu.Unlock()
}

func (ws *State) ResetUsername() {
ws.Lock()
defer ws.Unlock()
ws.Username = ""
}

func (ws *State) ResetAreaAndRoutePart() {
ws.Lock()
defer ws.Unlock()
ws.AreaId = 0
ws.StartStep = 0
ws.EndStep = 0
ws.Step = 0
}

func (ws *State) Touch(host string) {
ws.Lock()
defer ws.Unlock()
ws.Host = host
}

func (ws *State) LastLocation(lat, lon float64, host string) {
ws.Lock()
defer ws.Unlock()
ws.Host = host
ws.LastSeen = time.Now().Unix()
}

func SetRequestLimits(limits map[int]int) {
requestLimits = limits
}

func (ws *State) SetRequestLimits(limits map[int]int) {
ws.Lock()
defer ws.Unlock()
if len(limits) > 0 {
ws.requestCounter.SetLimits(limits)
}
}

func (ws *State) IncrementLimit(method int) {
ws.Lock()
defer ws.Unlock()
ws.requestCounter.Increment(method)
}

func (ws *State) CheckLimitExceeded() bool {
ws.Lock()
defer ws.Unlock()
return ws.requestCounter.CheckLimitsExceeded()
}

func (ws *State) RequestCounts() map[int]int {
ws.Lock()
defer ws.Unlock()
return ws.requestCounter.RequestCounts()
}

func (ws *State) ResetCounter() {
ws.Lock()
defer ws.Unlock()
ws.requestCounter.ResetCounts()
}
9 changes: 4 additions & 5 deletions worker/workerarea.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,12 +196,11 @@ func (p *WorkerArea) RecalculateRouteParts() {
var activeWorkers []*State
now := time.Now().Unix()
for _, ws := range workersInArea {
if ws.LastSeen == 0 || now-ws.LastSeen <= workerUnseen {
if now-ws.LastSeen <= workerUnseen {
activeWorkers = append(activeWorkers, ws)
} else {
ws.Step = 0
ws.StartStep = 0
ws.EndStep = 0
log.Warnf("[WORKERAREA] Reset route parts of worker %s", ws.Uuid)
ws.ResetAreaAndRoutePart()
}
}

Expand All @@ -227,7 +226,7 @@ func (p *WorkerArea) RecalculateRouteParts() {
}

// Update worker states
ws := workersInArea[i]
ws := activeWorkers[i]
ws.StartStep = startStep
ws.EndStep = endStep
if ws.Step < ws.StartStep || ws.Step > ws.EndStep {
Expand Down

0 comments on commit 64e3abc

Please sign in to comment.