Skip to content
This repository has been archived by the owner on Nov 12, 2023. It is now read-only.

Commit

Permalink
fix concurrency of worker state
Browse files Browse the repository at this point in the history
  • Loading branch information
Fabio1988 committed Sep 26, 2023
1 parent fa046ff commit e7463c1
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 8 deletions.
38 changes: 34 additions & 4 deletions worker/workerState.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type State struct {
Host string
LastSeen int64
requestCounter *RequestCounter
mu sync.Mutex
}

var requestLimits map[int]int
Expand All @@ -33,6 +34,7 @@ func GetWorkerState(workerId string) *State {
if s, found := states[workerId]; !found {
newState := &State{
Uuid: workerId,
LastSeen: time.Now().Unix(),
requestCounter: NewRequestCounter(),
}
newState.SetRequestLimits(requestLimits)
Expand Down Expand Up @@ -99,48 +101,76 @@ func GetWorkers() (results []*State) {
return results
}

func SetRequestLimits(limits map[int]int) {
requestLimits = limits
}

// Lock locks the mutex for the State
func (ws *State) Lock() {
ws.mu.Lock()
}

// Unlock unlocks the mutex for the State
func (ws *State) Unlock() {
ws.mu.Unlock()
}

func (ws *State) ResetUsername() {
ws.Lock()
defer ws.Unlock()
ws.Username = ""
}

func (ws *State) ResetAreaAndRoutePart() {
ws.Lock()
defer ws.Unlock()
ws.AreaId = 0
ws.StartStep = 0
ws.EndStep = 0
ws.Step = 0
}

func (ws *State) Touch(host string) {
ws.Lock()
defer ws.Unlock()
ws.Host = host
}

func (ws *State) LastLocation(lat, lon float64, host string) {
ws.Lock()
defer ws.Unlock()
ws.Host = host
ws.LastSeen = time.Now().Unix()
}

func SetRequestLimits(limits map[int]int) {
requestLimits = limits
}

func (ws *State) SetRequestLimits(limits map[int]int) {
ws.Lock()
defer ws.Unlock()
if len(limits) > 0 {
ws.requestCounter.SetLimits(limits)
}
}

func (ws *State) IncrementLimit(method int) {
ws.Lock()
defer ws.Unlock()
ws.requestCounter.Increment(method)
}

func (ws *State) CheckLimitExceeded() bool {
ws.Lock()
defer ws.Unlock()
return ws.requestCounter.CheckLimitsExceeded()
}

func (ws *State) RequestCounts() map[int]int {
ws.Lock()
defer ws.Unlock()
return ws.requestCounter.RequestCounts()
}

func (ws *State) ResetCounter() {
ws.Lock()
defer ws.Unlock()
ws.requestCounter.ResetCounts()
}
6 changes: 2 additions & 4 deletions worker/workerarea.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,12 +196,10 @@ func (p *WorkerArea) RecalculateRouteParts() {
var activeWorkers []*State
now := time.Now().Unix()
for _, ws := range workersInArea {
if ws.LastSeen == 0 || now-ws.LastSeen <= workerUnseen {
if now-ws.LastSeen <= workerUnseen {
activeWorkers = append(activeWorkers, ws)
} else {
ws.Step = 0
ws.StartStep = 0
ws.EndStep = 0
ws.ResetAreaAndRoutePart()
}
}

Expand Down

0 comments on commit e7463c1

Please sign in to comment.