Skip to content

Commit

Permalink
optimize job logging
Browse files Browse the repository at this point in the history
  • Loading branch information
vintikzzz committed Oct 21, 2024
1 parent 21acad1 commit f42220d
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 17 deletions.
54 changes: 40 additions & 14 deletions services/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,14 @@ type Job struct {
observers map[string]*Observer
closed bool
mux sync.Mutex
cur string
logMux sync.Mutex
curTag string
Context context.Context
storage Storage
main bool
purge bool
curLevel LogItemLevel
curStatus string
}

type LogItemLevel string
Expand Down Expand Up @@ -182,11 +185,11 @@ func (s *Job) pushToObservers(ctx context.Context, l LogItem) {
wg.Wait()
}

func (s *Job) pubToStorage(l LogItem) (err error) {
func (s *Job) pubToStorage(replaced bool, l LogItem) (err error) {
if l.Level == Open {
return
}
return s.storage.Pub(s.Context, s.ID, &l)
return s.storage.Pub(s.Context, s.ID, replaced, l)
}

func (s *Job) logToLogger(l LogItem) {
Expand All @@ -208,14 +211,23 @@ func (s *Job) logToLogger(l LogItem) {
func (s *Job) log(l LogItem) error {
l.Timestamp = time.Now()
if l.Level == InProgress {
s.cur = l.Tag
s.curTag = l.Tag
} else {
l.Tag = s.cur
l.Tag = s.curTag
}
if l.Level == StatusUpdate && s.curTag == l.Tag && s.curLevel != InProgress && s.curLevel != StatusUpdate {
return nil
}
if l.Level == StatusUpdate && s.curTag == l.Tag && s.curStatus == l.Status {
return nil
}
s.l = append(s.l, l)

s.curLevel = l.Level
s.curStatus = l.Status
replaced := s.addToLog(l)

if s.main {
err := s.pubToStorage(l)
err := s.pubToStorage(replaced, l)
if err != nil {
return err
}
Expand All @@ -230,6 +242,20 @@ func (s *Job) log(l LogItem) error {
return nil
}

func (s *Job) addToLog(l LogItem) (replaced bool) {
s.logMux.Lock()
defer s.logMux.Unlock()
if len(s.l) > 0 {
last := s.l[len(s.l)-1]
if last.Level == StatusUpdate && s.curTag == last.Tag {
s.l = s.l[:len(s.l)-1]
replaced = true
}
}
s.l = append(s.l, l)
return
}

func (s *Job) open() *Job {
_ = s.log(LogItem{
Level: Open,
Expand All @@ -250,7 +276,7 @@ func (s *Job) Warn(err error, message string) *Job {
_ = s.log(LogItem{
Level: Warn,
Message: message,
Tag: s.cur,
Tag: s.curTag,
})
return s
}
Expand All @@ -260,17 +286,17 @@ func (s *Job) Error(err error) error {
_ = s.log(LogItem{
Level: Error,
Message: strings.Split(err.Error(), ":")[0],
Tag: s.cur,
Tag: s.curTag,
})
return err
}

func (s *Job) InProgress(message string) *Job {
s.cur = message
s.curTag = message
_ = s.log(LogItem{
Level: InProgress,
Message: message,
Tag: s.cur,
Tag: s.curTag,
})
return s
}
Expand All @@ -279,23 +305,23 @@ func (s *Job) StatusUpdate(status string) *Job {
_ = s.log(LogItem{
Level: StatusUpdate,
Status: status,
Tag: s.cur,
Tag: s.curTag,
})
return s
}

func (s *Job) Done() *Job {
_ = s.log(LogItem{
Level: Done,
Tag: s.cur,
Tag: s.curTag,
})
return s
}

func (s *Job) DoneWithMessage(msg string) *Job {
_ = s.log(LogItem{
Level: Done,
Tag: s.cur,
Tag: s.curTag,
Message: msg,
})
return s
Expand Down
9 changes: 8 additions & 1 deletion services/job/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,20 @@ func NewRedis(cl redis.UniversalClient, prefix string) *Redis {
}
}

func (s *Redis) Pub(ctx context.Context, id string, l *LogItem) (err error) {
func (s *Redis) Pub(ctx context.Context, id string, replace bool, l LogItem) (err error) {
key := s.makeKey(id)
j, err := json.Marshal(l)
if err != nil {
return err
}

if replace {
cmd := s.cl.RPop(ctx, key)
if cmd.Err() != nil {
return cmd.Err()
}
}

cmd := s.cl.RPush(ctx, key, j)
if cmd.Err() != nil {
return cmd.Err()
Expand Down
4 changes: 2 additions & 2 deletions services/job/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ type State struct {
}

type Storage interface {
Pub(ctx context.Context, id string, l *LogItem) error
Pub(ctx context.Context, id string, replace bool, l LogItem) error
Sub(ctx context.Context, id string) (res chan LogItem, err error)
GetState(ctx context.Context, id string) (state *State, ok bool, err error)
Drop(ctx context.Context, id string) (err error)
}

type NilStorage struct{}

func (s *NilStorage) Pub(_ context.Context, _ string, _ *LogItem) error {
func (s *NilStorage) Pub(ctx context.Context, id string, replace bool, l LogItem) error {
return nil
}

Expand Down

0 comments on commit f42220d

Please sign in to comment.