Skip to content

Commit

Permalink
Merge pull request #23 from practo/bug-approxmsgs
Browse files Browse the repository at this point in the history
Fixes for the poller race and sync stopping
  • Loading branch information
alok87 authored Aug 14, 2019
2 parents 9197391 + 5faec23 commit 35734d7
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 9 deletions.
2 changes: 1 addition & 1 deletion artifacts/deployment-template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ spec:
value: {{ WPA_AWS_ACCESS_KEY_ID }}
- name: AWS_SECRET_ACCESS_KEY
value: {{ WPA_AWS_SECRET_ACCESS_KEY }}
image: practodev/workerpodautoscaler:v0.1.2
image: practodev/workerpodautoscaler:v0.1.3
imagePullPolicy: Always
command:
- /workerpodautoscaler
Expand Down
19 changes: 11 additions & 8 deletions pkg/queue/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (p *Poller) isThreadRequired(key string) bool {
if _, ok := threads[key]; !ok {
return false
}
return true
return threads[key]
}

func (p *Poller) runPollThread(key string) {
Expand Down Expand Up @@ -63,12 +63,11 @@ func (p *Poller) sync(stopCh <-chan struct{}) {
for {
select {
case listResultCh := <-p.listThreadCh:
listResultCh <- p.threads
listResultCh <- DeepCopyThread(p.threads)
case threadStatus := <-p.updateThreadCh:
for key, status := range threadStatus {
if status == false {
delete(p.threads, key)
return
}
p.threads[key] = status
}
Expand All @@ -82,8 +81,6 @@ func (p *Poller) sync(stopCh <-chan struct{}) {
func (p *Poller) Run(stopCh <-chan struct{}) {
go p.sync(stopCh)

klog.Info("Starting sqs poller(s) and thread manager.")

ticker := time.NewTicker(time.Second * 1)
for {
select {
Expand All @@ -92,8 +89,7 @@ func (p *Poller) Run(stopCh <-chan struct{}) {
// Create a new thread
for key, _ := range queues {
threads := p.listThreads()
_, ok := threads[key]
if !ok {
if _, ok := threads[key]; !ok {
p.updateThreads(key, true)
go p.runPollThread(key)
}
Expand All @@ -105,10 +101,17 @@ func (p *Poller) Run(stopCh <-chan struct{}) {
p.updateThreads(key, false)
}
}

case <-stopCh:
klog.Info("Stopping sqs poller(s) and thread manager gracefully.")
return
}
}
}

func DeepCopyThread(original map[string]bool) map[string]bool {
copy := make(map[string]bool)
for key, value := range original {
copy[key] = value
}
return copy
}

0 comments on commit 35734d7

Please sign in to comment.