Skip to content

Commit

Permalink
chore: rework logging
Browse files Browse the repository at this point in the history
  • Loading branch information
Adis Durakovic committed Mar 13, 2024
1 parent 9a0f274 commit 6b8ec56
Show file tree
Hide file tree
Showing 7 changed files with 148 additions and 117 deletions.
43 changes: 22 additions & 21 deletions frontend/composables/useJobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ export const useJobs = defineStore('useJobs', () => {
const progress = ref([])

function scheduleIsRunning(id: string) {
console.log('running.value', running.value)
const j = running.value?.find((job: any) => job.id === id)

if (j) {
Expand All @@ -21,34 +22,34 @@ export const useJobs = defineStore('useJobs', () => {
}

function repoIsRunning(id: string) {
const j = running.value?.find((job: Schedule) => job.schedule.to_repository_id === id)
if (j) {
if (j['out']['running'] !== undefined) {
return j['out']['running']
}
return true
}
// const j = running.value?.find((job: any) => job.schedule.to_repository_id === id)
// if (j) {
// if (j['out']['running'] !== undefined) {
// return j['out']['running']
// }
// return true
// }
return false
}
function repoIsSynching(id: string) {
const j = running.value?.find((job: Schedule) => job.schedule.from_repository_id === id)
if (j) {
if (j['out']['running'] !== undefined) {
return j['out']['running']
}
return true
}
// const j = running.value?.find((job: Schedule) => job.schedule.from_repository_id === id)
// if (j) {
// if (j['out']['running'] !== undefined) {
// return j['out']['running']
// }
// return true
// }
return false
}

function backupIsRunning(id: string) {
const j = running.value?.find((job: Schedule) => job.schedule.backup_id === id)
if (j) {
if (j['out']['running'] !== undefined) {
return j['out']['running']
}
return true
}
// const j = running.value?.find((job: Schedule) => job.schedule.backup_id === id)
// if (j) {
// if (j['out']['running'] !== undefined) {
// return j['out']['running']
// }
// return true
// }
return false
}

Expand Down
6 changes: 3 additions & 3 deletions internal/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ func NewResticity() (Resticity, error) {
outputChan := make(chan ChanMsg)
errorChan := make(chan ChanMsg)
settings := NewSettings(flagArgs.ConfigFile)
restic := NewRestic(settings)
scheduler, err := NewScheduler(settings, restic, &outputChan)
restic := NewRestic(settings, &outputChan, &errorChan)
scheduler, err := NewScheduler(settings, restic, &outputChan, &errorChan)
return Resticity{flagArgs, outputChan, errorChan, settings, restic, scheduler}, err
}

Expand Down Expand Up @@ -75,6 +75,6 @@ func SetLogLevel() {
if err == nil {
log.SetLevel(l)
} else {
log.SetLevel(log.InfoLevel)
log.SetLevel(log.DebugLevel)
}
}
34 changes: 21 additions & 13 deletions internal/restic.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@ import (

type Restic struct {
settings *Settings
OutputCh *chan ChanMsg
ErrorCh *chan ChanMsg
}

func NewRestic(settings *Settings) *Restic {
func NewRestic(settings *Settings, outch *chan ChanMsg, errch *chan ChanMsg) *Restic {
r := &Restic{}

r.settings = settings
r.OutputCh = outch
r.ErrorCh = errch
return r
}

Expand All @@ -35,12 +39,14 @@ func (r *Restic) PipeOutErr(
scanner := bufio.NewScanner(stdout)
scanner.Split(bufio.ScanLines)
for scanner.Scan() {
if job != nil {
go func(t string) {
*&job.OutChan <- t
log.Debug("pipeout", "out", t)
}(scanner.Text())
}
go func(t string) {
msg := ChanMsg{Id: "", Msg: t}
if job != nil {
msg.Id = job.Id
}
(*r.OutputCh) <- msg
log.Debug("pipeout", "out", t)
}(scanner.Text())

sout.WriteString(scanner.Text())
}
Expand All @@ -57,12 +63,14 @@ func (r *Restic) PipeOutErr(
scanner.Split(bufio.ScanLines)
for scanner.Scan() {

if job != nil {
go func(t string) {
*&job.ErrChan <- t
log.Debug("pipeout", "out", t)
}(scanner.Text())
}
go func(t string) {
msg := ChanMsg{Id: "", Msg: t}
if job != nil {
msg.Id = job.Id
}
(*r.OutputCh) <- msg
log.Debug("pipeout", "out", t)
}(scanner.Text())
serr.WriteString(scanner.Text())
}
}()
Expand Down
52 changes: 18 additions & 34 deletions internal/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ type Job struct {
Force bool `json:"force"`
Ctx context.Context
Cancel context.CancelFunc
OutChan chan string
ErrChan chan string
// OutChan chan string
// ErrChan chan string
}

type Scheduler struct {
Expand All @@ -30,14 +30,21 @@ type Scheduler struct {
jmu sync.Mutex
settings *Settings
outputCh *chan ChanMsg
errorCh *chan ChanMsg
}

func NewScheduler(settings *Settings, restic *Restic, ch *chan ChanMsg) (*Scheduler, error) {
func NewScheduler(
settings *Settings,
restic *Restic,
outch *chan ChanMsg,
errch *chan ChanMsg,
) (*Scheduler, error) {

s := &Scheduler{}
s.settings = settings
s.restic = restic
s.outputCh = ch
s.outputCh = outch
s.errorCh = errch
if gc, err := gocron.NewScheduler(); err == nil {
s.Gocron = gc
s.Gocron.Start()
Expand Down Expand Up @@ -76,11 +83,9 @@ func (s *Scheduler) DeleteRunningJob(id string) {
defer s.jmu.Unlock()
for i, j := range s.Jobs {
if j.Id == id {
go func() {
if j.OutChan != nil {
j.OutChan <- "{\"running\": false}"
}
}()
msg := ChanMsg{Id: id, Msg: "{\"running\": false}"}
(*s.outputCh) <- msg

log.Debug("Stopping running job", "id", id)
s.Jobs[i].Running = false
s.Jobs[i].Force = false
Expand All @@ -105,13 +110,11 @@ func (s *Scheduler) SetRunningJob(id string) {
defer s.jmu.Unlock()
for i, j := range s.Jobs {
if j.Id == id {
log.Debug("Setting forced running job", "id", id)
s.Jobs[i].Running = true
go func() {
if j.OutChan != nil {
j.OutChan <- "{\"running\": true}"
}
}()
msg := ChanMsg{Id: id, Msg: "{\"running\": true}"}
(*s.outputCh) <- msg
log.Debug("Setting forced running job", "id", id)

break
}
}
Expand Down Expand Up @@ -194,8 +197,6 @@ func (s *Scheduler) RescheduleBackups() {
}

ctx, cancel := context.WithCancel(context.Background())
outch := make(chan string)
errch := make(chan string)

s.Jobs = append(
s.Jobs,
Expand All @@ -207,26 +208,9 @@ func (s *Scheduler) RescheduleBackups() {
Force: false,
Ctx: ctx,
Cancel: cancel,
OutChan: outch,
ErrChan: errch,
},
)

go func() {
outStr := ""
errStr := ""
for {
select {
case o := <-outch:
outStr = o
*s.outputCh <- ChanMsg{Id: schedule.Id, Out: outStr, Err: errStr, Schedule: schedule}
case e := <-errch:
errStr = e
*s.outputCh <- ChanMsg{Id: schedule.Id, Out: outStr, Err: errStr, Schedule: schedule}
}
}
}()

}

}
110 changes: 68 additions & 42 deletions internal/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,26 +78,27 @@ func cleanClients() {
}
}

func doBroadcast(d ChanMsg, outputs []ChanMsg) {
if funk.Find(
outputs,
func(out ChanMsg) bool { return out.Id == d.Id },
) == nil {
outputs = append(outputs, d)
} else {
for i, out := range outputs {
if out.Id == d.Id {
outputs[i] = d
break
}
func handlePing(c *websocket.Conn) {
for {
time.Sleep(1 * time.Second)
_, _, err := c.ReadMessage()
if err == nil {
go func() {
for connection, client := range clients {

if connection.RemoteAddr().String() == c.RemoteAddr().String() {
c := client
c.LastSeen = time.Now()
clients[connection] = c

break
}
}
}()

}
}
if j, err := json.Marshal(funk.Filter(outputs, func(o ChanMsg) bool { return o.Out != "" && o.Out != "{}" })); err == nil {
broadcast <- string(j)

} else {
log.Error("socket: marshal", "err", err)
}
}

func RunServer(
Expand Down Expand Up @@ -127,45 +128,70 @@ func RunServer(

api.Get("/ws", websocket.New(func(c *websocket.Conn) {

outs := []WsMsg{}
errs := []WsMsg{}

defer func() {
unregister <- c
c.Close()
}()

outputs := []ChanMsg{}
errors := []ChanMsg{}

register <- c

go func() {
for {
time.Sleep(1 * time.Second)
_, _, err := c.ReadMessage()
if err == nil {
go func() {
for connection, client := range clients {

if connection.RemoteAddr().String() == c.RemoteAddr().String() {
c := client
c.LastSeen = time.Now()
clients[connection] = c
go handlePing(c)

for {
select {
case o := <-*outputChan:
m := WsMsg{Id: o.Id, Out: o.Msg, Err: ""}
log.Debug(m)
if m.Id != "" {
if funk.Find(
outs,
func(arrm WsMsg) bool { return arrm.Id == m.Id },
) == nil {
outs = append(outs, m)
} else {
for i, arrm := range outs {
if arrm.Id == m.Id {
(outs)[i] = m
break
}
}
}()

}
}
}
}()
if j, err := json.Marshal(funk.Filter(outs, func(o WsMsg) bool { return o.Out != "" && o.Out != "{}" })); err == nil {
broadcast <- string(j)

for {

select {
case o := <-*outputChan:
doBroadcast(o, outputs)
} else {
log.Error("socket: marshal", "err", err)
}
break
case e := <-*errorChan:
doBroadcast(e, errors)
m := WsMsg{Id: e.Id, Out: "", Err: e.Msg}
log.Debug(m)
if m.Id != "" {
if funk.Find(
errs,
func(arrm WsMsg) bool { return arrm.Id == m.Id },
) == nil {
errs = append(errs, m)
} else {
for i, arrm := range errs {
if arrm.Id == m.Id {
(errs)[i] = m
break
}
}
}
}
if j, err := json.Marshal(funk.Filter(errs, func(o WsMsg) bool { return o.Err != "" && o.Err != "{}" })); err == nil {
broadcast <- string(j)

} else {
log.Error("socket: marshal", "err", err)
}
break
}

}
Expand Down
Loading

0 comments on commit 6b8ec56

Please sign in to comment.