Skip to content

Commit

Permalink
feat: updates requested in #15 (#18)
Browse files Browse the repository at this point in the history
Closes #15.
  • Loading branch information
robertrossmann authored Apr 29, 2024
2 parents 1ff5d9f + aa57be5 commit 5b317d0
Show file tree
Hide file tree
Showing 10 changed files with 148 additions and 154 deletions.
106 changes: 98 additions & 8 deletions background.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,24 @@ package background

import (
"context"
"errors"
"sync"
"sync/atomic"
"time"

"go.strv.io/background/observer"
"go.strv.io/background/task"

"github.com/kamilsk/retry/v5"
"github.com/kamilsk/retry/v5/strategy"
)

// Manager keeps track of scheduled goroutines and provides mechanisms to wait for them to finish or cancel their
var (
// ErrUnknownType is returned when the task type is not a valid value of Type.
ErrUnknownType = errors.New("unknown task type")
)

// Manager keeps track of running goroutines and provides mechanisms to wait for them to finish or cancel their
// execution. `Meta` is whatever you wish to associate with this task, usually something that will help you keep track
// of the tasks in the observer.
type Manager struct {
Expand All @@ -27,8 +35,8 @@ type Options struct {
// StalledThreshold is the amount of time within which the task should return before it is considered stalled. Note
// that no effort is made to actually stop or kill the task.
StalledThreshold time.Duration
// Observer allow you to register monitoring functions that are called when something happens with the tasks that you
// schedule. These are useful for logging, monitoring, etc.
// Observer allows you to register monitoring functions that are called when something happens with the tasks that you
// execute. These are useful for logging, monitoring, etc.
Observer observer.Observer
// Retry defines the default retry strategies that will be used for all tasks unless overridden by the task. Several
// strategies are provided by https://pkg.go.dev/github.com/kamilsk/retry/v5/strategy package.
Expand All @@ -55,14 +63,14 @@ func NewManagerWithOptions(options Options) *Manager {
}
}

// Run schedules the provided function to be executed once in a goroutine.
// Run executes the provided function once in a goroutine.
func (m *Manager) Run(ctx context.Context, fn task.Fn) {
definition := task.Task{Fn: fn}
m.RunTask(ctx, definition)
}

// RunTask schedules the provided task to be executed in a goroutine. The task will be executed according to its type.
// By default, the task will be executed only once (TaskTypeOneOff).
// RunTask executes the provided task in a goroutine. The task will be executed according to its type; by default, only
// once (TaskTypeOneOff).
func (m *Manager) RunTask(ctx context.Context, definition task.Task) {
ctx = context.WithoutCancel(ctx)
done := make(chan error, 1)
Expand All @@ -79,11 +87,11 @@ func (m *Manager) RunTask(ctx context.Context, definition task.Task) {
go m.loop(ctx, definition, done)

default:
m.observer.OnTaskFailed(ctx, definition, task.ErrUnknownType)
m.observer.OnTaskFailed(ctx, definition, ErrUnknownType)
}
}

// Wait blocks until all scheduled one-off tasks have finished. Adding more one-off tasks will prolong the wait time.
// Wait blocks until all running one-off tasks have finished. Adding more one-off tasks will prolong the wait time.
func (m *Manager) Wait() {
m.taskmgr.group.Wait()
}
Expand Down Expand Up @@ -162,3 +170,85 @@ func (m *Manager) observe(ctx context.Context, definition task.Task, done <-chan
}
}
}

// MARK: Internal

// taskmgr is used internally for task tracking and synchronization.
type taskmgr struct {
group sync.WaitGroup
count atomic.Int32
}

// start tells the taskmgr that a new task has started.
func (m *taskmgr) start() {
m.group.Add(1)
m.count.Add(1)
}

// finish tells the taskmgr that a task has finished.
func (m *taskmgr) finish() {
m.group.Done()
m.count.Add(-1)
}

// loopmgr is used internally for loop tracking and synchronization and cancellation of the loops.
type loopmgr struct {
group sync.WaitGroup
count atomic.Int32
ctx context.Context
cancelfn context.CancelFunc
}

func mkloopmgr() loopmgr {
ctx, cancelfn := context.WithCancel(context.Background())
return loopmgr{
ctx: ctx,
cancelfn: cancelfn,
}
}

// start tells the loopmgr that a new loop has started.
func (m *loopmgr) start() {
m.group.Add(1)
m.count.Add(1)
}

// cancel tells the loopmgr that a loop has finished.
func (m *loopmgr) finish() {
m.group.Done()
m.count.Add(-1)
}

