Skip to content

Commit

Permalink
docs(example): add sample implementation of job queue (#98)
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn authored Jan 17, 2024
1 parent 1bbdc90 commit ce9e1c4
Show file tree
Hide file tree
Showing 2 changed files with 307 additions and 1 deletion.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ Several common Job implementations can be found in the [job](./job) package.

## Distributed mode

The scheduler can use its own implementation of `quartz.JobQueue` to allow state sharing.
The scheduler can use its own implementation of `quartz.JobQueue` to allow state sharing.
An example implementation of the job queue using the file system as a persistence layer
can be found [here](./examples/queue/file_system.go).

## Logger

Expand Down
304 changes: 304 additions & 0 deletions examples/queue/file_system.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,304 @@
package main

import (
"context"
"encoding/json"
"errors"
"fmt"
"io/fs"
"math"
"os"
"strconv"
"strings"
"sync"
"time"

"github.com/reugn/go-quartz/logger"
"github.com/reugn/go-quartz/quartz"
)

// Filesystem settings shared by the job queue implementation below.
const (
	dataFolder             = "./store"
	fileMode   fs.FileMode = 0744
)

// init overrides the default quartz separator with "_" so that the
// separator-delimited strings produced by printJob.Description and consumed
// by unmarshal below split cleanly.
// NOTE(review): this mutates package-level state in quartz; if any other
// package in the process also sets quartz.Sep, the last init wins — confirm
// this is the only writer.
func init() {
quartz.Sep = "_"
}

// main demonstrates a custom quartz.JobQueue backed by the file system.
// It starts a scheduler with the custom queue; on a fresh store it schedules
// two print jobs, and on subsequent runs it resumes the persisted jobs.
func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 31*time.Second)
	defer cancel()

	// MkdirAll is idempotent and race-free, unlike the Stat+Mkdir pattern:
	// it succeeds when the directory already exists and cannot race with a
	// concurrent creation between the check and the create.
	if err := os.MkdirAll(dataFolder, fileMode); err != nil {
		logger.Warnf("Failed to create data folder: %s", err)
		return
	}

	logger.Info("Starting scheduler")
	jobQueue := newJobQueue()
	scheduler := quartz.NewStdSchedulerWithOptions(quartz.StdSchedulerOptions{
		OutdatedThreshold: time.Second, // considering file system I/O latency
	}, jobQueue)
	scheduler.Start(ctx)

	// Only schedule jobs when the queue is empty; a non-empty queue means a
	// previous run persisted jobs that the scheduler will pick up.
	if jobQueue.Size() == 0 {
		logger.Info("Scheduling new jobs")
		jobDetail1 := quartz.NewJobDetail(&printJob{5}, quartz.NewJobKey("job1"))
		if err := scheduler.ScheduleJob(jobDetail1, quartz.NewSimpleTrigger(5*time.Second)); err != nil {
			logger.Warnf("Failed to schedule job: %s", jobDetail1.JobKey())
		}
		jobDetail2 := quartz.NewJobDetail(&printJob{10}, quartz.NewJobKey("job2"))
		if err := scheduler.ScheduleJob(jobDetail2, quartz.NewSimpleTrigger(10*time.Second)); err != nil {
			logger.Warnf("Failed to schedule job: %s", jobDetail2.JobKey())
		}
	} else {
		logger.Info("Job queue is not empty")
	}

	// Run until the 31s context deadline, then report what is still queued.
	<-ctx.Done()

	scheduledJobs := jobQueue.ScheduledJobs()
	jobNames := make([]string, 0, len(scheduledJobs))
	for _, job := range scheduledJobs {
		jobNames = append(jobNames, job.JobDetail().JobKey().String())
	}
	logger.Infof("Jobs in queue: %s", jobNames)
}

// printJob is a minimal quartz.Job carrying a single integer payload that
// it logs on every execution.
type printJob struct {
	seconds int
}

// Compile-time check that printJob satisfies quartz.Job.
var _ quartz.Job = (*printJob)(nil)

// Execute logs the configured value and always succeeds.
func (j *printJob) Execute(_ context.Context) error {
	logger.Infof("PrintJob: %d\n", j.seconds)
	return nil
}

// Description returns a separator-delimited representation; unmarshal
// below relies on this exact format to reconstruct the job.
func (j *printJob) Description() string {
	return fmt.Sprintf("PrintJob%s%d", quartz.Sep, j.seconds)
}

// scheduledPrintJob pairs a job detail with its trigger and next scheduled
// fire time, implementing quartz.ScheduledJob.
type scheduledPrintJob struct {
	jobDetail   *quartz.JobDetail
	trigger     quartz.Trigger
	nextRunTime int64
}

// serializedJob is the JSON document persisted to disk for each job.
type serializedJob struct {
	Job         string                   `json:"job"`
	JobKey      string                   `json:"job_key"`
	Options     *quartz.JobDetailOptions `json:"job_options"`
	Trigger     string                   `json:"trigger"`
	NextRunTime int64                    `json:"next_run_time"`
}

