Skip to content
This repository has been archived by the owner on Nov 12, 2023. It is now read-only.

fix concurrency of worker state #22

Merged
merged 2 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 34 additions & 4 deletions worker/workerState.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type State struct {
Host string
LastSeen int64
requestCounter *RequestCounter
mu sync.Mutex
}

var requestLimits map[int]int
Expand All @@ -33,6 +34,7 @@ func GetWorkerState(workerId string) *State {
if s, found := states[workerId]; !found {
newState := &State{
Uuid: workerId,
LastSeen: time.Now().Unix(),
requestCounter: NewRequestCounter(),
}
newState.SetRequestLimits(requestLimits)
Expand Down Expand Up @@ -99,48 +101,76 @@ func GetWorkers() (results []*State) {
return results
}

func SetRequestLimits(limits map[int]int) {
requestLimits = limits
}

// Lock locks the mutex for the State
func (ws *State) Lock() {
ws.mu.Lock()
}

// Unlock unlocks the mutex for the State
func (ws *State) Unlock() {
ws.mu.Unlock()
}

func (ws *State) ResetUsername() {
ws.Lock()
defer ws.Unlock()
ws.Username = ""
}

func (ws *State) ResetAreaAndRoutePart() {
ws.Lock()
defer ws.Unlock()
ws.AreaId = 0
ws.StartStep = 0
ws.EndStep = 0
ws.Step = 0
}

func (ws *State) Touch(host string) {
ws.Lock()
defer ws.Unlock()
ws.Host = host
}

func (ws *State) LastLocation(lat, lon float64, host string) {
ws.Lock()
defer ws.Unlock()
ws.Host = host
ws.LastSeen = time.Now().Unix()
}

func SetRequestLimits(limits map[int]int) {
requestLimits = limits
}

func (ws *State) SetRequestLimits(limits map[int]int) {
ws.Lock()
defer ws.Unlock()
if len(limits) > 0 {
ws.requestCounter.SetLimits(limits)
}
}

func (ws *State) IncrementLimit(method int) {
ws.Lock()
defer ws.Unlock()
ws.requestCounter.Increment(method)
}

func (ws *State) CheckLimitExceeded() bool {
ws.Lock()
defer ws.Unlock()
return ws.requestCounter.CheckLimitsExceeded()
}

func (ws *State) RequestCounts() map[int]int {
ws.Lock()
defer ws.Unlock()
return ws.requestCounter.RequestCounts()
}

func (ws *State) ResetCounter() {
ws.Lock()
defer ws.Unlock()
ws.requestCounter.ResetCounts()
}
9 changes: 4 additions & 5 deletions worker/workerarea.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,12 +196,11 @@ func (p *WorkerArea) RecalculateRouteParts() {
var activeWorkers []*State
now := time.Now().Unix()
for _, ws := range workersInArea {
if ws.LastSeen == 0 || now-ws.LastSeen <= workerUnseen {
if now-ws.LastSeen <= workerUnseen {
activeWorkers = append(activeWorkers, ws)
} else {
ws.Step = 0
ws.StartStep = 0
ws.EndStep = 0
log.Warnf("[WORKERAREA] Reset route parts of worker %s", ws.Uuid)
ws.ResetAreaAndRoutePart()
}
}

Expand All @@ -227,7 +226,7 @@ func (p *WorkerArea) RecalculateRouteParts() {
}

// Update worker states
ws := workersInArea[i]
ws := activeWorkers[i]
ws.StartStep = startStep
ws.EndStep = endStep
if ws.Step < ws.StartStep || ws.Step > ws.EndStep {
Expand Down