func (m *loopmgr) cancel() {
m.cancelfn()
m.group.Wait()
}

// mktimeout returns a channel that will receive the current time after the specified duration. If the duration is 0,
// the channel will never receive any message.
func mktimeout(duration time.Duration) <-chan time.Time {
if duration == 0 {
return make(<-chan time.Time)
}
return time.After(duration)
}

// mkstrategies prepares the retry strategies to be used for the task. If no defaults and no overrides are provided, a
// single execution attempt retry strategy is used. This is because the retry package would retry indefinitely on
// failure if no strategy is provided.
func mkstrategies(defaults []strategy.Strategy, overrides []strategy.Strategy) []strategy.Strategy {
result := make([]strategy.Strategy, 0, max(len(defaults), len(overrides), 1))

if len(overrides) > 0 {
result = append(result, overrides...)
} else {
result = append(result, defaults...)
}

// If no retry strategies are provided we default to a single execution attempt
if len(result) == 0 {
result = append(result, strategy.Limit(1))
}

return result
}
9 changes: 9 additions & 0 deletions examples/slog/readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# log/slog observer

This example executable shows how to use the built-in `observer.Slog` to log messages using the default `log/slog` logger. To see it in action, clone the repository and run:

```sh
go run examples/slog/slog.go
```

You can quit the program by pressing `Ctrl+C`.
File renamed without changes.
90 changes: 0 additions & 90 deletions internal.go

This file was deleted.

34 changes: 0 additions & 34 deletions observer/multi.go

This file was deleted.

2 changes: 1 addition & 1 deletion observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"go.strv.io/background/task"
)

// Observer implements a set of methods that are called when certain events happen with the tasks that you schedule.
// Observer implements a set of methods that are called when certain events happen with the tasks that you execute.
type Observer interface {
// OnTaskAdded is called immediately after scheduling the Task for execution.
OnTaskAdded(ctx context.Context, definition task.Task)
Expand Down
12 changes: 6 additions & 6 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ go get go.strv.io/background

## Usage

There are two types of tasks you can schedule:
There are two types of tasks you can execute:

- one-off: a task that runs once and then finishes
- looping: a task that runs repeatedly until it is stopped
Expand Down Expand Up @@ -73,7 +73,7 @@ maanger := background.NewManagerWithOptions(background.Options{
Observer: observer.Slog{},
})

// Schedule a one-off task - the task will run only once (except if it fails and has a retry policy)
// Executes a one-off task - the task will run only once (except if it fails and has a retry policy)
oneoff := task.Task{
Type: task.TypeOneOff,
Fn: func(ctx context.Context) error {
Expand All @@ -88,7 +88,7 @@ oneoff := task.Task{

manager.RunTask(context.Background(), oneoff)

// Schedule a looping task - the task will run repeatedly until it is stopped
// Execute a looping task - the task will run repeatedly until it is stopped
looping := task.Task{
Type: task.TypeLoop,
Fn: func(ctx context.Context) error {
Expand All @@ -101,16 +101,16 @@ looping := task.Task{
}
}

// Schedule the task to be continuously run in an infinite loop until manager.Close() is called
// Execute the task to be continuously run in an infinite loop until manager.Close() is called
manager.RunTask(context.Background(), looping)
```

## Examples

You can find a sample executable in the [samples](samples) folder. To run them, clone the repository and run:
You can find a sample executable in the [examples](examples) folder. To run them, clone the repository and run:

```sh
go run samples/slog/slog.go
go run examples/slog/slog.go
```

## License
Expand Down
9 changes: 0 additions & 9 deletions samples/slog/readme.md

This file was deleted.

14 changes: 8 additions & 6 deletions task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package task

import (
"context"
"errors"

"github.com/kamilsk/retry/v5/strategy"
)
Expand All @@ -18,11 +17,6 @@ const (
TypeLoop
)

var (
// ErrUnknownType is returned when the task type is not a valid value of Type.
ErrUnknownType = errors.New("unknown task type")
)

// Task describes how a unit of work (a function) should be executed.
type Task struct {
// Fn is the function to be executed in a goroutine.
Expand All @@ -38,6 +32,14 @@ type Task struct {
Retry Retry
}

// New creates a new task of the specified type and the provided function to be executed.
func New(t Type, fn Fn) Task {
return Task{
Type: t,
Fn: fn,
}
}

// Fn is the function to be executed in a goroutine.
type Fn func(ctx context.Context) error

Expand Down
Loading

0 comments on commit 5b317d0

Please sign in to comment.