Skip to content

Commit

Permalink
ref(boost): migrate to slog (#92)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Jan 4, 2024
1 parent 9f1f304 commit 7cc2929
Show file tree
Hide file tree
Showing 12 changed files with 77 additions and 161 deletions.
3 changes: 2 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
"watchvc",
"watchvi",
"wgan",
"xenomorph"
"xenomorph",
"zapslog"
]
}
26 changes: 21 additions & 5 deletions boost/annotated-wait-group.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package boost

import (
"fmt"
"log/slog"
"strings"
"sync"
"sync/atomic"

"github.com/samber/lo"
"go.uber.org/zap/exp/zapslog"
"go.uber.org/zap/zapcore"
)

type WaitGroupName string
Expand Down Expand Up @@ -72,6 +75,7 @@ type waitGroupAnImpl struct {
counter int32
names namesCollection
waitGroupName string
logger *slog.Logger
}

func (a *waitGroupAnImpl) Add(delta int, name ...GoRoutineName) {
Expand All @@ -80,7 +84,7 @@ func (a *waitGroupAnImpl) Add(delta int, name ...GoRoutineName) {
if len(name) > 0 {
a.names[name[0]] = "foo"

a.indicate("➕➕➕", string(name[0]), "Add")
a.indicate("💫", string(name[0]), "Add")
}
}

Expand All @@ -90,18 +94,18 @@ func (a *waitGroupAnImpl) Done(name ...GoRoutineName) {
if len(name) > 0 {
delete(a.names, name[0])

a.indicate("🚩🚩🚩", string(name[0]), "Done")
a.indicate("🚩", string(name[0]), "Done")
}
}

func (a *waitGroupAnImpl) Wait(name ...GoRoutineName) {
if len(name) > 0 {
a.indicate("🧭🧭🧭", string(name[0]), "Wait")
a.indicate("🧭", string(name[0]), "Wait")
}
}

func (a *waitGroupAnImpl) indicate(highlight, name, op string) {
Alert(
a.logger.Debug(
fmt.Sprintf(
" %v [[ WaitGroupAssister(%v).%v ]] - gr-name: '%v' (count: '%v') (running: '%v')\n",
highlight, a.waitGroupName, op, name, a.counter, a.running(),
Expand All @@ -127,11 +131,23 @@ type AnnotatedWaitGroup struct {

// NewAnnotatedWaitGroup creates a new AnnotatedWaitGroup instance containing
// the core WaitGroup instance.
func NewAnnotatedWaitGroup(name string) WaitGroupAn {
func NewAnnotatedWaitGroup(name string, log ...*slog.Logger) WaitGroupAn {
logger := lo.TernaryF(len(log) > 0,
func() *slog.Logger {
return log[0]
},
func() *slog.Logger {
return slog.New(zapslog.NewHandler(
zapcore.NewNopCore(), nil),
)
},
)

return &AnnotatedWaitGroup{
assistant: waitGroupAnImpl{
waitGroupName: name,
names: make(namesCollection),
logger: logger,
},
}
}
Expand Down
4 changes: 0 additions & 4 deletions boost/boost-public-api.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,3 @@ func NewDuplex[T any](channel chan T) *Duplex[T] {
WriterCh: channel,
}
}

type ActivityCallback func(message string)

var Alert ActivityCallback = func(message string) {}
46 changes: 33 additions & 13 deletions boost/worker-pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@ package boost
import (
"context"
"fmt"
"log/slog"
"runtime"
"time"

"github.com/google/uuid"
"github.com/samber/lo"
"go.uber.org/zap/exp/zapslog"
"go.uber.org/zap/zapcore"
)

// privateWpInfo (dmz!) contains any state that needs to be mutated in a non concurrent manner
Expand Down Expand Up @@ -50,6 +54,7 @@ type WorkerPool[I, O any] struct {
RoutineName GoRoutineName
WaitAQ AnnotatedWgAQ
ResultInCh PoolResultStreamR
Logger *slog.Logger
}

type NewWorkerPoolParams[I, O any] struct {
Expand All @@ -59,6 +64,7 @@ type NewWorkerPoolParams[I, O any] struct {
JobsCh JobStream[I]
CancelCh CancelStream
WaitAQ AnnotatedWgAQ
Logger *slog.Logger
}

func NewWorkerPool[I, O any](params *NewWorkerPoolParams[I, O]) *WorkerPool[I, O] {
Expand All @@ -68,6 +74,18 @@ func NewWorkerPool[I, O any](params *NewWorkerPoolParams[I, O]) *WorkerPool[I, O
}

resultCh := make(PoolResultStream, 1)

logger := lo.TernaryF(params.Logger == nil,
func() *slog.Logger {
return slog.New(zapslog.NewHandler(
zapcore.NewNopCore(), nil),
)
},
func() *slog.Logger {
return params.Logger
},
)

wp := &WorkerPool[I, O]{
private: privateWpInfo[I, O]{
pool: make(workersCollection[I, O], noWorkers),
Expand All @@ -83,6 +101,7 @@ func NewWorkerPool[I, O any](params *NewWorkerPoolParams[I, O]) *WorkerPool[I, O
sourceJobsChIn: params.JobsCh,
WaitAQ: params.WaitAQ,
ResultInCh: resultCh,
Logger: logger,
}

return wp
Expand Down Expand Up @@ -130,9 +149,9 @@ func (p *WorkerPool[I, O]) run(
p.private.resultOutCh <- r

p.WaitAQ.Done(p.RoutineName)
Alert("<--- WorkerPool.run (QUIT). 🧊🧊🧊\n")
p.Logger.Debug("<--- WorkerPool.run (QUIT). 🧊🧊🧊\n")
}(result)
Alert(fmt.Sprintf(
p.Logger.Debug(fmt.Sprintf(
"===> 🧊 WorkerPool.run ...(ctx:%+v)\n",
parentContext,
))
Expand All @@ -143,11 +162,11 @@ func (p *WorkerPool[I, O]) run(
running = false

close(forwardChOut) // ⚠️ This is new
Alert("===> 🧊 WorkerPool.run (source jobs chan closed) - done received ☢️☢️☢️")
p.Logger.Debug("===> 🧊 WorkerPool.run (source jobs chan closed) - done received ☢️☢️☢️")

case job, ok := <-p.sourceJobsChIn:
if ok {
Alert(fmt.Sprintf(
p.Logger.Debug(fmt.Sprintf(
"===> 🧊 (#workers: '%v') WorkerPool.run - new job received",
len(p.private.pool),
))
Expand All @@ -163,7 +182,7 @@ func (p *WorkerPool[I, O]) run(
}
select {
case forwardChOut <- job:
Alert(fmt.Sprintf(
p.Logger.Debug(fmt.Sprintf(
"===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(%v) [Seq: %v]",
job.ID,
job.SequenceNo,
Expand All @@ -172,7 +191,7 @@ func (p *WorkerPool[I, O]) run(
running = false

close(forwardChOut) // ⚠️ This is new
Alert(fmt.Sprintf(
p.Logger.Debug(fmt.Sprintf(
"===> 🧊 (#workers: '%v') WorkerPool.run - done received ☢️☢️☢️",
len(p.private.pool),
))
Expand All @@ -185,7 +204,7 @@ func (p *WorkerPool[I, O]) run(
//
running = false
close(forwardChOut)
Alert("===> 🚀 WorkerPool.run(source jobs chan closed) 🟥🟥🟥")
p.Logger.Debug("===> 🚀 WorkerPool.run(source jobs chan closed) 🟥🟥🟥")
}
}
}
Expand All @@ -197,13 +216,13 @@ func (p *WorkerPool[I, O]) run(
if err := p.drain(p.private.finishedCh); err != nil {
result.Error = err

Alert(fmt.Sprintf(
p.Logger.Debug(fmt.Sprintf(
"===> 🧊 WorkerPool.run - drain complete with error: '%v' (workers count: '%v'). 📛📛📛",
err,
len(p.private.pool),
))
} else {
Alert(fmt.Sprintf(
p.Logger.Debug(fmt.Sprintf(
"===> 🧊 WorkerPool.run - drain complete OK (workers count: '%v'). ☑️☑️☑️",
len(p.private.pool),
))
Expand All @@ -225,19 +244,20 @@ func (p *WorkerPool[I, O]) spawn(
jobsChIn: jobsChIn,
outputsChOut: outputsChOut,
finishedChOut: finishedChOut,
logger: p.Logger,
},
}

p.private.pool[w.core.id] = w
go w.core.run(parentContext, parentCancel, outputChTimeout)
Alert(fmt.Sprintf(
p.Logger.Debug(fmt.Sprintf(
"===> 🧊 WorkerPool.spawned new worker: '%v' 🎀🎀🎀",
w.core.id,
))
}

func (p *WorkerPool[I, O]) drain(finishedChIn finishedStreamR) error {
Alert(fmt.Sprintf(
p.Logger.Debug(fmt.Sprintf(
"!!!! 🧊 WorkerPool.drain - waiting for remaining workers: %v (#GRs: %v); 🧊🧊🧊",
len(p.private.pool), runtime.NumGoroutine(),
))
Expand Down Expand Up @@ -291,7 +311,7 @@ func (p *WorkerPool[I, O]) drain(finishedChIn finishedStreamR) error {
}

if workerResult.err != nil {
Alert(fmt.Sprintf(
p.Logger.Debug(fmt.Sprintf(
"!!!! 🧊 WorkerPool.drain - worker (%v) 💢💢💢 finished with error: '%v'",
workerResult.id,
workerResult.err,
Expand All @@ -302,7 +322,7 @@ func (p *WorkerPool[I, O]) drain(finishedChIn finishedStreamR) error {
}
}

Alert(fmt.Sprintf(
p.Logger.Debug(fmt.Sprintf(
"!!!! 🧊 WorkerPool.drain - worker-result-error(%v) finished, remaining: '%v' 🟥",
workerResult.err, len(p.private.pool),
))
Expand Down
8 changes: 0 additions & 8 deletions boost/worker-pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,15 +303,7 @@ type poolTE struct {
assert assertFunc
}

func alertPrinter(message string) {
fmt.Println(message)
}

var _ = Describe("WorkerPool", Ordered, func() {
BeforeAll(func() {
boost.Alert = alertPrinter
})

DescribeTable("stream of jobs",
func(specContext SpecContext, entry *poolTE) {
defer leaktest.Check(GinkgoT())()
Expand Down
20 changes: 10 additions & 10 deletions boost/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"log/slog"
"time"
)

Expand All @@ -13,6 +14,7 @@ type worker[I any, O any] struct {
jobsChIn JobStreamR[I]
outputsChOut JobOutputStreamW[O]
finishedChOut finishedStreamW
logger *slog.Logger
}

func (w *worker[I, O]) run(parentContext context.Context,
Expand All @@ -25,26 +27,24 @@ func (w *worker[I, O]) run(parentContext context.Context,
defer func(r *workerFinishedResult) {
w.finishedChOut <- r // ⚠️ non-pre-emptive send, but this should be ok

Alert(fmt.Sprintf(" <--- 🚀 worker.run(%v) (SENT FINISHED - error:'%v'). 🚀🚀🚀",
w.logger.Debug(fmt.Sprintf(" <--- 🚀 worker.run(%v) (SENT FINISHED - error:'%v'). 🚀🚀🚀",
w.id, r.err,
))
}(&result)

Alert(
fmt.Sprintf(" ---> 🚀 worker.run(%v) ...(ctx:%+v)\n", w.id, parentContext),
)
w.logger.Debug(fmt.Sprintf(" ---> 🚀 worker.run(%v) ...(ctx:%+v)\n", w.id, parentContext))

for running := true; running; {
select {
case <-parentContext.Done():
Alert(fmt.Sprintf(
w.logger.Debug(fmt.Sprintf(
" ---> 🚀 worker.run(%v)(finished) - done received 🔶🔶🔶", w.id,
))

running = false
case job, ok := <-w.jobsChIn:
if ok {
Alert(fmt.Sprintf(
w.logger.Debug(fmt.Sprintf(
" ---> 🚀 worker.run(%v)(input:'%v')", w.id, job.Input,
))

Expand All @@ -55,7 +55,7 @@ func (w *worker[I, O]) run(parentContext context.Context,
running = false
}
} else {
Alert(fmt.Sprintf(
w.logger.Debug(fmt.Sprintf(
" ---> 🚀 worker.run(%v)(jobs chan closed) 🟥🟥🟥", w.id,
))

Expand All @@ -78,20 +78,20 @@ func (w *worker[I, O]) invoke(parentContext context.Context,
result, _ := w.exec(job)

if w.outputsChOut != nil {
Alert(fmt.Sprintf(
w.logger.Debug(fmt.Sprintf(
" ---> 🚀 worker.invoke ⏰ output timeout: '%v'", outputChTimeout,
))

select {
case w.outputsChOut <- result:

case <-parentContext.Done():
Alert(fmt.Sprintf(
w.logger.Debug(fmt.Sprintf(
" ---> 🚀 worker.invoke(%v)(cancel) - done received 💥💥💥", w.id,
))

case <-outputContext.Done():
Alert(fmt.Sprintf(
w.logger.Debug(fmt.Sprintf(
" ---> 🚀 worker.invoke(%v)(cancel) - timeout on send 👿👿👿", w.id,
))

Expand Down
14 changes: 9 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,22 @@ module github.com/snivilised/lorax
go 1.21

require (
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/onsi/ginkgo/v2 v2.13.2
github.com/onsi/gomega v1.30.0
github.com/samber/lo v1.39.0
github.com/snivilised/extendio v0.5.2
go.uber.org/zap v1.26.0
)

require github.com/fortytw2/leaktest v1.3.0
require (
github.com/fortytw2/leaktest v1.3.0
go.uber.org/zap v1.26.0
go.uber.org/zap/exp v0.2.0
)

require github.com/kr/pretty v0.3.1 // indirect
require (
github.com/kr/pretty v0.3.1 // indirect
go.uber.org/multierr v1.11.0 // indirect
)

require (
github.com/go-logr/logr v1.3.0 // indirect
Expand All @@ -23,7 +28,6 @@ require (
github.com/google/uuid v1.5.0
github.com/nicksnyder/go-i18n/v2 v2.3.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/exp v0.0.0-20230321023759-10a507213a29 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.14.0 // indirect
Expand Down
Loading

0 comments on commit 7cc2929

Please sign in to comment.