Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: updates requested in #15 #18

Merged
merged 6 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 98 additions & 8 deletions background.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,24 @@ package background

import (
"context"
"errors"
"sync"
"sync/atomic"
"time"

"go.strv.io/background/observer"
"go.strv.io/background/task"

"github.com/kamilsk/retry/v5"
"github.com/kamilsk/retry/v5/strategy"
)

// Manager keeps track of scheduled goroutines and provides mechanisms to wait for them to finish or cancel their
var (
// ErrUnknownType is returned when the task type is not a valid value of Type.
ErrUnknownType = errors.New("unknown task type")
)

// Manager keeps track of running goroutines and provides mechanisms to wait for them to finish or cancel their
// execution. `Meta` is whatever you wish to associate with this task, usually something that will help you keep track
// of the tasks in the observer.
type Manager struct {
Expand All @@ -27,8 +35,8 @@ type Options struct {
// StalledThreshold is the amount of time within which the task should return before it is considered stalled. Note
// that no effort is made to actually stop or kill the task.
StalledThreshold time.Duration
// Observer allow you to register monitoring functions that are called when something happens with the tasks that you
// schedule. These are useful for logging, monitoring, etc.
// Observer allows you to register monitoring functions that are called when something happens with the tasks that you
// execute. These are useful for logging, monitoring, etc.
Observer observer.Observer
// Retry defines the default retry strategies that will be used for all tasks unless overridden by the task. Several
// strategies are provided by https://pkg.go.dev/github.com/kamilsk/retry/v5/strategy package.
Expand All @@ -55,14 +63,14 @@ func NewManagerWithOptions(options Options) *Manager {
}
}

// Run schedules the provided function to be executed once in a goroutine.
// Run executes the provided function once in a goroutine.
func (m *Manager) Run(ctx context.Context, fn task.Fn) {
definition := task.Task{Fn: fn}
m.RunTask(ctx, definition)
}

// RunTask schedules the provided task to be executed in a goroutine. The task will be executed according to its type.
// By default, the task will be executed only once (TaskTypeOneOff).
// RunTask executes the provided task in a goroutine. The task will be executed according to its type; by default, only
// once (TaskTypeOneOff).
func (m *Manager) RunTask(ctx context.Context, definition task.Task) {
ctx = context.WithoutCancel(ctx)
done := make(chan error, 1)
Expand All @@ -79,11 +87,11 @@ func (m *Manager) RunTask(ctx context.Context, definition task.Task) {
go m.loop(ctx, definition, done)

default:
m.observer.OnTaskFailed(ctx, definition, task.ErrUnknownType)
m.observer.OnTaskFailed(ctx, definition, ErrUnknownType)
}
}

// Wait blocks until all scheduled one-off tasks have finished. Adding more one-off tasks will prolong the wait time.
// Wait blocks until all running one-off tasks have finished. Adding more one-off tasks will prolong the wait time.
func (m *Manager) Wait() {
m.taskmgr.group.Wait()
}
Expand Down Expand Up @@ -162,3 +170,85 @@ func (m *Manager) observe(ctx context.Context, definition task.Task, done <-chan
}
}
}

// MARK: Internal

// taskmgr is used internally for task tracking and synchronization.
type taskmgr struct {
group sync.WaitGroup
count atomic.Int32
}

// start tells the taskmgr that a new task has started.
func (m *taskmgr) start() {
m.group.Add(1)
m.count.Add(1)
}

// finish tells the taskmgr that a task has finished.
func (m *taskmgr) finish() {
m.group.Done()
m.count.Add(-1)
}

// loopmgr is used internally for loop tracking and synchronization and cancellation of the loops.
type loopmgr struct {
group sync.WaitGroup
count atomic.Int32
ctx context.Context
cancelfn context.CancelFunc
}

func mkloopmgr() loopmgr {
ctx, cancelfn := context.WithCancel(context.Background())
return loopmgr{
ctx: ctx,
cancelfn: cancelfn,
}
}

// start tells the loopmgr that a new loop has started.
func (m *loopmgr) start() {
m.group.Add(1)
m.count.Add(1)
}

// cancel tells the loopmgr that a loop has finished.
func (m *loopmgr) finish() {
m.group.Done()
m.count.Add(-1)
}

func (m *loopmgr) cancel() {
m.cancelfn()
m.group.Wait()
}

