Skip to content

Commit

Permalink
another try to remove job observer leaks
Browse files Browse the repository at this point in the history
  • Loading branch information
vintikzzz committed Oct 9, 2024
1 parent 4a3e5fe commit 3e9c29b
Showing 1 changed file with 26 additions and 26 deletions.
52 changes: 26 additions & 26 deletions services/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,19 @@ func NewObserver() *Observer {
}

type Job struct {
ID string
Queue string
l []LogItem
runnable Runnable
observers map[string]*Observer
observersMux sync.Mutex
closed bool
mux sync.Mutex
cur string
Context context.Context
storage Storage
main bool
purge bool
ID string
Queue string
l []LogItem
runnable Runnable
observers map[string]*Observer
//observersMux sync.Mutex
closed bool
mux sync.Mutex
cur string
Context context.Context
storage Storage
main bool
purge bool
}

type LogItemLevel string
Expand Down Expand Up @@ -157,23 +157,23 @@ func (s *Job) Run(ctx context.Context) error {
}

func (s *Job) ObserveLog() *Observer {
s.observersMux.Lock()
defer s.observersMux.Unlock()
//s.observersMux.Lock()
//defer s.observersMux.Unlock()
o := NewObserver()
s.observers[o.ID] = o
return o
}

func (s *Job) RemoveObserver(o *Observer) {
s.observersMux.Lock()
defer s.observersMux.Unlock()
delete(s.observers, o.ID)
o.Close()
}
//func (s *Job) RemoveObserver(o *Observer) {
// s.observersMux.Lock()
// defer s.observersMux.Unlock()
// delete(s.observers, o.ID)
// o.Close()
//}

func (s *Job) pushToObservers(l LogItem) {
s.observersMux.Lock()
defer s.observersMux.Unlock()
//s.observersMux.Lock()
//defer s.observersMux.Unlock()
for _, o := range s.observers {
o.Push(l)
}
Expand Down Expand Up @@ -333,8 +333,8 @@ func (s *Job) close() {
_ = s.log(LogItem{
Level: Close,
})
s.observersMux.Lock()
defer s.observersMux.Unlock()
//s.observersMux.Lock()
//defer s.observersMux.Unlock()
for _, o := range s.observers {
o.Close()
}
Expand Down Expand Up @@ -411,7 +411,7 @@ func (s *Jobs) Log(ctx context.Context, id string) (c chan LogItem, err error) {
select {
case <-ctx.Done():
close(c)
j.RemoveObserver(o)
//j.RemoveObserver(o)
return
case i, okk := <-o.C:
if !okk {
Expand Down

0 comments on commit 3e9c29b

Please sign in to comment.