Skip to content

Commit

Permalink
Fix: Race condition when Pool size is very large
Browse files Browse the repository at this point in the history
  • Loading branch information
tejzpr committed Mar 11, 2021
1 parent 57bb209 commit 638633e
Showing 1 changed file with 14 additions and 20 deletions.
34 changes: 14 additions & 20 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func Process(inputChan <-chan *OrderedInput, wf WorkFunction, options *Options)
processChan := make(chan *processInput)
aggregatorChan := make(chan *processInput)
wg := sync.WaitGroup{}
doneChan := make(chan bool)
doneSemaphoreChan := make(chan bool)
// Go routine to print data in order
go func() {
var current uint64
Expand Down Expand Up @@ -70,33 +70,26 @@ func Process(inputChan <-chan *OrderedInput, wf WorkFunction, options *Options)
}
if aggregatorChan == nil {
close(outputChan)
doneChan <- true
doneSemaphoreChan <- true
}
}
}()

closeOnce := sync.Once{}
inputClosedSemaphoreChan := make(chan bool)
// Create a goroutine pool
for i := 0; i < processors; i++ {
go func() {
for {
for input := range processChan {
wg.Add(1)
input.value = wf(input.value)
input.wg = &wg
aggregatorChan <- input
select {
case input, ok := <-processChan:
if ok {
input.value = wf(input.value)
wg.Add(1)
input.wg = &wg
aggregatorChan <- input
} else {
processChan = nil
}
}
if processChan == nil {
case <-inputClosedSemaphoreChan:
wg.Wait()
// Safe. This will be triggered only once WG has finished
closeOnce.Do(func() {
close(aggregatorChan)
})
close(aggregatorChan)
default:
continue
}
}
}()
Expand All @@ -115,10 +108,11 @@ func Process(inputChan <-chan *OrderedInput, wf WorkFunction, options *Options)
}
if inputChan == nil {
close(processChan)
inputClosedSemaphoreChan <- true
break
}
}
<-doneChan
<-doneSemaphoreChan
}()
return outputChan
}

0 comments on commit 638633e

Please sign in to comment.