// mktimeout returns a channel that will receive the current time after the specified duration. If the duration is 0,
// the channel will never receive any message.
func mktimeout(duration time.Duration) <-chan time.Time {
if duration == 0 {
return make(<-chan time.Time)
}
return time.After(duration)
}

// mkstrategies prepares the retry strategies to be used for the task. If no defaults and no overrides are provided, a
// single execution attempt retry strategy is used. This is because the retry package would retry indefinitely on
// failure if no strategy is provided.
func mkstrategies(defaults []strategy.Strategy, overrides []strategy.Strategy) []strategy.Strategy {
result := make([]strategy.Strategy, 0, max(len(defaults), len(overrides), 1))

if len(overrides) > 0 {
result = append(result, overrides...)
} else {
result = append(result, defaults...)
}

// If no retry strategies are provided we default to a single execution attempt
if len(result) == 0 {
result = append(result, strategy.Limit(1))
}

return result
}
9 changes: 9 additions & 0 deletions examples/slog/readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# log/slog observer

This example executable shows how to use the built-in `observer.Slog` to log messages using the default `log/slog` logger. To see it in action, clone the repository and run:

```sh
go run examples/slog/slog.go
```

You can quit the program by pressing `Ctrl+C`.
File renamed without changes.
90 changes: 0 additions & 90 deletions internal.go

This file was deleted.

34 changes: 0 additions & 34 deletions observer/multi.go

This file was deleted.

2 changes: 1 addition & 1 deletion observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"go.strv.io/background/task"
)

// Observer implements a set of methods that are called when certain events happen with the tasks that you schedule.
// Observer implements a set of methods that are called when certain events happen with the tasks that you execute.
type Observer interface {
// OnTaskAdded is called immediately after scheduling the Task for execution.
OnTaskAdded(ctx context.Context, definition task.Task)
Expand Down
12 changes: 6 additions & 6 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ go get go.strv.io/background

## Usage

There are two types of tasks you can schedule:
There are two types of tasks you can execute:

- one-off: a task that runs once and then finishes
- looping: a task that runs repeatedly until it is stopped
Expand Down Expand Up @@ -73,7 +73,7 @@ maanger := background.NewManagerWithOptions(background.Options{
Observer: observer.Slog{},
})

// Schedule a one-off task - the task will run only once (except if it fails and has a retry policy)
// Executes a one-off task - the task will run only once (except if it fails and has a retry policy)
oneoff := task.Task{
Type: task.TypeOneOff,
Fn: func(ctx context.Context) error {
Expand All @@ -88,7 +88,7 @@ oneoff := task.Task{

manager.RunTask(context.Background(), oneoff)

// Schedule a looping task - the task will run repeatedly until it is stopped
// Execute a looping task - the task will run repeatedly until it is stopped
looping := task.Task{
Type: task.TypeLoop,
Fn: func(ctx context.Context) error {
Expand All @@ -101,16 +101,16 @@ looping := task.Task{
}
}

// Schedule the task to be continuously run in an infinite loop until manager.Close() is called
// Execute the task to be continuously run in an infinite loop until manager.Close() is called
manager.RunTask(context.Background(), looping)
```

## Examples

You can find a sample executable in the [samples](samples) folder. To run them, clone the repository and run:
You can find a sample executable in the [examples](examples) folder. To run them, clone the repository and run:

```sh
go run samples/slog/slog.go
go run examples/slog/slog.go
```

## License
Expand Down
9 changes: 0 additions & 9 deletions samples/slog/readme.md

This file was deleted.

14 changes: 8 additions & 6 deletions task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package task

import (
"context"
"errors"

"github.com/kamilsk/retry/v5/strategy"
)
Expand All @@ -18,11 +17,6 @@ const (
TypeLoop
)

var (
// ErrUnknownType is returned when the task type is not a valid value of Type.
ErrUnknownType = errors.New("unknown task type")
)

// Task describes how a unit of work (a function) should be executed.
type Task struct {
// Fn is the function to be executed in a goroutine.
Expand All @@ -38,6 +32,14 @@ type Task struct {
Retry Retry
}

// New creates a new task of the specified type and the provided function to be executed.
func New(t Type, fn Fn) Task {
return Task{
Type: t,
Fn: fn,
}
}

// Fn is the function to be executed in a goroutine.
type Fn func(ctx context.Context) error

Expand Down
Loading
Loading