Skip to content

Commit

Permalink
feat(boost): invoke parent cancel on output timeout (#68)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Sep 3, 2023
1 parent 7e966db commit b03e627
Show file tree
Hide file tree
Showing 6 changed files with 244 additions and 132 deletions.
6 changes: 3 additions & 3 deletions boost/boost-public-api.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ type (
Error error
}

PoolResultStream = chan PoolResult
PoolResultStreamR = <-chan PoolResult
PoolResultStreamW = chan<- PoolResult
PoolResultStream = chan *PoolResult
PoolResultStreamR = <-chan *PoolResult
PoolResultStreamW = chan<- *PoolResult
)

type ExecutiveFunc[I, O any] func(j Job[I]) (JobOutput[O], error)
Expand Down
13 changes: 9 additions & 4 deletions boost/pool-defs-internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,15 @@ const (
)

type (
workerID string
finishedStream = chan workerID
finishedStreamR = <-chan workerID
finishedStreamW = chan<- workerID
workerID string
workerFinishedResult struct {
id workerID
err error
}

finishedStream = chan *workerFinishedResult
finishedStreamR = <-chan *workerFinishedResult
finishedStreamW = chan<- *workerFinishedResult

workerWrapper[I any, O any] struct {
cancelChOut CancelStreamW
Expand Down
109 changes: 71 additions & 38 deletions boost/worker-pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"context"
"fmt"
"runtime"
"time"

"github.com/google/uuid"
"github.com/snivilised/lorax/i18n"
)

// privateWpInfo (dmz!) contains any state that needs to be mutated in a non concurrent manner
Expand Down Expand Up @@ -42,21 +42,23 @@ type privateWpInfo[I, O any] struct {
// when all workers have completed their work due to the finished channel, which it also
// owns.
type WorkerPool[I, O any] struct {
private privateWpInfo[I, O]
exec ExecutiveFunc[I, O]
noWorkers int
sourceJobsChIn JobStream[I]
RoutineName GoRoutineName
WaitAQ AnnotatedWgAQ
ResultInCh PoolResultStreamR
private privateWpInfo[I, O]
outputChTimeout time.Duration
exec ExecutiveFunc[I, O]
noWorkers int
sourceJobsChIn JobStream[I]
RoutineName GoRoutineName
WaitAQ AnnotatedWgAQ
ResultInCh PoolResultStreamR
}

type NewWorkerPoolParams[I, O any] struct {
NoWorkers int
Exec ExecutiveFunc[I, O]
JobsCh JobStream[I]
CancelCh CancelStream
WaitAQ AnnotatedWgAQ
NoWorkers int
OutputChTimeout time.Duration
Exec ExecutiveFunc[I, O]
JobsCh JobStream[I]
CancelCh CancelStream
WaitAQ AnnotatedWgAQ
}

func NewWorkerPool[I, O any](params *NewWorkerPoolParams[I, O]) *WorkerPool[I, O] {
Expand All @@ -74,12 +76,13 @@ func NewWorkerPool[I, O any](params *NewWorkerPoolParams[I, O]) *WorkerPool[I, O
cancelCh: params.CancelCh,
resultOutCh: resultCh,
},
exec: params.Exec,
RoutineName: GoRoutineName("🧊 worker pool"),
noWorkers: noWorkers,
sourceJobsChIn: params.JobsCh,
WaitAQ: params.WaitAQ,
ResultInCh: resultCh,
outputChTimeout: params.OutputChTimeout,
exec: params.Exec,
RoutineName: GoRoutineName("🧊 worker pool"),
noWorkers: noWorkers,
sourceJobsChIn: params.JobsCh,
WaitAQ: params.WaitAQ,
ResultInCh: resultCh,
}

return wp
Expand All @@ -101,18 +104,21 @@ func (p *WorkerPool[I, O]) composeID() workerID {

func (p *WorkerPool[I, O]) Start(
parentContext context.Context,
parentCancel context.CancelFunc,
outputsChOut OutputStream[O],
) {
p.run(parentContext, p.private.workersJobsCh, outputsChOut)
p.run(parentContext, parentCancel, p.outputChTimeout, p.private.workersJobsCh, outputsChOut)
}

func (p *WorkerPool[I, O]) run(
parentContext context.Context,
parentCancel context.CancelFunc,
outputChTimeout time.Duration,
forwardChOut JobStreamW[I],
outputsChOut OutputStream[O],
) {
result := PoolResult{}
defer func(r PoolResult) {
result := &PoolResult{}
defer func(r *PoolResult) {
if outputsChOut != nil {
close(outputsChOut)
}
Expand All @@ -121,9 +127,6 @@ func (p *WorkerPool[I, O]) run(
p.WaitAQ.Done(p.RoutineName)
fmt.Printf("<--- WorkerPool.run (QUIT). 🧊🧊🧊\n")
}(result)

_ = i18n.NewOutputChTimeoutError()

fmt.Printf("===> 🧊 WorkerPool.run ...(ctx:%+v)\n", parentContext)

for running := true; running; {
Expand All @@ -141,7 +144,13 @@ func (p *WorkerPool[I, O]) run(
)

if len(p.private.pool) < p.noWorkers {
p.spawn(parentContext, p.private.workersJobsCh, outputsChOut, p.private.finishedCh)
p.spawn(parentContext,
parentCancel,
outputChTimeout,
p.private.workersJobsCh,
outputsChOut,
p.private.finishedCh,
)
}
select {
case forwardChOut <- job:
Expand Down Expand Up @@ -174,15 +183,24 @@ func (p *WorkerPool[I, O]) run(
// don't pass in the context's Done() channel as it already been consumed
// in the run loop, and is now closed.
//
p.drain(p.private.finishedCh)
if err := p.drain(p.private.finishedCh); err != nil {
result.Error = err

fmt.Printf("===> 🧊 WorkerPool.run - drain complete (workers count: '%v'). 🎃🎃🎃\n",
len(p.private.pool),
)
fmt.Printf("===> 🧊 WorkerPool.run - drain complete with error: '%v' (workers count: '%v'). 📛📛📛\n",
err,
len(p.private.pool),
)
} else {
fmt.Printf("===> 🧊 WorkerPool.run - drain complete OK (workers count: '%v'). ☑️☑️☑️\n",
len(p.private.pool),
)
}
}

func (p *WorkerPool[I, O]) spawn(
parentContext context.Context,
parentCancel context.CancelFunc,
outputChTimeout time.Duration,
jobsChIn JobStreamR[I],
outputsChOut OutputStream[O],
finishedChOut finishedStreamW,
Expand All @@ -197,28 +215,30 @@ func (p *WorkerPool[I, O]) spawn(
outputsChOut: outputsChOut,
finishedChOut: finishedChOut,
},
cancelChOut: cancelCh,
cancelChOut: cancelCh, // TODO: this is not used, so delete
}

p.private.pool[w.core.id] = w
go w.core.run(parentContext)
go w.core.run(parentContext, parentCancel, outputChTimeout)
fmt.Printf("===> 🧊 WorkerPool.spawned new worker: '%v' 🎀🎀🎀\n", w.core.id)
}

func (p *WorkerPool[I, O]) drain(finishedChIn finishedStreamR) {
func (p *WorkerPool[I, O]) drain(finishedChIn finishedStreamR) error {
fmt.Printf(
"!!!! 🧊 WorkerPool.drain - waiting for remaining workers: %v (#GRs: %v); 🧊🧊🧊 \n",
len(p.private.pool), runtime.NumGoroutine(),
)

var firstError error

for running := true; running; {
// 📍 Here, we don't access the finishedChIn channel in a pre-emptive way via
// the parentContext.Done() channel. This is because in a unit test, we define a timeout as
// part of the test spec using SpecTimeout. When this fires, this is handled by the
// run loop, which ends that loop then enters drain the phase. When this happens,
// you can't reuse that same done channel as it will immediately return the value
// already handled. This has the effect of short-circuiting this loop meaning that
// wid := <-finishedChIn never has a chance to be selected and the drain loop
// workerResult := <-finishedChIn never has a chance to be selected and the drain loop
// exits early. The end result of which means that the p.private.pool collection is
// never depleted.
//
Expand Down Expand Up @@ -250,15 +270,28 @@ func (p *WorkerPool[I, O]) drain(finishedChIn finishedStreamR) {
// If a goroutine outlives its context or keeps references to closed Done() channels,
// it might not behave as expected.
//
wid := <-finishedChIn
delete(p.private.pool, wid)
workerResult := <-finishedChIn
delete(p.private.pool, workerResult.id)

if len(p.private.pool) == 0 {
running = false
}

fmt.Printf("!!!! 🧊 WorkerPool.drain - worker(%v) finished, remaining: '%v' 🟥\n",
wid, len(p.private.pool),
if workerResult.err != nil {
fmt.Printf("!!!! 🧊 WorkerPool.drain - worker (%v) 💢💢💢 finished with error: '%v'\n",
workerResult.id,
workerResult.err,
)

if firstError == nil {
firstError = workerResult.err
}
}

fmt.Printf("!!!! 🧊 WorkerPool.drain - worker-result-error(%v) finished, remaining: '%v' 🟥\n",
workerResult.err, len(p.private.pool),
)
}

return firstError
}
Loading

0 comments on commit b03e627

Please sign in to comment.