diff --git a/main.go b/main.go index 5235b3d..d7671fe 100644 --- a/main.go +++ b/main.go @@ -41,7 +41,7 @@ func Process(inputChan <-chan *OrderedInput, wf WorkFunction, options *Options) processChan := make(chan *processInput) aggregatorChan := make(chan *processInput) wg := sync.WaitGroup{} - doneChan := make(chan bool) + doneSemaphoreChan := make(chan bool) // Go routine to print data in order go func() { var current uint64 @@ -70,33 +70,26 @@ func Process(inputChan <-chan *OrderedInput, wf WorkFunction, options *Options) } if aggregatorChan == nil { close(outputChan) - doneChan <- true + doneSemaphoreChan <- true } } }() - closeOnce := sync.Once{} + inputClosedSemaphoreChan := make(chan bool) // Create a goroutine pool for i := 0; i < processors; i++ { go func() { - for { + for input := range processChan { + wg.Add(1) + input.value = wf(input.value) + input.wg = &wg + aggregatorChan <- input select { - case input, ok := <-processChan: - if ok { - input.value = wf(input.value) - wg.Add(1) - input.wg = &wg - aggregatorChan <- input - } else { - processChan = nil - } - } - if processChan == nil { + case <-inputClosedSemaphoreChan: wg.Wait() - // Safe. This will be triggered only once WG has finished - closeOnce.Do(func() { - close(aggregatorChan) - }) + close(aggregatorChan) + default: + continue } } }() @@ -115,10 +108,11 @@ func Process(inputChan <-chan *OrderedInput, wf WorkFunction, options *Options) } if inputChan == nil { close(processChan) + inputClosedSemaphoreChan <- true break } } - <-doneChan + <-doneSemaphoreChan }() return outputChan }