Skip to content

Commit

Permalink
feat(boost): return a pool result (#68)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Sep 3, 2023
1 parent cf56b0a commit 7e966db
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 112 deletions.
8 changes: 8 additions & 0 deletions boost/boost-public-api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ type (
CancelStream = chan CancelWorkSignal
CancelStreamR = <-chan CancelWorkSignal
CancelStreamW = chan<- CancelWorkSignal

PoolResult struct {
Error error
}

PoolResultStream = chan PoolResult
PoolResultStreamR = <-chan PoolResult
PoolResultStreamW = chan<- PoolResult
)

type ExecutiveFunc[I, O any] func(j Job[I]) (JobOutput[O], error)
Expand Down
40 changes: 25 additions & 15 deletions boost/worker-pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"runtime"

"github.com/google/uuid"
"github.com/snivilised/lorax/i18n"
)

// privateWpInfo (dmz!) contains any state that needs to be mutated in a non concurrent manner
Expand Down Expand Up @@ -34,6 +35,7 @@ type privateWpInfo[I, O any] struct {
workersJobsCh chan Job[I]
finishedCh finishedStream
cancelCh CancelStream
resultOutCh PoolResultStreamW
}

// WorkerPool owns the resultOut channel, because it is the only entity that knows
Expand All @@ -46,6 +48,7 @@ type WorkerPool[I, O any] struct {
sourceJobsChIn JobStream[I]
RoutineName GoRoutineName
WaitAQ AnnotatedWgAQ
ResultInCh PoolResultStreamR
}

type NewWorkerPoolParams[I, O any] struct {
Expand All @@ -62,19 +65,21 @@ func NewWorkerPool[I, O any](params *NewWorkerPoolParams[I, O]) *WorkerPool[I, O
noWorkers = params.NoWorkers
}

resultCh := make(PoolResultStream, 1)
wp := &WorkerPool[I, O]{
private: privateWpInfo[I, O]{
pool: make(workersCollection[I, O], noWorkers),
workersJobsCh: make(JobStream[I], noWorkers),
finishedCh: make(finishedStream, noWorkers),
cancelCh: params.CancelCh,
resultOutCh: resultCh,
},
exec: params.Exec,
RoutineName: GoRoutineName("🧊 worker pool"),
noWorkers: noWorkers,
sourceJobsChIn: params.JobsCh,

WaitAQ: params.WaitAQ,
WaitAQ: params.WaitAQ,
ResultInCh: resultCh,
}

return wp
Expand All @@ -95,30 +100,35 @@ func (p *WorkerPool[I, O]) composeID() workerID {
}

func (p *WorkerPool[I, O]) Start(
ctx context.Context,
parentContext context.Context,
outputsChOut OutputStream[O],
) {
p.run(ctx, p.private.workersJobsCh, outputsChOut)
p.run(parentContext, p.private.workersJobsCh, outputsChOut)
}

func (p *WorkerPool[I, O]) run(
ctx context.Context,
parentContext context.Context,
forwardChOut JobStreamW[I],
outputsChOut OutputStream[O],
) {
defer func() {
result := PoolResult{}
defer func(r PoolResult) {
if outputsChOut != nil {
close(outputsChOut)
}
p.private.resultOutCh <- r

p.WaitAQ.Done(p.RoutineName)
fmt.Printf("<--- WorkerPool.run (QUIT). 🧊🧊🧊\n")
}()
fmt.Printf("===> 🧊 WorkerPool.run ...(ctx:%+v)\n", ctx)
}(result)

_ = i18n.NewOutputChTimeoutError()

fmt.Printf("===> 🧊 WorkerPool.run ...(ctx:%+v)\n", parentContext)

for running := true; running; {
select {
case <-ctx.Done():
case <-parentContext.Done():
running = false

close(forwardChOut) // ⚠️ This is new
Expand All @@ -131,15 +141,15 @@ func (p *WorkerPool[I, O]) run(
)

if len(p.private.pool) < p.noWorkers {
p.spawn(ctx, p.private.workersJobsCh, outputsChOut, p.private.finishedCh)
p.spawn(parentContext, p.private.workersJobsCh, outputsChOut, p.private.finishedCh)
}
select {
case forwardChOut <- job:
fmt.Printf("===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(%v) [Seq: %v]\n",
job.ID,
job.SequenceNo,
)
case <-ctx.Done():
case <-parentContext.Done():
running = false

close(forwardChOut) // ⚠️ This is new
Expand Down Expand Up @@ -172,7 +182,7 @@ func (p *WorkerPool[I, O]) run(
}

func (p *WorkerPool[I, O]) spawn(
ctx context.Context,
parentContext context.Context,
jobsChIn JobStreamR[I],
outputsChOut OutputStream[O],
finishedChOut finishedStreamW,
Expand All @@ -191,7 +201,7 @@ func (p *WorkerPool[I, O]) spawn(
}

p.private.pool[w.core.id] = w
go w.core.run(ctx)
go w.core.run(parentContext)
fmt.Printf("===> 🧊 WorkerPool.spawned new worker: '%v' 🎀🎀🎀\n", w.core.id)
}

Expand All @@ -203,7 +213,7 @@ func (p *WorkerPool[I, O]) drain(finishedChIn finishedStreamR) {

for running := true; running; {
// 📍 Here, we don't access the finishedChIn channel in a pre-emptive way via
// the ctx.Done() channel. This is because in a unit test, we define a timeout as
// the parentContext.Done() channel. This is because in a unit test, we define a timeout as
// part of the test spec using SpecTimeout. When this fires, this is handled by the
// run loop, which ends that loop then enters drain the phase. When this happens,
// you can't reuse that same done channel as it will immediately return the value
Expand All @@ -212,7 +222,7 @@ func (p *WorkerPool[I, O]) drain(finishedChIn finishedStreamR) {
// exits early. The end result of which means that the p.private.pool collection is
// never depleted.
//
// ⚠️ So an important lesson to be learnt here is that once a ctx.Done() has fired,
// ⚠️ So an important lesson to be learnt here is that once a parentContext.Done() has fired,
// you can't reuse tha same channel in another select statement as it will simply
// return immediately, bypassing all the others cases in the select statement.
//
Expand Down
Loading

0 comments on commit 7e966db

Please sign in to comment.