Skip to content

Commit

Permalink
another try to remove job observer leaks
Browse files Browse the repository at this point in the history
  • Loading branch information
vintikzzz committed Oct 8, 2024
1 parent b4c9aaa commit 3638669
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 16 deletions.
8 changes: 1 addition & 7 deletions handlers/job/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (

func (s *Handler) log(c *gin.Context) {
ctx, cancel := context.WithCancel(c.Request.Context())
defer cancel()
l, err := s.q.GetOrCreate(c.Param("queue_id")).Log(ctx, c.Param("job_id"))
if err != nil {
cancel()
_ = c.AbortWithError(http.StatusInternalServerError, err)
return
}
Expand All @@ -21,19 +21,13 @@ func (s *Handler) log(c *gin.Context) {
c.Header("Cache-Control", "no-cache,no-store,no-transform")
c.Header("Connection", "keep-alive")
c.Header("Access-Control-Allow-Origin", "*")
clientGone := c.Writer.CloseNotify()

c.Stream(func(w io.Writer) bool {
select {
case <-clientGone:
cancel()
return false
case <-c.Request.Context().Done():
cancel()
return false
case msg, ok := <-l:
if !ok {
cancel()
return false
}
c.SSEvent("message", msg)
Expand Down
5 changes: 4 additions & 1 deletion handlers/job/script/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,11 @@ func (s *ActionScript) warmUp(j *job.Job, m string, u string, su string, size in
log.WithError(err).Error("failed to get stats")
return
}
for ev := range ch {
select {
case ev := <-ch:
j.StatusUpdate(fmt.Sprintf("%v peers", ev.Peers))
case <-ctx2.Done():
return
}
}()
}
Expand Down
10 changes: 2 additions & 8 deletions services/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,17 +131,11 @@ func (s *Job) Run(ctx context.Context) error {
return nil
}

func (s *Job) ObserveLog(ctx context.Context) *Observer {
func (s *Job) ObserveLog() *Observer {
s.observersMux.Lock()
defer s.observersMux.Unlock()
o := NewObserver()
s.observers[o.ID] = o
go func(o *Observer, s *Job) {
<-ctx.Done()
s.observersMux.Lock()
defer s.observersMux.Unlock()
delete(s.observers, o.ID)
}(o, s)
return o
}

Expand Down Expand Up @@ -359,7 +353,7 @@ func (s *Jobs) Log(ctx context.Context, id string) (c chan LogItem, err error) {
if j.closed {
close(c)
} else {
o := j.ObserveLog(ctx)
o := j.ObserveLog()
for {
select {
case <-ctx.Done():
Expand Down

0 comments on commit 3638669

Please sign in to comment.