Skip to content

Commit

Permalink
fix jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
vintikzzz committed Oct 7, 2024
1 parent 950c9e8 commit faef35c
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 46 deletions.
12 changes: 10 additions & 2 deletions handlers/job/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,20 @@ func (s *Handler) log(c *gin.Context) {
c.Header("Cache-Control", "no-cache,no-store,no-transform")
c.Header("Connection", "keep-alive")
c.Header("Access-Control-Allow-Origin", "*")
clientGone := c.Writer.CloseNotify()

c.Stream(func(w io.Writer) bool {
if msg, ok := <-l; ok {
select {
case <-clientGone:
return false
case <-c.Request.Context().Done():
return false
case msg, ok := <-l:
if !ok {
return false
}
c.SSEvent("message", msg)
return true
}
return false
})
}
59 changes: 30 additions & 29 deletions services/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,19 +145,19 @@ func (s *Job) Run(ctx context.Context) error {
func (s *Job) ObserveLog() *Observer {
o := NewObserver()
s.observers = append(s.observers, o)
go func() {
for _, i := range s.l {
o.Push(i)
if i.Level == Close {
o.Close()
}
}
}()
return o
}

func (s *Job) log(l LogItem) error {
l.Timestamp = time.Now()
if l.Level == Close {
s.closed = true
}
if l.Level == InProgress {
s.cur = l.Tag
} else {
l.Tag = s.cur
}
s.l = append(s.l, l)
for _, o := range s.observers {
o.Push(l)
Expand All @@ -173,23 +173,8 @@ func (s *Job) log(l LogItem) error {
}

message := l.Message
if l.Level == Done {
message = "done"
}
if l.Level == Redirect {
message = "redirect"
}
if l.Level == StatusUpdate {
message = "statusupdate"
}
if l.Level == RenderTemplate {
message = "rendertemplate"
}
if l.Level == Close {
message = "close"
}
if l.Level == Open {
message = "open"
if message == "" {
message = string(l.Level)
}
log.WithFields(log.Fields{
"ID": s.ID,
Expand Down Expand Up @@ -368,17 +353,33 @@ func (s *Jobs) Log(ctx context.Context, id string) (c chan LogItem, err error) {
close(c)
return
}
jCtx, cancel := context.WithTimeout(context.Background(), state.TTL)
jCtx, cancel := context.WithTimeout(ctx, state.TTL)
j = s.Enqueue(jCtx, cancel, id, nil, false)
} else {
log.Infof("found local job with id=%+v", id)
}
go func() {
o := j.ObserveLog()
for i := range o.C {
for _, i := range j.l {
c <- i
}
close(c)
if j.closed {
close(c)
} else {
o := j.ObserveLog()
for {
select {
case <-ctx.Done():
close(c)
return
case i, okk := <-o.C:
if !okk {
close(c)
return
}
c <- i
}
}
}
}()
return
}
Expand Down
51 changes: 36 additions & 15 deletions services/job/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,26 +71,40 @@ func (s *Redis) GetState(ctx context.Context, id string) (state *State, err erro
}

func (s *Redis) Sub(ctx context.Context, id string) (res chan LogItem, err error) {
ch, err := s.subRaw(ctx, id)
cctx, cancel := context.WithCancel(ctx)
ch, err := s.subRaw(cctx, id)
if err != nil || ch == nil {
cancel()
return
}
res = make(chan LogItem)
go func() {
for i := range ch {
var li LogItem
err = json.Unmarshal([]byte(i), &li)
if err != nil {
res <- LogItem{
Level: Error,
Message: err.Error(),
}
for {
select {
case <-cctx.Done():
close(res)
return
case i := <-ch:
var li LogItem
err = json.Unmarshal([]byte(i), &li)
if err != nil {
res <- LogItem{
Level: Error,
Message: err.Error(),
}
cancel()
close(res)
return
}
res <- li
if li.Level == Close {
cancel()
close(res)
return
}
}
res <- li

}
close(res)
}()
return
}
Expand Down Expand Up @@ -127,6 +141,9 @@ func (s *Redis) subRaw(ctx context.Context, id string) (res chan string, err err
}
res <- i
}
if ctx.Err() != nil {
return
}
ps := s.cl.Subscribe(ctx, key)
defer func(ps *redis.PubSub) {
_ = ps.Close()
Expand All @@ -135,11 +152,15 @@ func (s *Redis) subRaw(ctx context.Context, id string) (res chan string, err err
if err = ps.Ping(ctx); err != nil {
return
}

for m := range ps.Channel() {
res <- m.Payload
for {
select {
case <-ctx.Done():
close(res)
return
case msg := <-ps.Channel():
res <- msg.Payload
}
}
close(res)
}()
return
}
Expand Down

0 comments on commit faef35c

Please sign in to comment.