// Compile-time check that scheduledPrintJob satisfies quartz.ScheduledJob.
var _ quartz.ScheduledJob = (*scheduledPrintJob)(nil)

// JobDetail returns the job detail.
func (j *scheduledPrintJob) JobDetail() *quartz.JobDetail { return j.jobDetail }

// Trigger returns the job trigger.
func (j *scheduledPrintJob) Trigger() quartz.Trigger { return j.trigger }

// NextRunTime returns the next scheduled run timestamp.
func (j *scheduledPrintJob) NextRunTime() int64 { return j.nextRunTime }

// marshal returns the JSON encoding of the job, flattening the detail,
// trigger, and next run time into a serializedJob document.
func marshal(job quartz.ScheduledJob) ([]byte, error) {
	detail := job.JobDetail()
	return json.Marshal(serializedJob{
		Job:         detail.Job().Description(),
		JobKey:      detail.JobKey().String(),
		Options:     detail.Options(),
		Trigger:     job.Trigger().Description(),
		NextRunTime: job.NextRunTime(),
	})
}

// unmarshal parses the JSON-encoded job.
//
// The job, job key, and trigger descriptions are assumed to have been
// produced by printJob.Description, JobKey.String, and the simple trigger
// respectively. Malformed descriptions now yield an error instead of an
// index-out-of-range panic, and a bad trigger interval is no longer
// silently turned into a zero duration.
func unmarshal(encoded []byte) (quartz.ScheduledJob, error) {
	var serialized serializedJob
	if err := json.Unmarshal(encoded, &serialized); err != nil {
		return nil, err
	}
	jobVals := strings.Split(serialized.Job, quartz.Sep)
	if len(jobVals) < 2 {
		return nil, fmt.Errorf("invalid job description: %q", serialized.Job)
	}
	seconds, err := strconv.Atoi(jobVals[1])
	if err != nil {
		return nil, err
	}
	job := &printJob{seconds} // assuming we know the job type
	jobKeyVals := strings.Split(serialized.JobKey, quartz.Sep)
	if len(jobKeyVals) < 2 {
		return nil, fmt.Errorf("invalid job key: %q", serialized.JobKey)
	}
	jobKey := quartz.NewJobKeyWithGroup(jobKeyVals[1], jobKeyVals[0])
	jobDetail := quartz.NewJobDetailWithOptions(job, jobKey, serialized.Options)
	triggerOpts := strings.Split(serialized.Trigger, quartz.Sep)
	if len(triggerOpts) < 2 {
		return nil, fmt.Errorf("invalid trigger description: %q", serialized.Trigger)
	}
	interval, err := time.ParseDuration(triggerOpts[1])
	if err != nil {
		return nil, err
	}
	trigger := quartz.NewSimpleTrigger(interval) // assuming we know the trigger type
	return &scheduledPrintJob{
		jobDetail:   jobDetail,
		trigger:     trigger,
		nextRunTime: serialized.NextRunTime,
	}, nil
}

// jobQueue implements the quartz.JobQueue interface, using the file system
// as the persistence layer. A single mutex serializes all queue operations.
type jobQueue struct {
	mtx sync.Mutex
}

// Compile-time check that jobQueue satisfies quartz.JobQueue.
var _ quartz.JobQueue = (*jobQueue)(nil)

// newJobQueue initializes and returns an empty jobQueue.
func newJobQueue() *jobQueue {
	return new(jobQueue)
}

// Push inserts a new scheduled job to the queue.
// This method is also used by the Scheduler to reschedule existing jobs that
// have been dequeued for execution.
//
// Each job is persisted as one file named after its NextRunTime value.
// NOTE(review): two jobs sharing the same NextRunTime map to the same file
// name, so the later Push silently overwrites the earlier job — acceptable
// for an example, but verify before real use.
func (jq *jobQueue) Push(job quartz.ScheduledJob) error {
jq.mtx.Lock()
defer jq.mtx.Unlock()
logger.Tracef("Push: %s", job.JobDetail().JobKey())
serialized, err := marshal(job)
if err != nil {
return err
}
if err = os.WriteFile(fmt.Sprintf("%s/%d", dataFolder, job.NextRunTime()),
serialized, fileMode); err != nil {
logger.Errorf("Failed to write job: %s", err)
return err
}
return nil
}

// Pop removes and returns the next scheduled job from the queue.
func (jq *jobQueue) Pop() (quartz.ScheduledJob, error) {
	jq.mtx.Lock()
	defer jq.mtx.Unlock()
	logger.Trace("Pop")
	next, err := findHead()
	if err != nil {
		logger.Errorf("Failed to find job: %s", err)
		return nil, err
	}
	// Deleting the backing file is what actually dequeues the job.
	if err = os.Remove(fmt.Sprintf("%s/%d", dataFolder, next.NextRunTime())); err != nil {
		logger.Errorf("Failed to delete job: %s", err)
		return nil, err
	}
	return next, nil
}

