diff --git a/handlers/job/log.go b/handlers/job/log.go index e3b1080..a846f7f 100644 --- a/handlers/job/log.go +++ b/handlers/job/log.go @@ -18,12 +18,20 @@ func (s *Handler) log(c *gin.Context) { c.Header("Cache-Control", "no-cache,no-store,no-transform") c.Header("Connection", "keep-alive") c.Header("Access-Control-Allow-Origin", "*") + clientGone := c.Writer.CloseNotify() c.Stream(func(w io.Writer) bool { - if msg, ok := <-l; ok { + select { + case <-clientGone: + return false + case <-c.Request.Context().Done(): + return false + case msg, ok := <-l: + if !ok { + return false + } c.SSEvent("message", msg) return true } - return false }) } diff --git a/services/job/job.go b/services/job/job.go index fd1cd3a..8cc49f0 100644 --- a/services/job/job.go +++ b/services/job/job.go @@ -145,19 +145,19 @@ func (s *Job) Run(ctx context.Context) error { func (s *Job) ObserveLog() *Observer { o := NewObserver() s.observers = append(s.observers, o) - go func() { - for _, i := range s.l { - o.Push(i) - if i.Level == Close { - o.Close() - } - } - }() return o } func (s *Job) log(l LogItem) error { l.Timestamp = time.Now() + if l.Level == Close { + s.closed = true + } + if l.Level == InProgress { + s.cur = l.Tag + } else { + l.Tag = s.cur + } s.l = append(s.l, l) for _, o := range s.observers { o.Push(l) @@ -173,23 +173,8 @@ func (s *Job) log(l LogItem) error { } message := l.Message - if l.Level == Done { - message = "done" - } - if l.Level == Redirect { - message = "redirect" - } - if l.Level == StatusUpdate { - message = "statusupdate" - } - if l.Level == RenderTemplate { - message = "rendertemplate" - } - if l.Level == Close { - message = "close" - } - if l.Level == Open { - message = "open" + if message == "" { + message = string(l.Level) } log.WithFields(log.Fields{ "ID": s.ID, @@ -368,17 +353,33 @@ func (s *Jobs) Log(ctx context.Context, id string) (c chan LogItem, err error) { close(c) return } - jCtx, cancel := context.WithTimeout(context.Background(), state.TTL) + jCtx, cancel := context.WithTimeout(ctx, state.TTL) j = s.Enqueue(jCtx, cancel, id, nil, false) } else { log.Infof("found local job with id=%+v", id) } go func() { - o := j.ObserveLog() - for i := range o.C { + for _, i := range j.l { c <- i } - close(c) + if j.closed { + close(c) + } else { + o := j.ObserveLog() + for { + select { + case <-ctx.Done(): + close(c) + return + case i, okk := <-o.C: + if !okk { + close(c) + return + } + c <- i + } + } + } }() return } diff --git a/services/job/redis.go b/services/job/redis.go index 94e6b9e..4fda211 100644 --- a/services/job/redis.go +++ b/services/job/redis.go @@ -71,26 +71,40 @@ func (s *Redis) GetState(ctx context.Context, id string) (state *State, err erro } func (s *Redis) Sub(ctx context.Context, id string) (res chan LogItem, err error) { - ch, err := s.subRaw(ctx, id) + cctx, cancel := context.WithCancel(ctx) + ch, err := s.subRaw(cctx, id) if err != nil || ch == nil { + cancel() return } res = make(chan LogItem) go func() { - for i := range ch { - var li LogItem - err = json.Unmarshal([]byte(i), &li) - if err != nil { - res <- LogItem{ - Level: Error, - Message: err.Error(), - } + for { + select { + case <-cctx.Done(): close(res) return + case i := <-ch: + var li LogItem + err = json.Unmarshal([]byte(i), &li) + if err != nil { + res <- LogItem{ + Level: Error, + Message: err.Error(), + } + cancel() + close(res) + return + } + res <- li + if li.Level == Close { + cancel() + close(res) + return + } } - res <- li + } - close(res) }() return } @@ -127,6 +141,9 @@ func (s *Redis) subRaw(ctx context.Context, id string) (res chan string, err err } res <- i } + if ctx.Err() != nil { + return + } ps := s.cl.Subscribe(ctx, key) defer func(ps *redis.PubSub) { _ = ps.Close() @@ -135,11 +152,15 @@ func (s *Redis) subRaw(ctx context.Context, id string) (res chan string, err err if err = ps.Ping(ctx); err != nil { return } - - for m := range ps.Channel() { - res <- m.Payload + for { + select { + case <-ctx.Done(): + close(res) + return + case msg := <-ps.Channel(): + res <- msg.Payload + } } - close(res) }() return }