diff --git a/boost/annotated-wait-group.go b/boost/annotated-wait-group.go index bf8532f..e4e1e4d 100644 --- a/boost/annotated-wait-group.go +++ b/boost/annotated-wait-group.go @@ -1,7 +1,6 @@ package boost import ( - "fmt" "log/slog" "strings" "sync" @@ -84,7 +83,7 @@ func (a *waitGroupAnImpl) Add(delta int, name ...GoRoutineName) { if len(name) > 0 { a.names[name[0]] = "foo" - a.indicate("πŸ’«", string(name[0]), "Add") + a.indicate(string(name[0]), "Add") } } @@ -94,22 +93,24 @@ func (a *waitGroupAnImpl) Done(name ...GoRoutineName) { if len(name) > 0 { delete(a.names, name[0]) - a.indicate("🚩", string(name[0]), "Done") + a.indicate(string(name[0]), "Done") } } func (a *waitGroupAnImpl) Wait(name ...GoRoutineName) { if len(name) > 0 { - a.indicate("🧭", string(name[0]), "Wait") + a.indicate(string(name[0]), "Wait") } } -func (a *waitGroupAnImpl) indicate(highlight, name, op string) { +func (a *waitGroupAnImpl) indicate(name, op string) { a.logger.Debug( - fmt.Sprintf( - " %v [[ WaitGroupAssister(%v).%v ]] - gr-name: '%v' (count: '%v') (running: '%v')\n", - highlight, a.waitGroupName, op, name, a.counter, a.running(), - ), + "WaitGroupAssister", + slog.String("wg-name", a.waitGroupName), + slog.String("op", op), + slog.String("name", name), + slog.Int("counter", int(a.counter)), + slog.String("running", a.running()), ) } diff --git a/boost/worker-pool.go b/boost/worker-pool.go index e1392a9..30cd7ce 100644 --- a/boost/worker-pool.go +++ b/boost/worker-pool.go @@ -149,27 +149,24 @@ func (p *WorkerPool[I, O]) run( p.private.resultOutCh <- r p.WaitAQ.Done(p.RoutineName) - p.Logger.Debug("<--- WorkerPool.run (QUIT). 🧊🧊🧊\n") }(result) - p.Logger.Debug(fmt.Sprintf( - "===> 🧊 WorkerPool.run ...(ctx:%+v)\n", - parentContext, - )) for running := true; running; { select { case <-parentContext.Done(): running = false - close(forwardChOut) // ⚠️ This is new - p.Logger.Debug("===> 🧊 WorkerPool.run (source jobs chan closed) - done received ☒️☒️☒️") + close(forwardChOut) + p.Logger.Debug("source jobs chan closed - done received", + slog.String("source", "worker-pool.run"), + ) case job, ok := <-p.sourceJobsChIn: if ok { - p.Logger.Debug(fmt.Sprintf( - "===> 🧊 (#workers: '%v') WorkerPool.run - new job received", - len(p.private.pool), - )) + p.Logger.Debug("new job received", + slog.String("source", "worker-pool.run"), + slog.Int("pool size", len(p.private.pool)), + ) if len(p.private.pool) < p.noWorkers { p.spawn(parentContext, @@ -182,29 +179,31 @@ func (p *WorkerPool[I, O]) run( } select { case forwardChOut <- job: - p.Logger.Debug(fmt.Sprintf( - "===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(%v) [Seq: %v]", - job.ID, - job.SequenceNo, - )) + p.Logger.Debug("forwarded job", + slog.String("source", "worker-pool.run"), + slog.String("job-id", job.ID), + slog.Int("sequence-no", job.SequenceNo), + ) case <-parentContext.Done(): running = false - close(forwardChOut) // ⚠️ This is new - p.Logger.Debug(fmt.Sprintf( - "===> 🧊 (#workers: '%v') WorkerPool.run - done received ☒️☒️☒️", - len(p.private.pool), - )) + close(forwardChOut) + p.Logger.Debug("done received", + slog.String("source", "worker-pool.run"), + slog.Int("pool size", len(p.private.pool)), + ) } } else { - // ⚠️ This close is essential. Since the pool acts as a bridge between + // πŸ’« This close is essential. Since the pool acts as a bridge between // 2 channels (p.sourceJobsChIn and p.private.workersJobsCh/forwardChOut), // when the producer closes p.sourceJobsChIn, we need to delegate that // closure to forwardChOut, otherwise we end up in a deadlock. // running = false close(forwardChOut) - p.Logger.Debug("===> πŸš€ WorkerPool.run(source jobs chan closed) πŸŸ₯πŸŸ₯πŸŸ₯") + p.Logger.Debug("source jobs chan closed", + slog.String("source", "worker-pool.run"), + ) } } } @@ -216,16 +215,16 @@ func (p *WorkerPool[I, O]) run( if err := p.drain(p.private.finishedCh); err != nil { result.Error = err - p.Logger.Debug(fmt.Sprintf( - "===> 🧊 WorkerPool.run - drain complete with error: '%v' (workers count: '%v'). πŸ“›πŸ“›πŸ“›", - err, - len(p.private.pool), - )) + p.Logger.Error("drain complete with error", + slog.String("source", "worker-pool.run"), + slog.Int("pool size", len(p.private.pool)), + slog.String("error", err.Error()), + ) } else { - p.Logger.Debug(fmt.Sprintf( - "===> 🧊 WorkerPool.run - drain complete OK (workers count: '%v'). β˜‘οΈβ˜‘οΈβ˜‘οΈ", - len(p.private.pool), - )) + p.Logger.Debug("drain complete OK", + slog.String("source", "worker-pool.run"), + slog.Int("pool size", len(p.private.pool)), + ) } } @@ -250,17 +249,17 @@ func (p *WorkerPool[I, O]) spawn( p.private.pool[w.core.id] = w go w.core.run(parentContext, parentCancel, outputChTimeout) - p.Logger.Debug(fmt.Sprintf( - "===> 🧊 WorkerPool.spawned new worker: '%v' πŸŽ€πŸŽ€πŸŽ€", - w.core.id, - )) + p.Logger.Debug("spawned new worker", + slog.String("source", "WorkerPool.spawn"), + slog.String("worker-id", string(w.core.id)), + ) } func (p *WorkerPool[I, O]) drain(finishedChIn finishedStreamR) error { - p.Logger.Debug(fmt.Sprintf( - "!!!! 🧊 WorkerPool.drain - waiting for remaining workers: %v (#GRs: %v); 🧊🧊🧊", - len(p.private.pool), runtime.NumGoroutine(), - )) + p.Logger.Debug("waiting for remaining workers...", + slog.String("source", "WorkerPool.drain"), + slog.Int("pool size", len(p.private.pool)), + ) var firstError error @@ -311,21 +310,21 @@ func (p *WorkerPool[I, O]) drain(finishedChIn finishedStreamR) error { } if workerResult.err != nil { - p.Logger.Debug(fmt.Sprintf( - "!!!! 🧊 WorkerPool.drain - worker (%v) πŸ’’πŸ’’πŸ’’ finished with error: '%v'", - workerResult.id, - workerResult.err, - )) + p.Logger.Error("worker finished with error", + slog.String("source", "WorkerPool.drain"), + slog.String("result-id", string(workerResult.id)), + slog.String("error", workerResult.err.Error()), + ) if firstError == nil { firstError = workerResult.err } } - p.Logger.Debug(fmt.Sprintf( - "!!!! 🧊 WorkerPool.drain - worker-result-error(%v) finished, remaining: '%v' πŸŸ₯", - workerResult.err, len(p.private.pool), - )) + p.Logger.Debug("worker pool finished", + slog.String("source", "WorkerPool.drain"), + slog.Int("remaining", len(p.private.pool)), + ) } return firstError diff --git a/boost/worker.go b/boost/worker.go index a608dda..e1a9ead 100644 --- a/boost/worker.go +++ b/boost/worker.go @@ -25,28 +25,30 @@ func (w *worker[I, O]) run(parentContext context.Context, id: w.id, } defer func(r *workerFinishedResult) { - w.finishedChOut <- r // ⚠️ non-pre-emptive send, but this should be ok + w.finishedChOut <- r // πŸ’« non-pre-emptive send, but this should be ok - w.logger.Debug(fmt.Sprintf(" <--- πŸš€ worker.run(%v) (SENT FINISHED - error:'%v'). πŸš€πŸš€πŸš€", - w.id, r.err, - )) + w.logger.Debug("send complete", + slog.String("source", "worker.run"), + slog.String("worker-id", string(w.id)), + ) }(&result) - w.logger.Debug(fmt.Sprintf(" ---> πŸš€ worker.run(%v) ...(ctx:%+v)\n", w.id, parentContext)) - for running := true; running; { select { case <-parentContext.Done(): - w.logger.Debug(fmt.Sprintf( - " ---> πŸš€ worker.run(%v)(finished) - done received πŸ”ΆπŸ”ΆπŸ”Ά", w.id, - )) + w.logger.Debug("finished - done received", + slog.String("source", "worker.run"), + slog.String("worker-id", string(w.id)), + ) running = false case job, ok := <-w.jobsChIn: if ok { - w.logger.Debug(fmt.Sprintf( - " ---> πŸš€ worker.run(%v)(input:'%v')", w.id, job.Input, - )) + w.logger.Debug("read from channel", + slog.String("source", "worker.run"), + slog.String("worker-id", string(w.id)), + slog.String("job-input", fmt.Sprintf("%v", w.id)), + ) err := w.invoke(parentContext, parentCancel, outputChTimeout, job) @@ -55,9 +57,10 @@ func (w *worker[I, O]) run(parentContext context.Context, running = false } } else { - w.logger.Debug(fmt.Sprintf( - " ---> πŸš€ worker.run(%v)(jobs chan closed) πŸŸ₯πŸŸ₯πŸŸ₯", w.id, - )) + w.logger.Debug("jobs chan closed", + slog.String("source", "worker.run"), + slog.String("worker-id", string(w.id)), + ) running = false } @@ -78,22 +81,25 @@ func (w *worker[I, O]) invoke(parentContext context.Context, result, _ := w.exec(job) if w.outputsChOut != nil { - w.logger.Debug(fmt.Sprintf( - " ---> πŸš€ worker.invoke ⏰ output timeout: '%v'", outputChTimeout, - )) + w.logger.Debug("output timeout", + slog.String("source", "Worker.invoke"), + slog.String("output-channel-timeout", outputChTimeout.String()), + ) select { case w.outputsChOut <- result: case <-parentContext.Done(): - w.logger.Debug(fmt.Sprintf( - " ---> πŸš€ worker.invoke(%v)(cancel) - done received πŸ’₯πŸ’₯πŸ’₯", w.id, - )) + w.logger.Debug("done received", + slog.String("source", "Worker.invoke"), + slog.String("worker-id", string(w.id)), + ) case <-outputContext.Done(): - w.logger.Debug(fmt.Sprintf( - " ---> πŸš€ worker.invoke(%v)(cancel) - timeout on send πŸ‘ΏπŸ‘ΏπŸ‘Ώ", w.id, - )) + w.logger.Debug("cancel - timeout on send", + slog.String("source", "Worker.invoke"), + slog.String("worker-id", string(w.id)), + ) // ??? err = i18n.NewOutputChTimeoutError() err = errors.New("timeout on send")