// Head returns the first scheduled job without removing it from the queue.
func (jq *jobQueue) Head() (quartz.ScheduledJob, error) {
	jq.mtx.Lock()
	defer jq.mtx.Unlock()
	logger.Trace("Head")
	head, err := findHead()
	if err != nil {
		logger.Errorf("Failed to find job: %s", err)
		return nil, err
	}
	return head, nil
}

// findHead scans the data folder and loads the job whose file name parses
// as the smallest int64, i.e. the earliest next run time. Directories and
// files whose names are not valid integers are ignored.
//
// Fix: the parsed value was previously named `time`, shadowing the time
// package within the loop (flagged by staticcheck/vet-style linters).
func findHead() (quartz.ScheduledJob, error) {
	entries, err := os.ReadDir(dataFolder)
	if err != nil {
		return nil, err
	}
	// math.MaxInt64 acts as the "no candidate seen yet" sentinel.
	var earliest int64 = math.MaxInt64
	for _, entry := range entries {
		if entry.IsDir() {
			continue
		}
		ts, err := strconv.ParseInt(entry.Name(), 10, 64)
		if err == nil && ts < earliest {
			earliest = ts
		}
	}
	if earliest == math.MaxInt64 {
		return nil, errors.New("no jobs found")
	}
	data, err := os.ReadFile(fmt.Sprintf("%s/%d", dataFolder, earliest))
	if err != nil {
		return nil, err
	}
	return unmarshal(data)
}

// Remove removes and returns the scheduled job with the specified key.
// It scans every persisted job file until a matching key is found.
//
// Fix: a failure from os.Remove was previously swallowed — the loop kept
// scanning and ultimately reported "no jobs found" even though the job
// existed. The deletion error is now returned to the caller.
func (jq *jobQueue) Remove(jobKey *quartz.JobKey) (quartz.ScheduledJob, error) {
	jq.mtx.Lock()
	defer jq.mtx.Unlock()
	logger.Trace("Remove")
	entries, err := os.ReadDir(dataFolder)
	if err != nil {
		return nil, err
	}
	for _, entry := range entries {
		if entry.IsDir() {
			continue
		}
		path := fmt.Sprintf("%s/%s", dataFolder, entry.Name())
		data, err := os.ReadFile(path)
		if err != nil {
			continue // skip unreadable files, consistent with ScheduledJobs
		}
		job, err := unmarshal(data)
		if err != nil {
			continue // skip files that do not hold a valid job
		}
		if !jobKey.Equals(job.JobDetail().JobKey()) {
			continue
		}
		if err = os.Remove(path); err != nil {
			return nil, err
		}
		return job, nil
	}
	return nil, errors.New("no jobs found")
}

// ScheduledJobs returns the slice of all scheduled jobs in the queue.
// Unreadable or unparsable files are skipped rather than reported.
func (jq *jobQueue) ScheduledJobs() []quartz.ScheduledJob {
	jq.mtx.Lock()
	defer jq.mtx.Unlock()
	logger.Trace("ScheduledJobs")
	var jobs []quartz.ScheduledJob
	entries, err := os.ReadDir(dataFolder)
	if err != nil {
		return jobs
	}
	for _, entry := range entries {
		if entry.IsDir() {
			continue
		}
		data, err := os.ReadFile(fmt.Sprintf("%s/%s", dataFolder, entry.Name()))
		if err != nil {
			continue
		}
		if job, err := unmarshal(data); err == nil {
			jobs = append(jobs, job)
		}
	}
	return jobs
}

// Size returns the number of job files currently persisted in the queue.
// A directory read error is treated as an empty queue, mirroring
// ScheduledJobs.
//
// Fix: directories were previously included in the count, inconsistent
// with the IsDir filtering in findHead, Remove, and ScheduledJobs.
func (jq *jobQueue) Size() int {
	jq.mtx.Lock()
	defer jq.mtx.Unlock()
	logger.Trace("Size")
	entries, err := os.ReadDir(dataFolder)
	if err != nil {
		return 0
	}
	count := 0
	for _, entry := range entries {
		if !entry.IsDir() {
			count++
		}
	}
	return count
}

// Clear clears the job queue by deleting all persisted jobs.
//
// Fix: os.RemoveAll deletes the data folder itself, which left the queue
// unusable — a subsequent Push writes into dataFolder and would fail with
// "no such file or directory". The folder is now recreated after removal.
func (jq *jobQueue) Clear() error {
	jq.mtx.Lock()
	defer jq.mtx.Unlock()
	logger.Trace("Clear")
	if err := os.RemoveAll(dataFolder); err != nil {
		return err
	}
	return os.Mkdir(dataFolder, fileMode)
}

0 comments on commit ce9e1c4

Please sign in to comment.