From 7cc2929aed75344d553add800b0cfeadc197117d Mon Sep 17 00:00:00 2001 From: plastikfan Date: Thu, 4 Jan 2024 23:08:51 +0000 Subject: [PATCH] ref(boost): migrate to slog (#92) --- .vscode/settings.json | 3 ++- boost/annotated-wait-group.go | 26 ++++++++++++++++---- boost/boost-public-api.go | 4 --- boost/worker-pool.go | 46 +++++++++++++++++++++++++---------- boost/worker-pool_test.go | 8 ------ boost/worker.go | 20 +++++++-------- go.mod | 14 +++++++---- go.sum | 4 +-- internal/log/fields.go | 21 ---------------- internal/log/log-defs.go | 43 -------------------------------- internal/log/new-logger.go | 37 ---------------------------- log/logger.go | 12 --------- 12 files changed, 77 insertions(+), 161 deletions(-) delete mode 100644 internal/log/fields.go delete mode 100644 internal/log/log-defs.go delete mode 100644 internal/log/new-logger.go delete mode 100644 log/logger.go diff --git a/.vscode/settings.json b/.vscode/settings.json index 0de079e..e21253a 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -65,6 +65,7 @@ "watchvc", "watchvi", "wgan", - "xenomorph" + "xenomorph", + "zapslog" ] } diff --git a/boost/annotated-wait-group.go b/boost/annotated-wait-group.go index 5a19606..bf8532f 100644 --- a/boost/annotated-wait-group.go +++ b/boost/annotated-wait-group.go @@ -2,11 +2,14 @@ package boost import ( "fmt" + "log/slog" "strings" "sync" "sync/atomic" "github.com/samber/lo" + "go.uber.org/zap/exp/zapslog" + "go.uber.org/zap/zapcore" ) type WaitGroupName string @@ -72,6 +75,7 @@ type waitGroupAnImpl struct { counter int32 names namesCollection waitGroupName string + logger *slog.Logger } func (a *waitGroupAnImpl) Add(delta int, name ...GoRoutineName) { @@ -80,7 +84,7 @@ func (a *waitGroupAnImpl) Add(delta int, name ...GoRoutineName) { if len(name) > 0 { a.names[name[0]] = "foo" - a.indicate("βž•βž•βž•", string(name[0]), "Add") + a.indicate("πŸ’«", string(name[0]), "Add") } } @@ -90,18 +94,18 @@ func (a *waitGroupAnImpl) Done(name ...GoRoutineName) { if len(name) > 0 { delete(a.names, name[0]) - a.indicate("🚩🚩🚩", string(name[0]), "Done") + a.indicate("🚩", string(name[0]), "Done") } } func (a *waitGroupAnImpl) Wait(name ...GoRoutineName) { if len(name) > 0 { - a.indicate("🧭🧭🧭", string(name[0]), "Wait") + a.indicate("🧭", string(name[0]), "Wait") } } func (a *waitGroupAnImpl) indicate(highlight, name, op string) { - Alert( + a.logger.Debug( fmt.Sprintf( " %v [[ WaitGroupAssister(%v).%v ]] - gr-name: '%v' (count: '%v') (running: '%v')\n", highlight, a.waitGroupName, op, name, a.counter, a.running(), @@ -127,11 +131,23 @@ type AnnotatedWaitGroup struct { // NewAnnotatedWaitGroup creates a new AnnotatedWaitGroup instance containing // the core WaitGroup instance. -func NewAnnotatedWaitGroup(name string) WaitGroupAn { +func NewAnnotatedWaitGroup(name string, log ...*slog.Logger) WaitGroupAn { + logger := lo.TernaryF(len(log) > 0, + func() *slog.Logger { + return log[0] + }, + func() *slog.Logger { + return slog.New(zapslog.NewHandler( + zapcore.NewNopCore(), nil), + ) + }, + ) + return &AnnotatedWaitGroup{ assistant: waitGroupAnImpl{ waitGroupName: name, names: make(namesCollection), + logger: logger, }, } } diff --git a/boost/boost-public-api.go b/boost/boost-public-api.go index 25386fe..9ff957d 100644 --- a/boost/boost-public-api.go +++ b/boost/boost-public-api.go @@ -62,7 +62,3 @@ func NewDuplex[T any](channel chan T) *Duplex[T] { WriterCh: channel, } } - -type ActivityCallback func(message string) - -var Alert ActivityCallback = func(message string) {} diff --git a/boost/worker-pool.go b/boost/worker-pool.go index 83004de..e1392a9 100644 --- a/boost/worker-pool.go +++ b/boost/worker-pool.go @@ -3,10 +3,14 @@ package boost import ( "context" "fmt" + "log/slog" "runtime" "time" "github.com/google/uuid" + "github.com/samber/lo" + "go.uber.org/zap/exp/zapslog" + "go.uber.org/zap/zapcore" ) // privateWpInfo (dmz!) contains any state that needs to be mutated in a non concurrent manner @@ -50,6 +54,7 @@ type WorkerPool[I, O any] struct { RoutineName GoRoutineName WaitAQ AnnotatedWgAQ ResultInCh PoolResultStreamR + Logger *slog.Logger } type NewWorkerPoolParams[I, O any] struct { @@ -59,6 +64,7 @@ type NewWorkerPoolParams[I, O any] struct { JobsCh JobStream[I] CancelCh CancelStream WaitAQ AnnotatedWgAQ + Logger *slog.Logger } func NewWorkerPool[I, O any](params *NewWorkerPoolParams[I, O]) *WorkerPool[I, O] { @@ -68,6 +74,18 @@ func NewWorkerPool[I, O any](params *NewWorkerPoolParams[I, O]) *WorkerPool[I, O } resultCh := make(PoolResultStream, 1) + + logger := lo.TernaryF(params.Logger == nil, + func() *slog.Logger { + return slog.New(zapslog.NewHandler( + zapcore.NewNopCore(), nil), + ) + }, + func() *slog.Logger { + return params.Logger + }, + ) + wp := &WorkerPool[I, O]{ private: privateWpInfo[I, O]{ pool: make(workersCollection[I, O], noWorkers), @@ -83,6 +101,7 @@ func NewWorkerPool[I, O any](params *NewWorkerPoolParams[I, O]) *WorkerPool[I, O sourceJobsChIn: params.JobsCh, WaitAQ: params.WaitAQ, ResultInCh: resultCh, + Logger: logger, } return wp @@ -130,9 +149,9 @@ func (p *WorkerPool[I, O]) run( p.private.resultOutCh <- r p.WaitAQ.Done(p.RoutineName) - Alert("<--- WorkerPool.run (QUIT). 🧊🧊🧊\n") + p.Logger.Debug("<--- WorkerPool.run (QUIT). 🧊🧊🧊\n") }(result) - Alert(fmt.Sprintf( + p.Logger.Debug(fmt.Sprintf( "===> 🧊 WorkerPool.run ...(ctx:%+v)\n", parentContext, )) @@ -143,11 +162,11 @@ func (p *WorkerPool[I, O]) run( running = false close(forwardChOut) // ⚠️ This is new - Alert("===> 🧊 WorkerPool.run (source jobs chan closed) - done received ☒️☒️☒️") + p.Logger.Debug("===> 🧊 WorkerPool.run (source jobs chan closed) - done received ☒️☒️☒️") case job, ok := <-p.sourceJobsChIn: if ok { - Alert(fmt.Sprintf( + p.Logger.Debug(fmt.Sprintf( "===> 🧊 (#workers: '%v') WorkerPool.run - new job received", len(p.private.pool), )) @@ -163,7 +182,7 @@ func (p *WorkerPool[I, O]) run( } select { case forwardChOut <- job: - Alert(fmt.Sprintf( + p.Logger.Debug(fmt.Sprintf( "===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(%v) [Seq: %v]", job.ID, job.SequenceNo, @@ -172,7 +191,7 @@ func (p *WorkerPool[I, O]) run( running = false close(forwardChOut) // ⚠️ This is new - Alert(fmt.Sprintf( + p.Logger.Debug(fmt.Sprintf( "===> 🧊 (#workers: '%v') WorkerPool.run - done received ☒️☒️☒️", len(p.private.pool), )) @@ -185,7 +204,7 @@ func (p *WorkerPool[I, O]) run( // running = false close(forwardChOut) - Alert("===> πŸš€ WorkerPool.run(source jobs chan closed) πŸŸ₯πŸŸ₯πŸŸ₯") + p.Logger.Debug("===> πŸš€ WorkerPool.run(source jobs chan closed) πŸŸ₯πŸŸ₯πŸŸ₯") } } } @@ -197,13 +216,13 @@ func (p *WorkerPool[I, O]) run( if err := p.drain(p.private.finishedCh); err != nil { result.Error = err - Alert(fmt.Sprintf( + p.Logger.Debug(fmt.Sprintf( "===> 🧊 WorkerPool.run - drain complete with error: '%v' (workers count: '%v'). πŸ“›πŸ“›πŸ“›", err, len(p.private.pool), )) } else { - Alert(fmt.Sprintf( + p.Logger.Debug(fmt.Sprintf( "===> 🧊 WorkerPool.run - drain complete OK (workers count: '%v'). β˜‘οΈβ˜‘οΈβ˜‘οΈ", len(p.private.pool), )) @@ -225,19 +244,20 @@ func (p *WorkerPool[I, O]) spawn( jobsChIn: jobsChIn, outputsChOut: outputsChOut, finishedChOut: finishedChOut, + logger: p.Logger, }, } p.private.pool[w.core.id] = w go w.core.run(parentContext, parentCancel, outputChTimeout) - Alert(fmt.Sprintf( + p.Logger.Debug(fmt.Sprintf( "===> 🧊 WorkerPool.spawned new worker: '%v' πŸŽ€πŸŽ€πŸŽ€", w.core.id, )) } func (p *WorkerPool[I, O]) drain(finishedChIn finishedStreamR) error { - Alert(fmt.Sprintf( + p.Logger.Debug(fmt.Sprintf( "!!!! 🧊 WorkerPool.drain - waiting for remaining workers: %v (#GRs: %v); 🧊🧊🧊", len(p.private.pool), runtime.NumGoroutine(), )) @@ -291,7 +311,7 @@ func (p *WorkerPool[I, O]) drain(finishedChIn finishedStreamR) error { } if workerResult.err != nil { - Alert(fmt.Sprintf( + p.Logger.Debug(fmt.Sprintf( "!!!! 🧊 WorkerPool.drain - worker (%v) πŸ’’πŸ’’πŸ’’ finished with error: '%v'", workerResult.id, workerResult.err, @@ -302,7 +322,7 @@ func (p *WorkerPool[I, O]) drain(finishedChIn finishedStreamR) error { } } - Alert(fmt.Sprintf( + p.Logger.Debug(fmt.Sprintf( "!!!! 🧊 WorkerPool.drain - worker-result-error(%v) finished, remaining: '%v' πŸŸ₯", workerResult.err, len(p.private.pool), )) diff --git a/boost/worker-pool_test.go b/boost/worker-pool_test.go index fb99b9e..039af56 100644 --- a/boost/worker-pool_test.go +++ b/boost/worker-pool_test.go @@ -303,15 +303,7 @@ type poolTE struct { assert assertFunc } -func alertPrinter(message string) { - fmt.Println(message) -} - var _ = Describe("WorkerPool", Ordered, func() { - BeforeAll(func() { - boost.Alert = alertPrinter - }) - DescribeTable("stream of jobs", func(specContext SpecContext, entry *poolTE) { defer leaktest.Check(GinkgoT())() diff --git a/boost/worker.go b/boost/worker.go index 4d20f03..a608dda 100644 --- a/boost/worker.go +++ b/boost/worker.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "log/slog" "time" ) @@ -13,6 +14,7 @@ type worker[I any, O any] struct { jobsChIn JobStreamR[I] outputsChOut JobOutputStreamW[O] finishedChOut finishedStreamW + logger *slog.Logger } func (w *worker[I, O]) run(parentContext context.Context, @@ -25,26 +27,24 @@ func (w *worker[I, O]) run(parentContext context.Context, defer func(r *workerFinishedResult) { w.finishedChOut <- r // ⚠️ non-pre-emptive send, but this should be ok - Alert(fmt.Sprintf(" <--- πŸš€ worker.run(%v) (SENT FINISHED - error:'%v'). πŸš€πŸš€πŸš€", + w.logger.Debug(fmt.Sprintf(" <--- πŸš€ worker.run(%v) (SENT FINISHED - error:'%v'). πŸš€πŸš€πŸš€", w.id, r.err, )) }(&result) - Alert( - fmt.Sprintf(" ---> πŸš€ worker.run(%v) ...(ctx:%+v)\n", w.id, parentContext), - ) + w.logger.Debug(fmt.Sprintf(" ---> πŸš€ worker.run(%v) ...(ctx:%+v)\n", w.id, parentContext)) for running := true; running; { select { case <-parentContext.Done(): - Alert(fmt.Sprintf( + w.logger.Debug(fmt.Sprintf( " ---> πŸš€ worker.run(%v)(finished) - done received πŸ”ΆπŸ”ΆπŸ”Ά", w.id, )) running = false case job, ok := <-w.jobsChIn: if ok { - Alert(fmt.Sprintf( + w.logger.Debug(fmt.Sprintf( " ---> πŸš€ worker.run(%v)(input:'%v')", w.id, job.Input, )) @@ -55,7 +55,7 @@ func (w *worker[I, O]) run(parentContext context.Context, running = false } } else { - Alert(fmt.Sprintf( + w.logger.Debug(fmt.Sprintf( " ---> πŸš€ worker.run(%v)(jobs chan closed) πŸŸ₯πŸŸ₯πŸŸ₯", w.id, )) @@ -78,7 +78,7 @@ func (w *worker[I, O]) invoke(parentContext context.Context, result, _ := w.exec(job) if w.outputsChOut != nil { - Alert(fmt.Sprintf( + w.logger.Debug(fmt.Sprintf( " ---> πŸš€ worker.invoke ⏰ output timeout: '%v'", outputChTimeout, )) @@ -86,12 +86,12 @@ func (w *worker[I, O]) invoke(parentContext context.Context, case w.outputsChOut <- result: case <-parentContext.Done(): - Alert(fmt.Sprintf( + w.logger.Debug(fmt.Sprintf( " ---> πŸš€ worker.invoke(%v)(cancel) - done received πŸ’₯πŸ’₯πŸ’₯", w.id, )) case <-outputContext.Done(): - Alert(fmt.Sprintf( + w.logger.Debug(fmt.Sprintf( " ---> πŸš€ worker.invoke(%v)(cancel) - timeout on send πŸ‘ΏπŸ‘ΏπŸ‘Ώ", w.id, )) diff --git a/go.mod b/go.mod index 9e48bf9..0ba7b1c 100644 --- a/go.mod +++ b/go.mod @@ -3,17 +3,22 @@ module github.com/snivilised/lorax go 1.21 require ( - github.com/natefinch/lumberjack v2.0.0+incompatible github.com/onsi/ginkgo/v2 v2.13.2 github.com/onsi/gomega v1.30.0 github.com/samber/lo v1.39.0 github.com/snivilised/extendio v0.5.2 - go.uber.org/zap v1.26.0 ) -require github.com/fortytw2/leaktest v1.3.0 +require ( + github.com/fortytw2/leaktest v1.3.0 + go.uber.org/zap v1.26.0 + go.uber.org/zap/exp v0.2.0 +) -require github.com/kr/pretty v0.3.1 // indirect +require ( + github.com/kr/pretty v0.3.1 // indirect + go.uber.org/multierr v1.11.0 // indirect +) require ( github.com/go-logr/logr v1.3.0 // indirect @@ -23,7 +28,6 @@ require ( github.com/google/uuid v1.5.0 github.com/nicksnyder/go-i18n/v2 v2.3.0 // indirect github.com/pkg/errors v0.9.1 // indirect - go.uber.org/multierr v1.11.0 // indirect golang.org/x/exp v0.0.0-20230321023759-10a507213a29 // indirect golang.org/x/net v0.17.0 // indirect golang.org/x/sys v0.14.0 // indirect diff --git a/go.sum b/go.sum index 0407652..d0dc264 100644 --- a/go.sum +++ b/go.sum @@ -54,6 +54,8 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= +go.uber.org/zap/exp v0.2.0 h1:FtGenNNeCATRB3CmB/yEUnjEFeJWpB/pMcy7e2bKPYs= +go.uber.org/zap/exp v0.2.0/go.mod h1:t0gqAIdh1MfKv9EwN/dLwfZnJxe9ITAZN78HEWPFWDQ= golang.org/x/exp v0.0.0-20230321023759-10a507213a29 h1:ooxPy7fPvB4kwsA2h+iBNHkAbp/4JxTSwCmvdjEYmug= golang.org/x/exp v0.0.0-20230321023759-10a507213a29/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= golang.org/x/mod v0.13.0 h1:I/DsJXRlw/8l/0c24sM9yb0T4z9liZTduXvdAWYiysY= @@ -71,8 +73,6 @@ google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= -gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/log/fields.go b/internal/log/fields.go deleted file mode 100644 index 0658431..0000000 --- a/internal/log/fields.go +++ /dev/null @@ -1,21 +0,0 @@ -package log - -import ( - "go.uber.org/zap" -) - -func String(key, val string) Field { - return zap.String(key, val) -} - -func Uint(key string, val uint) Field { - return zap.Uint(key, val) -} - -func Int(key string, val int) Field { - return zap.Int(key, val) -} - -func Float64(key string, val float64) Field { - return zap.Float64(key, val) -} diff --git a/internal/log/log-defs.go b/internal/log/log-defs.go deleted file mode 100644 index f13629b..0000000 --- a/internal/log/log-defs.go +++ /dev/null @@ -1,43 +0,0 @@ -package log - -import ( - "github.com/snivilised/extendio/xfs/utils" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" -) - -type Field = zap.Field -type Level = zapcore.Level - -const ( - DebugLevel = zapcore.DebugLevel - InfoLevel = zapcore.InfoLevel - WarnLevel = zapcore.WarnLevel - ErrorLevel = zapcore.ErrorLevel -) - -type Logger interface { - Debug(msg string, fields ...Field) - Info(msg string, fields ...Field) - Warn(msg string, fields ...Field) - Error(msg string, fields ...Field) - Sync() error -} - -type Ref utils.RoProp[Logger] - -type Rotation struct { - Filename string - MaxSizeInMb int - MaxNoOfBackups int - MaxAgeInDays int -} - -type LoggerInfo struct { - Rotation - - Enabled bool - Path string - TimeStampFormat string - Level Level -} diff --git a/internal/log/new-logger.go b/internal/log/new-logger.go deleted file mode 100644 index ce604d9..0000000 --- a/internal/log/new-logger.go +++ /dev/null @@ -1,37 +0,0 @@ -package log - -import ( - "github.com/natefinch/lumberjack" - "github.com/samber/lo" - "github.com/snivilised/extendio/xfs/utils" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" - - "github.com/snivilised/extendio/i18n" -) - -func NewLogger(info *LoggerInfo) Ref { - return utils.NewRoProp(lo.TernaryF(info.Enabled, - func() Logger { - if info.Path == "" { - panic(i18n.NewInvalidConfigEntryError(info.Path, "-")) - } - ws := zapcore.AddSync(&lumberjack.Logger{ - Filename: info.Path, - MaxSize: info.Rotation.MaxSizeInMb, - MaxBackups: info.Rotation.MaxNoOfBackups, - MaxAge: info.Rotation.MaxAgeInDays, - }) - config := zap.NewProductionEncoderConfig() - config.EncodeTime = zapcore.TimeEncoderOfLayout(info.TimeStampFormat) - core := zapcore.NewCore( - zapcore.NewJSONEncoder(config), - ws, - info.Level, - ) - return zap.New(core) - }, func() Logger { - return zap.NewNop() - }), - ) -} diff --git a/log/logger.go b/log/logger.go deleted file mode 100644 index 89743c3..0000000 --- a/log/logger.go +++ /dev/null @@ -1,12 +0,0 @@ -package log - -import ( - "github.com/snivilised/lorax/internal/log" -) - -type Logger interface { - Debug(msg string, fields ...log.Field) - Info(msg string, fields ...log.Field) - Warn(msg string, fields ...log.Field) - Error(msg string, fields ...log.Field) -}