Skip to content

Commit

Permalink
ref(async): redesign the pool exec interface (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Aug 11, 2023
1 parent a4f2c01 commit 1935e00
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 144 deletions.
11 changes: 4 additions & 7 deletions async/pool-defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ type Job[I any] struct {
Input I
}

type Executive[I, R any] interface {
Invoke(j Job[I]) (JobResult[R], error)
type ExecutiveFunc[I, R any] func(j Job[I]) (JobResult[R], error)

func (f ExecutiveFunc[I, R]) Invoke(j Job[I]) (JobResult[R], error) {
return f(j)
}

type JobResult[R any] struct {
Expand All @@ -44,8 +46,3 @@ type WorkerID string
type FinishedStream = chan WorkerID
type FinishedStreamIn = <-chan WorkerID
type FinishedStreamOut = chan<- WorkerID

// joinChannelsFunc allows reader channel to be joined to the writer channel. This
// function is called when entry is found on the input and forwarded to the
// output.
type JoinChannelsFunc[T any] func(inCh <-chan T, outCh chan<- T)
128 changes: 6 additions & 122 deletions async/worker-pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,122 +9,6 @@ import (
"github.com/google/uuid"
)

/*
ref: https://levelup.gitconnected.com/how-to-use-context-to-manage-your-goroutines-like-a-boss-ef1e478919e6
func main() {
// Create a new context.
parent, cancelParent := context.WithCancel(context.Background())
// Derive child contexts from parent.
childA, _ := context.WithTimeout(parent, 5 * time.Second)
childB, _ := context.WithDeadline(parent, time.Now().Add(1 * time.Minute)
go func() {
<-childA.Done()
<-childB.Done()
fmt.Println("All children are done")
}()
// Cancel parent make all children are cancelled.
cancelParent()
}
// -> Result: All children are done
* context.WithCancel(parentContext) creates a new context which completes when
the returned cancel function is called or when the parent's context finishes,
whichever happens first.
* context.WithTimeout(contextContext, 5 * time.Second) creates a new context
which finishes when the returned cancel function is called or when it exceeds
timeout or when the parent's context finishes, whichever happens first.
* context.WithDeadline(parentContext, time.Now().Add(1 * time.Minute) creates a
new context which finishes when the returned cancel function deadline expires
or when the parent's context completes, whichever happens first.
See also: https://pkg.go.dev/context#example_WithCancel
: https://go.dev/blog/pprof
: https://levelup.gitconnected.com/how-to-use-context-to-manage-your-goroutines-like-a-boss-ef1e478919e6
: https://blog.logrocket.com/functional-programming-in-go/
*/

/*
Channels in play:
- jobs (input)
- results (output)
- errors (output)
- cancel (signal)
- done (signals no more new work)
The effect we want to create is similar to the design of io_uring
in linux
We want the main thread to perform a close on the jobs channel when
there no more work required. This closure should not interrupt the
execution of the existing workload.
The question is, do we want to use a new GR to send jobs to the pool
or do we want this to be blocking for the main GR?
1) If we have a dedicated dispatcher GR, then that implies the main thread
could freely submit all jobs it can find without being throttled. The downside
of this is that we could have a large build up of outstanding jobs resulting
in higher memory consumption. We would need a way to wait for all jobs to be
completed, ie when there are no more workers. This could be achieved with
a wait group. The problem with a wait group is that we could accidentally
reach 0 in the wait group even though there are still jobs outstanding. This is
a race condition which would arise because a job in the queue is not taken up
before all existing workers have exited. We could alleviate this by adding
an extra entry into the wait group but then how do you get down to 0?
2) Main GR sends jobs into a buffered channel and blocks when full. This seems
like the more sensible option. The main GR would be throttled by the number of
active workers and the job queue would not grow excessively consuming
memory judiciously.
However, if we have a results channel that must be read from, then we can't
have the main GR limited by the size of the worker pool, because if we do, we'll
still suffer frm the problem of memory build up, but this would be a build up
on the output, ie of the results channel.
GR(main) --> jobsCh: this is blocking after channel is full
3)
ProducerGR(observable):
- writes to job channel
PoolGR(workers):
- reads from job channel
ConsumerGR(observer):
- reads from results channel
- reads from errors channel
Both the Producer and the Consumer should be started up immediately as
separate GRs, distinct from the main GR.
* ProducerGR(observable) --> owns the job channel and should be free to close it
when no more work is available.
* PoolGR(workers) --> the pool owns the output channels
So, the next question is, how does the pool know when to close the output channels?
In theory, this should be when the jobs queue is empty and the current pool of
workers is empty. This realisation now makes us discover what the worker is. The
worker is effectively a handle to the go routine which is stored in a scoped collection.
Operations on this collection should be done via a channel, where we send a pointer
to the collection thru the channel. This collection should probably be a map, whose key
is a uniquely generated ID (see "github.com/google/uuid"). When the map is empty, we
know there are no more workers active to send to the outputs, therefore we can close them.
---
- To signal an event without sending data, we can use sync.Cond. (See
page 53. ). We could use Cond to signal no more work and no more results.
- The results channel must be optional, because a client may define work in which a result
is of no value. In this case, the pool must decide how to define closure. Perhaps it creates
a dummy consumer.
*/

// privateWpInfo contains any state that needs to be mutated in a non concurrent manner
// and therefore should be exclusively accessed by a single go routine. Actually, due to
// our ability to compose functionality with channels as opposed to shared state, the
Expand Down Expand Up @@ -158,16 +42,16 @@ type privateWpInfo[I, R any] struct {
// owns.
type WorkerPool[I, R any] struct {
private privateWpInfo[I, R]
fn Executive[I, R]
exec ExecutiveFunc[I, R]
noWorkers int
SourceJobsChIn <-chan Job[I]
SourceJobsChIn JobStreamIn[I]

Quit *sync.WaitGroup
}

type NewWorkerPoolParams[I, R any] struct {
NoWorkers int
Exec Executive[I, R]
Exec ExecutiveFunc[I, R]
JobsCh chan Job[I]
CancelCh CancelStream
Quit *sync.WaitGroup
Expand All @@ -186,7 +70,7 @@ func NewWorkerPool[I, R any](params *NewWorkerPoolParams[I, R]) *WorkerPool[I, R
finishedCh: make(FinishedStream, noWorkers),
cancelCh: params.CancelCh,
},
fn: params.Exec,
exec: params.Exec,
noWorkers: noWorkers,
SourceJobsChIn: params.JobsCh,

Expand Down Expand Up @@ -279,7 +163,7 @@ func (p *WorkerPool[I, R]) run(

func (p *WorkerPool[I, R]) spawn(
ctx context.Context,
jobsInCh <-chan Job[I],
jobsInCh JobStreamIn[I],
resultsChOut ResultStreamOut[R],
finishedChOut FinishedStreamOut,
) {
Expand All @@ -288,7 +172,7 @@ func (p *WorkerPool[I, R]) spawn(
w := &workerWrapper[I, R]{
core: &worker[I, R]{
id: p.composeID(),
fn: p.fn,
exec: p.exec,
jobsInCh: jobsInCh,
resultsOutCh: resultsChOut,
finishedChOut: finishedChOut,
Expand Down
9 changes: 3 additions & 6 deletions async/worker-pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,7 @@ func (i TestJobInput) SequenceNo() int {
type TestJobResult = string
type TestResultStream chan async.JobResult[TestJobResult]

type exec struct {
}

func (e *exec) Invoke(j async.Job[TestJobInput]) (async.JobResult[TestJobResult], error) {
var greeter = func(j async.Job[TestJobInput]) (async.JobResult[TestJobResult], error) {
r := rand.Intn(1000) + 1 //nolint:gosec // trivial
delay := time.Millisecond * time.Duration(r)
time.Sleep(delay)
Expand Down Expand Up @@ -101,7 +98,7 @@ func (p *pipeline[I, R]) startProducer(ctx context.Context, provider helpers.Pro
p.wg.Add(1)
}

func (p *pipeline[I, R]) startPool(ctx context.Context, executive async.Executive[I, R]) {
func (p *pipeline[I, R]) startPool(ctx context.Context, executive async.ExecutiveFunc[I, R]) {
p.pool = async.NewWorkerPool[I, R](
&async.NewWorkerPoolParams[I, R]{
NoWorkers: 5,
Expand Down Expand Up @@ -152,7 +149,7 @@ var _ = Describe("WorkerPool", func() {
})

By("👾 WAIT-GROUP ADD(worker-pool)\n")
pipe.startPool(ctx, &exec{})
pipe.startPool(ctx, greeter)

By("👾 WAIT-GROUP ADD(consumer)")
pipe.startConsumer(ctx)
Expand Down
14 changes: 5 additions & 9 deletions async/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,15 @@ import (
)

type worker[I any, R any] struct {
id WorkerID

// TODO: there is still no benefit on using an interface rather than a function,
// might have to change this back to a function
//
fn Executive[I, R]
jobsInCh <-chan Job[I]
id WorkerID
exec ExecutiveFunc[I, R]
jobsInCh JobStreamIn[I]
resultsOutCh ResultStreamOut[R]
finishedChOut FinishedStreamOut

// this might be better replaced with a broadcast mechanism such as sync.Cond
//
cancelChIn <-chan CancelWorkSignal
cancelChIn CancelStreamIn
}

func (w *worker[I, R]) run(ctx context.Context) {
Expand Down Expand Up @@ -47,7 +43,7 @@ func (w *worker[I, R]) run(ctx context.Context) {
}

func (w *worker[I, R]) invoke(ctx context.Context, job Job[I]) {
result, _ := w.fn.Invoke(job)
result, _ := w.exec(job)

select {
case <-ctx.Done():
Expand Down

0 comments on commit 1935e00

Please sign in to comment.