diff --git a/go.mod b/go.mod index fd8539c..5bc4059 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,3 @@ -module github.com/tejzpr/ordered-concurrently/v3 +module github.com/fholzer/ordered-concurrently/v3 go 1.12 diff --git a/heap.go b/heap.go index 9592b00..27dfe7e 100644 --- a/heap.go +++ b/heap.go @@ -1,9 +1,9 @@ package orderedconcurrently type processInput struct { - workFn WorkFunction - order uint64 - value interface{} + input interface{} + order uint64 + value interface{} } type processInputHeap []*processInput diff --git a/main.go b/main.go index 7ea1996..1195733 100644 --- a/main.go +++ b/main.go @@ -19,22 +19,32 @@ type OrderedOutput struct { } // WorkFunction interface -type WorkFunction interface { - Run(ctx context.Context) interface{} -} +// type WorkFunction func(ctx context.Context) interface{} +type ProcessFunc func(interface{}) interface{} +type processFuncGenerator func(int) (ProcessFunc, error) // Process processes work function based on input. // It Accepts an WorkFunction read channel, work function and concurrent go routine pool size. // It Returns an interface{} channel. -func Process(ctx context.Context, inputChan <-chan WorkFunction, options *Options) <-chan OrderedOutput { +func Process(ctx context.Context, inputChan <-chan interface{}, processFuncGenerator processFuncGenerator, options *Options) (<-chan OrderedOutput, error) { outputChan := make(chan OrderedOutput, options.OutChannelBuffer) - go func() { - if options.PoolSize < 1 { - // Set a minimum number of processors - options.PoolSize = 1 + if options.PoolSize < 1 { + // Set a minimum number of processors + options.PoolSize = 1 + } + + processors := make([]ProcessFunc, options.PoolSize) + for i := 0; i < options.PoolSize; i++ { + var err error + processors[i], err = processFuncGenerator(i) + if err != nil { + return nil, err } + } + + go func() { processChan := make(chan *processInput, options.PoolSize) aggregatorChan := make(chan *processInput, options.PoolSize) @@ -65,16 +75,13 @@ func Process(ctx context.Context, inputChan <-chan WorkFunction, options *Option poolWg.Add(options.PoolSize) // Create a goroutine pool for i := 0; i < options.PoolSize; i++ { - go func(worker int) { - defer func() { - poolWg.Done() - }() + go func(process ProcessFunc) { + defer poolWg.Done() for input := range processChan { - input.value = input.workFn.Run(ctx) - input.workFn = nil + input.value = process(input.input) aggregatorChan <- input } - }(i) + }(processors[i]) } go func() { @@ -88,10 +95,10 @@ func Process(ctx context.Context, inputChan <-chan WorkFunction, options *Option }() var order uint64 for input := range inputChan { - processChan <- &processInput{workFn: input, order: order} + processChan <- &processInput{input: input, order: order} order++ } }() }() - return outputChan + return outputChan, nil } diff --git a/main_test.go b/main_test.go index 6daa97a..6907168 100644 --- a/main_test.go +++ b/main_test.go @@ -11,25 +11,34 @@ import ( type zeroLoadWorker int -func (w zeroLoadWorker) Run(ctx context.Context) interface{} { - return w * 2 +func zeroLoadWorkerRun(w interface{}) interface{} { + return w.(zeroLoadWorker) * 2 } type loadWorker int -func (w loadWorker) Run(ctx context.Context) interface{} { +func loadWorkerRun(w interface{}) interface{} { time.Sleep(time.Millisecond * time.Duration(rand.Intn(100))) - return w * 2 + return w.(loadWorker) * 2 +} + +func processGeneratorGenerator(wf ProcessFunc) processFuncGenerator { + return func(int) (ProcessFunc, error) { + return wf, nil + } } func Test1(t *testing.T) { t.Run("Test with Preset Pool Size", func(t *testing.T) { ctx := context.Background() max := 10 - inputChan := make(chan WorkFunction) + inputChan := make(chan interface{}) wg := &sync.WaitGroup{} - outChan := Process(ctx, inputChan, &Options{PoolSize: 10}) + outChan, err := Process(ctx, inputChan, processGeneratorGenerator(loadWorkerRun), &Options{PoolSize: 10}) + if err != nil { + panic(err) + } counter := 0 go func(t *testing.T) { for out := range outChan { @@ -61,10 +70,13 @@ func Test2(t *testing.T) { ctx := context.Background() max := 10 - inputChan := make(chan WorkFunction) + inputChan := make(chan interface{}) wg := &sync.WaitGroup{} - outChan := Process(ctx, inputChan, &Options{OutChannelBuffer: 2}) + outChan, err := Process(ctx, inputChan, processGeneratorGenerator(loadWorkerRun), &Options{OutChannelBuffer: 2}) + if err != nil { + panic(err) + } counter := 0 go func(t *testing.T) { for out := range outChan { @@ -96,10 +108,13 @@ func Test3(t *testing.T) { ctx := context.Background() max := 10 - inputChan := make(chan WorkFunction) + inputChan := make(chan interface{}) wg := &sync.WaitGroup{} - outChan := Process(ctx, inputChan, &Options{OutChannelBuffer: 2}) + outChan, err := Process(ctx, inputChan, processGeneratorGenerator(zeroLoadWorkerRun), &Options{OutChannelBuffer: 2}) + if err != nil { + panic(err) + } counter := 0 go func(t *testing.T) { for out := range outChan { @@ -132,8 +147,11 @@ func Test4(t *testing.T) { ctx := context.Background() max := 10 - inputChan := make(chan WorkFunction) - output := Process(ctx, inputChan, &Options{PoolSize: 10, OutChannelBuffer: 10}) + inputChan := make(chan interface{}) + output, err := Process(ctx, inputChan, processGeneratorGenerator(zeroLoadWorkerRun), &Options{PoolSize: 10, OutChannelBuffer: 10}) + if err != nil { + panic(err) + } go func() { for work := 0; work < max; work++ { inputChan <- zeroLoadWorker(work) @@ -160,8 +178,11 @@ func TestSortedData(t *testing.T) { ctx := context.Background() max := 10 - inputChan := make(chan WorkFunction) - output := Process(ctx, inputChan, &Options{PoolSize: 10, OutChannelBuffer: 10}) + inputChan := make(chan interface{}) + output, err := Process(ctx, inputChan, processGeneratorGenerator(loadWorkerRun), &Options{PoolSize: 10, OutChannelBuffer: 10}) + if err != nil { + panic(err) + } go func() { for work := 0; work < max; work++ { inputChan <- loadWorker(work) @@ -188,8 +209,11 @@ func TestSortedDataMultiple(t *testing.T) { ctx := context.Background() max := 10 - inputChan := make(chan WorkFunction) - output := Process(ctx, inputChan, &Options{PoolSize: 10, OutChannelBuffer: 10}) + inputChan := make(chan interface{}) + output, err := Process(ctx, inputChan, processGeneratorGenerator(loadWorkerRun), &Options{PoolSize: 10, OutChannelBuffer: 10}) + if err != nil { + panic(err) + } go func() { for work := 0; work < max; work++ { inputChan <- loadWorker(work) @@ -214,8 +238,11 @@ func TestSortedDataMultiple(t *testing.T) { func TestStreamingInput(t *testing.T) { t.Run("Test streaming input", func(t *testing.T) { ctx := context.Background() - inputChan := make(chan WorkFunction, 10) - output := Process(ctx, inputChan, &Options{PoolSize: 10, OutChannelBuffer: 10}) + inputChan := make(chan interface{}, 10) + output, err := Process(ctx, inputChan, processGeneratorGenerator(zeroLoadWorkerRun), &Options{PoolSize: 10, OutChannelBuffer: 10}) + if err != nil { + panic(err) + } ticker := time.NewTicker(100 * time.Millisecond) done := make(chan bool) @@ -261,8 +288,11 @@ func TestStreamingInput(t *testing.T) { func BenchmarkOC(b *testing.B) { max := 100000 - inputChan := make(chan WorkFunction) - output := Process(context.Background(), inputChan, &Options{PoolSize: 10, OutChannelBuffer: 10}) + inputChan := make(chan interface{}) + output, err := Process(context.Background(), inputChan, processGeneratorGenerator(zeroLoadWorkerRun), &Options{PoolSize: 10, OutChannelBuffer: 10}) + if err != nil { + panic(err) + } go func() { for work := 0; work < max; work++ { inputChan <- zeroLoadWorker(work) @@ -276,8 +306,11 @@ func BenchmarkOC(b *testing.B) { func BenchmarkOCLoad(b *testing.B) { max := 10 - inputChan := make(chan WorkFunction) - output := Process(context.Background(), inputChan, &Options{PoolSize: 10, OutChannelBuffer: 10}) + inputChan := make(chan interface{}) + output, err := Process(context.Background(), inputChan, processGeneratorGenerator(loadWorkerRun), &Options{PoolSize: 10, OutChannelBuffer: 10}) + if err != nil { + panic(err) + } go func() { for work := 0; work < max; work++ { inputChan <- loadWorker(work) @@ -292,8 +325,11 @@ func BenchmarkOCLoad(b *testing.B) { func BenchmarkOC2(b *testing.B) { for i := 0; i < 100; i++ { max := 1000 - inputChan := make(chan WorkFunction) - output := Process(context.Background(), inputChan, &Options{PoolSize: 10, OutChannelBuffer: 10}) + inputChan := make(chan interface{}) + output, err := Process(context.Background(), inputChan, processGeneratorGenerator(zeroLoadWorkerRun), &Options{PoolSize: 10, OutChannelBuffer: 10}) + if err != nil { + panic(err) + } go func() { for work := 0; work < max; work++ { inputChan <- zeroLoadWorker(work)