Skip to content

Commit

Permalink
another try to remove job observer leaks
Browse files Browse the repository at this point in the history
  • Loading branch information
vintikzzz committed Oct 8, 2024
1 parent c1b586d commit b4c9aaa
Showing 1 changed file with 5 additions and 26 deletions.
31 changes: 5 additions & 26 deletions services/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,32 +10,12 @@ import (
)

type Observer struct {
C chan LogItem
mux sync.Mutex
closed bool
ID string
}

func (s *Observer) Close() {
s.mux.Lock()
defer s.mux.Unlock()
if !s.closed {
s.closed = true
close(s.C)
}
C chan LogItem
ID string
}

func (s *Observer) Push(v LogItem) {
s.mux.Lock()
defer s.mux.Unlock()
if s.closed {
return
}
s.C <- v
if v.Level == Close {
close(s.C)
s.closed = true
}
}

func NewObserver() *Observer {
Expand Down Expand Up @@ -158,7 +138,6 @@ func (s *Job) ObserveLog(ctx context.Context) *Observer {
s.observers[o.ID] = o
go func(o *Observer, s *Job) {
<-ctx.Done()
o.Close()
s.observersMux.Lock()
defer s.observersMux.Unlock()
delete(s.observers, o.ID)
Expand Down Expand Up @@ -386,12 +365,12 @@ func (s *Jobs) Log(ctx context.Context, id string) (c chan LogItem, err error) {
case <-ctx.Done():
close(c)
return
case i, okk := <-o.C:
if !okk {
case i := <-o.C:
c <- i
if i.Level == Close {
close(c)
return
}
c <- i
}
}
}
Expand Down

0 comments on commit b4c9aaa

Please sign in to comment.