Skip to content

Commit

Permalink
feat: use anonymous processor funcs
Browse files Browse the repository at this point in the history
  • Loading branch information
fholzer committed Oct 1, 2022
1 parent ce04810 commit 6d7d78c
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 44 deletions.
6 changes: 3 additions & 3 deletions heap.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package orderedconcurrently

type processInput struct {
workFn WorkFunction
order uint64
value interface{}
input interface{}
order uint64
value interface{}
}

type processInputHeap []*processInput
Expand Down
41 changes: 24 additions & 17 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,32 @@ type OrderedOutput struct {
}

// WorkFunction interface
type WorkFunction interface {
Run(ctx context.Context) interface{}
}
// type WorkFunction func(ctx context.Context) interface{}
type ProcessFunc func(interface{}) interface{}
type processFuncGenerator func(int) (ProcessFunc, error)

// Process processes work function based on input.
// It Accepts an WorkFunction read channel, work function and concurrent go routine pool size.
// It Returns an interface{} channel.
func Process(ctx context.Context, inputChan <-chan WorkFunction, options *Options) <-chan OrderedOutput {
func Process(ctx context.Context, inputChan <-chan interface{}, processFuncGenerator processFuncGenerator, options *Options) (<-chan OrderedOutput, error) {

outputChan := make(chan OrderedOutput, options.OutChannelBuffer)

go func() {
if options.PoolSize < 1 {
// Set a minimum number of processors
options.PoolSize = 1
if options.PoolSize < 1 {
// Set a minimum number of processors
options.PoolSize = 1
}

processors := make([]ProcessFunc, options.PoolSize)
for i := 0; i < options.PoolSize; i++ {
var err error
processors[i], err = processFuncGenerator(i)
if err != nil {
return nil, err
}
}

go func() {
processChan := make(chan *processInput, options.PoolSize)
aggregatorChan := make(chan *processInput, options.PoolSize)

Expand Down Expand Up @@ -65,16 +75,13 @@ func Process(ctx context.Context, inputChan <-chan WorkFunction, options *Option
poolWg.Add(options.PoolSize)
// Create a goroutine pool
for i := 0; i < options.PoolSize; i++ {
go func(worker int) {
defer func() {
poolWg.Done()
}()
go func(process ProcessFunc) {
defer poolWg.Done()
for input := range processChan {
input.value = input.workFn.Run(ctx)
input.workFn = nil
input.value = process(input.input)
aggregatorChan <- input
}
}(i)
}(processors[i])
}

go func() {
Expand All @@ -88,10 +95,10 @@ func Process(ctx context.Context, inputChan <-chan WorkFunction, options *Option
}()
var order uint64
for input := range inputChan {
processChan <- &processInput{workFn: input, order: order}
processChan <- &processInput{input: input, order: order}
order++
}
}()
}()
return outputChan
return outputChan, nil
}
84 changes: 60 additions & 24 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,34 @@ import (

type zeroLoadWorker int

func (w zeroLoadWorker) Run(ctx context.Context) interface{} {
return w * 2
func zeroLoadWorkerRun(w interface{}) interface{} {
return w.(zeroLoadWorker) * 2
}

type loadWorker int

func (w loadWorker) Run(ctx context.Context) interface{} {
func loadWorkerRun(w interface{}) interface{} {
time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
return w * 2
return w.(loadWorker) * 2
}

func processGeneratorGenerator(wf ProcessFunc) processFuncGenerator {
return func(int) (ProcessFunc, error) {
return wf, nil
}
}

func Test1(t *testing.T) {
t.Run("Test with Preset Pool Size", func(t *testing.T) {
ctx := context.Background()
max := 10
inputChan := make(chan WorkFunction)
inputChan := make(chan interface{})
wg := &sync.WaitGroup{}

outChan := Process(ctx, inputChan, &Options{PoolSize: 10})
outChan, err := Process(ctx, inputChan, processGeneratorGenerator(loadWorkerRun), &Options{PoolSize: 10})
if err != nil {
panic(err)
}
counter := 0
go func(t *testing.T) {
for out := range outChan {
Expand Down Expand Up @@ -61,10 +70,13 @@ func Test2(t *testing.T) {
ctx := context.Background()

max := 10
inputChan := make(chan WorkFunction)
inputChan := make(chan interface{})
wg := &sync.WaitGroup{}

outChan := Process(ctx, inputChan, &Options{OutChannelBuffer: 2})
outChan, err := Process(ctx, inputChan, processGeneratorGenerator(loadWorkerRun), &Options{OutChannelBuffer: 2})
if err != nil {
panic(err)
}
counter := 0
go func(t *testing.T) {
for out := range outChan {
Expand Down Expand Up @@ -96,10 +108,13 @@ func Test3(t *testing.T) {
ctx := context.Background()

max := 10
inputChan := make(chan WorkFunction)
inputChan := make(chan interface{})
wg := &sync.WaitGroup{}

outChan := Process(ctx, inputChan, &Options{OutChannelBuffer: 2})
outChan, err := Process(ctx, inputChan, processGeneratorGenerator(zeroLoadWorkerRun), &Options{OutChannelBuffer: 2})
if err != nil {
panic(err)
}
counter := 0
go func(t *testing.T) {
for out := range outChan {
Expand Down Expand Up @@ -132,8 +147,11 @@ func Test4(t *testing.T) {
ctx := context.Background()

max := 10
inputChan := make(chan WorkFunction)
output := Process(ctx, inputChan, &Options{PoolSize: 10, OutChannelBuffer: 10})
inputChan := make(chan interface{})
output, err := Process(ctx, inputChan, processGeneratorGenerator(zeroLoadWorkerRun), &Options{PoolSize: 10, OutChannelBuffer: 10})
if err != nil {
panic(err)
}
go func() {
for work := 0; work < max; work++ {
inputChan <- zeroLoadWorker(work)
Expand All @@ -160,8 +178,11 @@ func TestSortedData(t *testing.T) {
ctx := context.Background()

max := 10
inputChan := make(chan WorkFunction)
output := Process(ctx, inputChan, &Options{PoolSize: 10, OutChannelBuffer: 10})
inputChan := make(chan interface{})
output, err := Process(ctx, inputChan, processGeneratorGenerator(loadWorkerRun), &Options{PoolSize: 10, OutChannelBuffer: 10})
if err != nil {
panic(err)
}
go func() {
for work := 0; work < max; work++ {
inputChan <- loadWorker(work)
Expand All @@ -188,8 +209,11 @@ func TestSortedDataMultiple(t *testing.T) {
ctx := context.Background()

max := 10
inputChan := make(chan WorkFunction)
output := Process(ctx, inputChan, &Options{PoolSize: 10, OutChannelBuffer: 10})
inputChan := make(chan interface{})
output, err := Process(ctx, inputChan, processGeneratorGenerator(loadWorkerRun), &Options{PoolSize: 10, OutChannelBuffer: 10})
if err != nil {
panic(err)
}
go func() {
for work := 0; work < max; work++ {
inputChan <- loadWorker(work)
Expand All @@ -214,8 +238,11 @@ func TestSortedDataMultiple(t *testing.T) {
func TestStreamingInput(t *testing.T) {
t.Run("Test streaming input", func(t *testing.T) {
ctx := context.Background()
inputChan := make(chan WorkFunction, 10)
output := Process(ctx, inputChan, &Options{PoolSize: 10, OutChannelBuffer: 10})
inputChan := make(chan interface{}, 10)
output, err := Process(ctx, inputChan, processGeneratorGenerator(zeroLoadWorkerRun), &Options{PoolSize: 10, OutChannelBuffer: 10})
if err != nil {
panic(err)
}

ticker := time.NewTicker(100 * time.Millisecond)
done := make(chan bool)
Expand Down Expand Up @@ -261,8 +288,11 @@ func TestStreamingInput(t *testing.T) {

func BenchmarkOC(b *testing.B) {
max := 100000
inputChan := make(chan WorkFunction)
output := Process(context.Background(), inputChan, &Options{PoolSize: 10, OutChannelBuffer: 10})
inputChan := make(chan interface{})
output, err := Process(context.Background(), inputChan, processGeneratorGenerator(zeroLoadWorkerRun), &Options{PoolSize: 10, OutChannelBuffer: 10})
if err != nil {
panic(err)
}
go func() {
for work := 0; work < max; work++ {
inputChan <- zeroLoadWorker(work)
Expand All @@ -276,8 +306,11 @@ func BenchmarkOC(b *testing.B) {

func BenchmarkOCLoad(b *testing.B) {
max := 10
inputChan := make(chan WorkFunction)
output := Process(context.Background(), inputChan, &Options{PoolSize: 10, OutChannelBuffer: 10})
inputChan := make(chan interface{})
output, err := Process(context.Background(), inputChan, processGeneratorGenerator(loadWorkerRun), &Options{PoolSize: 10, OutChannelBuffer: 10})
if err != nil {
panic(err)
}
go func() {
for work := 0; work < max; work++ {
inputChan <- loadWorker(work)
Expand All @@ -292,8 +325,11 @@ func BenchmarkOCLoad(b *testing.B) {
func BenchmarkOC2(b *testing.B) {
for i := 0; i < 100; i++ {
max := 1000
inputChan := make(chan WorkFunction)
output := Process(context.Background(), inputChan, &Options{PoolSize: 10, OutChannelBuffer: 10})
inputChan := make(chan interface{})
output, err := Process(context.Background(), inputChan, processGeneratorGenerator(zeroLoadWorkerRun), &Options{PoolSize: 10, OutChannelBuffer: 10})
if err != nil {
panic(err)
}
go func() {
for work := 0; work < max; work++ {
inputChan <- zeroLoadWorker(work)
Expand Down

0 comments on commit 6d7d78c

Please sign in to comment.