Skip to content

Commit

Permalink
ref(boost): improve structured logging (#109)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Jan 5, 2024
1 parent 5880278 commit f7c8ebf
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 82 deletions.
19 changes: 10 additions & 9 deletions boost/annotated-wait-group.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package boost

import (
"fmt"
"log/slog"
"strings"
"sync"
Expand Down Expand Up @@ -84,7 +83,7 @@ func (a *waitGroupAnImpl) Add(delta int, name ...GoRoutineName) {
if len(name) > 0 {
a.names[name[0]] = "foo"

a.indicate("💫", string(name[0]), "Add")
a.indicate(string(name[0]), "Add")
}
}

Expand All @@ -94,22 +93,24 @@ func (a *waitGroupAnImpl) Done(name ...GoRoutineName) {
if len(name) > 0 {
delete(a.names, name[0])

a.indicate("🚩", string(name[0]), "Done")
a.indicate(string(name[0]), "Done")
}
}

func (a *waitGroupAnImpl) Wait(name ...GoRoutineName) {
if len(name) > 0 {
a.indicate("🧭", string(name[0]), "Wait")
a.indicate(string(name[0]), "Wait")
}
}

func (a *waitGroupAnImpl) indicate(highlight, name, op string) {
func (a *waitGroupAnImpl) indicate(name, op string) {
a.logger.Debug(
fmt.Sprintf(
" %v [[ WaitGroupAssister(%v).%v ]] - gr-name: '%v' (count: '%v') (running: '%v')\n",
highlight, a.waitGroupName, op, name, a.counter, a.running(),
),
"WaitGroupAssister",
slog.String("wg-name", a.waitGroupName),
slog.String("op", op),
slog.String("name", name),
slog.Int("counter", int(a.counter)),
slog.String("running", a.running()),
)
}

Expand Down
97 changes: 48 additions & 49 deletions boost/worker-pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,27 +149,24 @@ func (p *WorkerPool[I, O]) run(
p.private.resultOutCh <- r

p.WaitAQ.Done(p.RoutineName)
p.Logger.Debug("<--- WorkerPool.run (QUIT). 🧊🧊🧊\n")
}(result)
p.Logger.Debug(fmt.Sprintf(
"===> 🧊 WorkerPool.run ...(ctx:%+v)\n",
parentContext,
))

for running := true; running; {
select {
case <-parentContext.Done():
running = false

close(forwardChOut) // ⚠️ This is new
p.Logger.Debug("===> 🧊 WorkerPool.run (source jobs chan closed) - done received ☢️☢️☢️")
close(forwardChOut)
p.Logger.Debug("source jobs chan closed - done received",
slog.String("source", "worker-pool.run"),
)

case job, ok := <-p.sourceJobsChIn:
if ok {
p.Logger.Debug(fmt.Sprintf(
"===> 🧊 (#workers: '%v') WorkerPool.run - new job received",
len(p.private.pool),
))
p.Logger.Debug("new job received",
slog.String("source", "worker-pool.run"),
slog.Int("pool size", len(p.private.pool)),
)

if len(p.private.pool) < p.noWorkers {
p.spawn(parentContext,
Expand All @@ -182,29 +179,31 @@ func (p *WorkerPool[I, O]) run(
}
select {
case forwardChOut <- job:
p.Logger.Debug(fmt.Sprintf(
"===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(%v) [Seq: %v]",
job.ID,
job.SequenceNo,
))
p.Logger.Debug("forwarded job",
slog.String("source", "worker-pool.run"),
slog.String("job-id", job.ID),
slog.Int("sequence-no", job.SequenceNo),
)
case <-parentContext.Done():
running = false

close(forwardChOut) // ⚠️ This is new
p.Logger.Debug(fmt.Sprintf(
"===> 🧊 (#workers: '%v') WorkerPool.run - done received ☢️☢️☢️",
len(p.private.pool),
))
close(forwardChOut)
p.Logger.Debug("done received",
slog.String("source", "worker-pool.run"),
slog.Int("pool size", len(p.private.pool)),
)
}
} else {
// ⚠️ This close is essential. Since the pool acts as a bridge between
// 💫 This close is essential. Since the pool acts as a bridge between
// 2 channels (p.sourceJobsChIn and p.private.workersJobsCh/forwardChOut),
// when the producer closes p.sourceJobsChIn, we need to delegate that
// closure to forwardChOut, otherwise we end up in a deadlock.
//
running = false
close(forwardChOut)
p.Logger.Debug("===> 🚀 WorkerPool.run(source jobs chan closed) 🟥🟥🟥")
p.Logger.Debug("source jobs chan closed",
slog.String("source", "worker-pool.run"),
)
}
}
}
Expand All @@ -216,16 +215,16 @@ func (p *WorkerPool[I, O]) run(
if err := p.drain(p.private.finishedCh); err != nil {
result.Error = err

p.Logger.Debug(fmt.Sprintf(
"===> 🧊 WorkerPool.run - drain complete with error: '%v' (workers count: '%v'). 📛📛📛",
err,
len(p.private.pool),
))
p.Logger.Error("drain complete with error",
slog.String("source", "worker-pool.run"),
slog.Int("pool size", len(p.private.pool)),
slog.String("error", err.Error()),
)
} else {
p.Logger.Debug(fmt.Sprintf(
"===> 🧊 WorkerPool.run - drain complete OK (workers count: '%v'). ☑️☑️☑️",
len(p.private.pool),
))
p.Logger.Debug("drain complete OK",
slog.String("source", "worker-pool.run"),
slog.Int("pool size", len(p.private.pool)),
)
}
}

Expand All @@ -250,17 +249,17 @@ func (p *WorkerPool[I, O]) spawn(

p.private.pool[w.core.id] = w
go w.core.run(parentContext, parentCancel, outputChTimeout)
p.Logger.Debug(fmt.Sprintf(
"===> 🧊 WorkerPool.spawned new worker: '%v' 🎀🎀🎀",
w.core.id,
))
p.Logger.Debug("spawned new worker",
slog.String("source", "WorkerPool.spawn"),
slog.String("worker-id", string(w.core.id)),
)
}

func (p *WorkerPool[I, O]) drain(finishedChIn finishedStreamR) error {
p.Logger.Debug(fmt.Sprintf(
"!!!! 🧊 WorkerPool.drain - waiting for remaining workers: %v (#GRs: %v); 🧊🧊🧊",
len(p.private.pool), runtime.NumGoroutine(),
))
p.Logger.Debug("waiting for remaining workers...",
slog.String("source", "WorkerPool.drain"),
slog.Int("pool size", len(p.private.pool)),
)

var firstError error

Expand Down Expand Up @@ -311,21 +310,21 @@ func (p *WorkerPool[I, O]) drain(finishedChIn finishedStreamR) error {
}

if workerResult.err != nil {
p.Logger.Debug(fmt.Sprintf(
"!!!! 🧊 WorkerPool.drain - worker (%v) 💢💢💢 finished with error: '%v'",
workerResult.id,
workerResult.err,
))
p.Logger.Error("worker finished with error",
slog.String("source", "WorkerPool.drain"),
slog.String("result-id", string(workerResult.id)),
slog.String("error", workerResult.err.Error()),
)

if firstError == nil {
firstError = workerResult.err
}
}

p.Logger.Debug(fmt.Sprintf(
"!!!! 🧊 WorkerPool.drain - worker-result-error(%v) finished, remaining: '%v' 🟥",
workerResult.err, len(p.private.pool),
))
p.Logger.Debug("worker pool finished",
slog.String("source", "WorkerPool.drain"),
slog.Int("remaining", len(p.private.pool)),
)
}

return firstError
Expand Down
54 changes: 30 additions & 24 deletions boost/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,28 +25,30 @@ func (w *worker[I, O]) run(parentContext context.Context,
id: w.id,
}
defer func(r *workerFinishedResult) {
w.finishedChOut <- r // ⚠️ non-pre-emptive send, but this should be ok
w.finishedChOut <- r // 💫 non-pre-emptive send, but this should be ok

w.logger.Debug(fmt.Sprintf(" <--- 🚀 worker.run(%v) (SENT FINISHED - error:'%v'). 🚀🚀🚀",
w.id, r.err,
))
w.logger.Debug("send complete",
slog.String("source", "worker.run"),
slog.String("worker-id", string(w.id)),
)
}(&result)

w.logger.Debug(fmt.Sprintf(" ---> 🚀 worker.run(%v) ...(ctx:%+v)\n", w.id, parentContext))

for running := true; running; {
select {
case <-parentContext.Done():
w.logger.Debug(fmt.Sprintf(
" ---> 🚀 worker.run(%v)(finished) - done received 🔶🔶🔶", w.id,
))
w.logger.Debug("finished - done received",
slog.String("source", "worker.run"),
slog.String("worker-id", string(w.id)),
)

running = false
case job, ok := <-w.jobsChIn:
if ok {
w.logger.Debug(fmt.Sprintf(
" ---> 🚀 worker.run(%v)(input:'%v')", w.id, job.Input,
))
w.logger.Debug("read from channel",
slog.String("source", "worker.run"),
slog.String("worker-id", string(w.id)),
slog.String("job-input", fmt.Sprintf("%v", w.id)),
)

err := w.invoke(parentContext, parentCancel, outputChTimeout, job)

Expand All @@ -55,9 +57,10 @@ func (w *worker[I, O]) run(parentContext context.Context,
running = false
}
} else {
w.logger.Debug(fmt.Sprintf(
" ---> 🚀 worker.run(%v)(jobs chan closed) 🟥🟥🟥", w.id,
))
w.logger.Debug("jobs chan closed",
slog.String("source", "worker.run"),
slog.String("worker-id", string(w.id)),
)

running = false
}
Expand All @@ -78,22 +81,25 @@ func (w *worker[I, O]) invoke(parentContext context.Context,
result, _ := w.exec(job)

if w.outputsChOut != nil {
w.logger.Debug(fmt.Sprintf(
" ---> 🚀 worker.invoke ⏰ output timeout: '%v'", outputChTimeout,
))
w.logger.Debug("output timeout",
slog.String("source", "Worker.invoke"),
slog.String("output-channel-timeout", outputChTimeout.String()),
)

select {
case w.outputsChOut <- result:

case <-parentContext.Done():
w.logger.Debug(fmt.Sprintf(
" ---> 🚀 worker.invoke(%v)(cancel) - done received 💥💥💥", w.id,
))
w.logger.Debug("done received",
slog.String("source", "Worker.invoke"),
slog.String("worker-id", string(w.id)),
)

case <-outputContext.Done():
w.logger.Debug(fmt.Sprintf(
" ---> 🚀 worker.invoke(%v)(cancel) - timeout on send 👿👿👿", w.id,
))
w.logger.Debug("cancel - timeout on send",
slog.String("source", "Worker.invoke"),
slog.String("worker-id", string(w.id)),
)

// ??? err = i18n.NewOutputChTimeoutError()
err = errors.New("timeout on send")
Expand Down

0 comments on commit f7c8ebf

Please sign in to comment.