Skip to content

Commit

Permalink
Merge pull request #3 from reugn/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
reugn authored Apr 19, 2019
2 parents 4767bda + 68178ab commit 24a19aa
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 11 deletions.
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,25 @@ Job interface. Should be implemented by custom jobs for further scheduling
type Job interface {
Execute()
Description() string
Key() int
}
```
Scheduler interface
```go
type Scheduler interface {
//start scheduler
Start()
//schedule Job with given Trigger
ScheduleJob(job Job, trigger Trigger) error
//get all scheduled Job keys
GetJobKeys() []int
//get scheduled Job metadata
GetScheduledJob(key int) (*ScheduledJob, error)
//remove Job from execution queue
DeleteJob(key int) error
//clear all scheduled jobs
Clear()
//shutdown scheduler
Stop()
}
```
Expand Down
18 changes: 16 additions & 2 deletions examples/go-quartz.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ func (pj PrintJob) Description() string {
return pj.desc
}

func (pj PrintJob) Key() int {
return quartz.HashCode(pj.Description())
}

func (pj PrintJob) Execute() {
fmt.Println("Executing " + pj.Description())
}
Expand All @@ -24,12 +28,22 @@ func (pj PrintJob) Execute() {
func main() {
sched := quartz.NewStdScheduler()
cronTrigger, _ := quartz.NewCronTrigger("1/3 * * * * *")
cronJob := PrintJob{"Cron job"}
sched.Start()
sched.ScheduleJob(&PrintJob{"Ad hoc Job"}, quartz.NewRunOnceTrigger(time.Second*5))
sched.ScheduleJob(&PrintJob{"First job"}, quartz.NewSimpleTrigger(time.Second*12))
sched.ScheduleJob(&PrintJob{"Second job"}, quartz.NewSimpleTrigger(time.Second*6))
sched.ScheduleJob(&PrintJob{"Third job"}, quartz.NewSimpleTrigger(time.Second*3))
sched.ScheduleJob(&PrintJob{"Cron job"}, cronTrigger)
time.Sleep(time.Second * 15)
sched.ScheduleJob(&cronJob, cronTrigger)

time.Sleep(time.Second * 10)

j, _ := sched.GetScheduledJob(cronJob.Key())
fmt.Println(j.TriggerDescription)
fmt.Println(sched.GetJobKeys())
sched.DeleteJob(cronJob.Key())
fmt.Println(sched.GetJobKeys())

time.Sleep(time.Second * 10)
sched.Stop()
}
4 changes: 2 additions & 2 deletions quartz/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (pq *PriorityQueue) Head() *Item {
return (*pq)[0]
}

func (pq *PriorityQueue) Update(item *Item, sleepTime int64) {
item.priority = sleepTime
func (pq *PriorityQueue) Update(item *Item, nextRunTime int64) {
item.priority = nextRunTime
heap.Fix(pq, item.index)
}
86 changes: 79 additions & 7 deletions quartz/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package quartz

import (
"container/heap"
"errors"
"fmt"
"sync"
"time"
Expand All @@ -10,11 +11,29 @@ import (
type Job interface {
Execute()
Description() string
Key() int
}

type ScheduledJob struct {
Job Job
TriggerDescription string
NextRunTime int64
}

type Scheduler interface {
//start scheduler
Start()
//schedule Job with given Trigger
ScheduleJob(job Job, trigger Trigger) error
//get all scheduled Job keys
GetJobKeys() []int
//get scheduled Job metadata
GetScheduledJob(key int) (*ScheduledJob, error)
//remove Job from execution queue
DeleteJob(key int) error
//clear all scheduled jobs
Clear()
//shutdown scheduler
Stop()
}

Expand All @@ -36,7 +55,7 @@ func NewStdScheduler() *StdScheduler {
}

func (sched *StdScheduler) ScheduleJob(job Job, trigger Trigger) error {
nextRunTime, err := trigger.NextFireTime(time.Now().UTC().UnixNano())
nextRunTime, err := trigger.NextFireTime(NowNano())
if err == nil {
sched.feeder <- &Item{
job,
Expand All @@ -57,6 +76,50 @@ func (sched *StdScheduler) Start() {
go sched.startExecutionLoop()
}

func (sched *StdScheduler) GetJobKeys() []int {
sched.Lock()
defer sched.Unlock()
keys := make([]int, 0, sched.Queue.Len())
for _, item := range *sched.Queue {
keys = append(keys, item.Job.Key())
}
return keys
}

func (sched *StdScheduler) GetScheduledJob(key int) (*ScheduledJob, error) {
sched.Lock()
defer sched.Unlock()
for _, item := range *sched.Queue {
if item.Job.Key() == key {
return &ScheduledJob{
item.Job,
item.Trigger.Description(),
item.priority,
}, nil
}
}
return nil, errors.New("No Job with given Key found")
}

func (sched *StdScheduler) DeleteJob(key int) error {
sched.Lock()
defer sched.Unlock()
for _, item := range *sched.Queue {
if item.Job.Key() == key {
sched.Queue.Pop()
sched.reset()
return nil
}
}
return errors.New("No Job with given Key found")
}

func (sched *StdScheduler) Clear() {
sched.Lock()
defer sched.Unlock()
sched.Queue = &PriorityQueue{}
}

func (sched *StdScheduler) Stop() {
fmt.Println("Closing scheduler")
close(sched.exit)
Expand Down Expand Up @@ -99,12 +162,17 @@ func (sched *StdScheduler) calculateNextTick() <-chan time.Time {
}

func (sched *StdScheduler) executeAndReschedule() {
if sched.queueLen() == 0 {
return
}
//fetch item
sched.Lock()
item := heap.Pop(sched.Queue).(*Item)
sched.Unlock()
//execute Job
go item.Job.Execute()
if !isOutdated(item.priority) {
go item.Job.Execute()
}
//reschedule Job
nextRunTime, err := item.Trigger.NextFireTime(item.priority)
if err == nil {
Expand All @@ -119,10 +187,7 @@ func (sched *StdScheduler) startFeedReader() {
case item := <-sched.feeder:
sched.Lock()
heap.Push(sched.Queue, item)
select {
case sched.interrupt <- struct{}{}:
default:
}
sched.reset()
sched.Unlock()
case <-sched.exit:
fmt.Println("Exit feed reader")
Expand All @@ -131,8 +196,15 @@ func (sched *StdScheduler) startFeedReader() {
}
}

func (sched *StdScheduler) reset() {
select {
case sched.interrupt <- struct{}{}:
default:
}
}

func parkTime(ts int64) int64 {
now := time.Now().UTC().UnixNano()
now := NowNano()
if ts > now {
return ts - now
}
Expand Down
16 changes: 16 additions & 0 deletions quartz/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package quartz

import (
"fmt"
"hash/fnv"
"strconv"
"time"
)

func indexes(search []string, target []string) []int {
Expand Down Expand Up @@ -124,3 +126,17 @@ func dayOfTheWeek(y int, m int, d int) string {
}
return days[((y + y/4 - y/100 + y/400 + t[m-1] + d) % 7)]
}

func NowNano() int64 {
return time.Now().UTC().UnixNano()
}

func isOutdated(_time int64) bool {
return _time < NowNano()-(time.Second*30).Nanoseconds()
}

func HashCode(s string) int {
h := fnv.New32a()
h.Write([]byte(s))
return int(h.Sum32())
}

0 comments on commit 24a19aa

Please sign in to comment.