From e7463c13e34c6b10345b528b658cfa77c7bf09ab Mon Sep 17 00:00:00 2001 From: Fabio1988 Date: Tue, 26 Sep 2023 10:04:12 +0200 Subject: [PATCH 1/2] fix concurrency of worker state --- worker/workerState.go | 38 ++++++++++++++++++++++++++++++++++---- worker/workerarea.go | 6 ++---- 2 files changed, 36 insertions(+), 8 deletions(-) diff --git a/worker/workerState.go b/worker/workerState.go index 96364d9..94ce147 100644 --- a/worker/workerState.go +++ b/worker/workerState.go @@ -16,6 +16,7 @@ type State struct { Host string LastSeen int64 requestCounter *RequestCounter + mu sync.Mutex } var requestLimits map[int]int @@ -33,6 +34,7 @@ func GetWorkerState(workerId string) *State { if s, found := states[workerId]; !found { newState := &State{ Uuid: workerId, + LastSeen: time.Now().Unix(), requestCounter: NewRequestCounter(), } newState.SetRequestLimits(requestLimits) @@ -99,11 +101,29 @@ func GetWorkers() (results []*State) { return results } +func SetRequestLimits(limits map[int]int) { + requestLimits = limits +} + +// Lock locks the mutex for the State +func (ws *State) Lock() { + ws.mu.Lock() +} + +// Unlock unlocks the mutex for the State +func (ws *State) Unlock() { + ws.mu.Unlock() +} + func (ws *State) ResetUsername() { + ws.Lock() + defer ws.Unlock() ws.Username = "" } func (ws *State) ResetAreaAndRoutePart() { + ws.Lock() + defer ws.Unlock() ws.AreaId = 0 ws.StartStep = 0 ws.EndStep = 0 @@ -111,36 +131,46 @@ func (ws *State) ResetAreaAndRoutePart() { } func (ws *State) Touch(host string) { + ws.Lock() + defer ws.Unlock() ws.Host = host } func (ws *State) LastLocation(lat, lon float64, host string) { + ws.Lock() + defer ws.Unlock() ws.Host = host ws.LastSeen = time.Now().Unix() } -func SetRequestLimits(limits map[int]int) { - requestLimits = limits -} - func (ws *State) SetRequestLimits(limits map[int]int) { + ws.Lock() + defer ws.Unlock() if len(limits) > 0 { ws.requestCounter.SetLimits(limits) } } func (ws *State) IncrementLimit(method int) { + ws.Lock() + defer ws.Unlock() ws.requestCounter.Increment(method) } func (ws *State) CheckLimitExceeded() bool { + ws.Lock() + defer ws.Unlock() return ws.requestCounter.CheckLimitsExceeded() } func (ws *State) RequestCounts() map[int]int { + ws.Lock() + defer ws.Unlock() return ws.requestCounter.RequestCounts() } func (ws *State) ResetCounter() { + ws.Lock() + defer ws.Unlock() ws.requestCounter.ResetCounts() } diff --git a/worker/workerarea.go b/worker/workerarea.go index 0b97c0e..0bfe805 100644 --- a/worker/workerarea.go +++ b/worker/workerarea.go @@ -196,12 +196,10 @@ func (p *WorkerArea) RecalculateRouteParts() { var activeWorkers []*State now := time.Now().Unix() for _, ws := range workersInArea { - if ws.LastSeen == 0 || now-ws.LastSeen <= workerUnseen { + if now-ws.LastSeen <= workerUnseen { activeWorkers = append(activeWorkers, ws) } else { - ws.Step = 0 - ws.StartStep = 0 - ws.EndStep = 0 + ws.ResetAreaAndRoutePart() } } From b3ba2595b6cfd862b2935dc2c50d824b1c5051a0 Mon Sep 17 00:00:00 2001 From: Fabio1988 Date: Tue, 26 Sep 2023 11:31:04 +0200 Subject: [PATCH 2/2] fix --- worker/workerarea.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/worker/workerarea.go b/worker/workerarea.go index 0bfe805..f36aa0a 100644 --- a/worker/workerarea.go +++ b/worker/workerarea.go @@ -199,6 +199,7 @@ func (p *WorkerArea) RecalculateRouteParts() { if now-ws.LastSeen <= workerUnseen { activeWorkers = append(activeWorkers, ws) } else { + log.Warnf("[WORKERAREA] Reset route parts of worker %s", ws.Uuid) ws.ResetAreaAndRoutePart() } } @@ -225,7 +226,7 @@ func (p *WorkerArea) RecalculateRouteParts() { } // Update worker states - ws := workersInArea[i] + ws := activeWorkers[i] ws.StartStep = startStep ws.EndStep = endStep if ws.Step < ws.StartStep || ws.Step > ws.EndStep {