diff --git a/background.go b/background.go index e19e548..5303de6 100644 --- a/background.go +++ b/background.go @@ -2,16 +2,24 @@ package background import ( "context" + "errors" "sync" + "sync/atomic" "time" "go.strv.io/background/observer" "go.strv.io/background/task" "github.com/kamilsk/retry/v5" + "github.com/kamilsk/retry/v5/strategy" ) -// Manager keeps track of scheduled goroutines and provides mechanisms to wait for them to finish or cancel their +var ( + // ErrUnknownType is returned when the task type is not a valid value of Type. + ErrUnknownType = errors.New("unknown task type") +) + +// Manager keeps track of running goroutines and provides mechanisms to wait for them to finish or cancel their // execution. `Meta` is whatever you wish to associate with this task, usually something that will help you keep track // of the tasks in the observer. type Manager struct { @@ -27,8 +35,8 @@ type Options struct { // StalledThreshold is the amount of time within which the task should return before it is considered stalled. Note // that no effort is made to actually stop or kill the task. StalledThreshold time.Duration - // Observer allow you to register monitoring functions that are called when something happens with the tasks that you - // schedule. These are useful for logging, monitoring, etc. + // Observer allows you to register monitoring functions that are called when something happens with the tasks that you + // execute. These are useful for logging, monitoring, etc. Observer observer.Observer // Retry defines the default retry strategies that will be used for all tasks unless overridden by the task. Several // strategies are provided by https://pkg.go.dev/github.com/kamilsk/retry/v5/strategy package. @@ -55,14 +63,14 @@ func NewManagerWithOptions(options Options) *Manager { } } -// Run schedules the provided function to be executed once in a goroutine. +// Run executes the provided function once in a goroutine. func (m *Manager) Run(ctx context.Context, fn task.Fn) { definition := task.Task{Fn: fn} m.RunTask(ctx, definition) } -// RunTask schedules the provided task to be executed in a goroutine. The task will be executed according to its type. -// By default, the task will be executed only once (TaskTypeOneOff). +// RunTask executes the provided task in a goroutine. The task will be executed according to its type; by default, only +// once (TaskTypeOneOff). func (m *Manager) RunTask(ctx context.Context, definition task.Task) { ctx = context.WithoutCancel(ctx) done := make(chan error, 1) @@ -79,11 +87,11 @@ func (m *Manager) RunTask(ctx context.Context, definition task.Task) { go m.loop(ctx, definition, done) default: - m.observer.OnTaskFailed(ctx, definition, task.ErrUnknownType) + m.observer.OnTaskFailed(ctx, definition, ErrUnknownType) } } -// Wait blocks until all scheduled one-off tasks have finished. Adding more one-off tasks will prolong the wait time. +// Wait blocks until all running one-off tasks have finished. Adding more one-off tasks will prolong the wait time. func (m *Manager) Wait() { m.taskmgr.group.Wait() } @@ -162,3 +170,85 @@ func (m *Manager) observe(ctx context.Context, definition task.Task, done <-chan } } } + +// MARK: Internal + +// taskmgr is used internally for task tracking and synchronization. +type taskmgr struct { + group sync.WaitGroup + count atomic.Int32 +} + +// start tells the taskmgr that a new task has started. +func (m *taskmgr) start() { + m.group.Add(1) + m.count.Add(1) +} + +// finish tells the taskmgr that a task has finished. +func (m *taskmgr) finish() { + m.group.Done() + m.count.Add(-1) +} + +// loopmgr is used internally for loop tracking and synchronization and cancellation of the loops. +type loopmgr struct { + group sync.WaitGroup + count atomic.Int32 + ctx context.Context + cancelfn context.CancelFunc +} + +func mkloopmgr() loopmgr { + ctx, cancelfn := context.WithCancel(context.Background()) + return loopmgr{ + ctx: ctx, + cancelfn: cancelfn, + } +} + +// start tells the loopmgr that a new loop has started. +func (m *loopmgr) start() { + m.group.Add(1) + m.count.Add(1) +} + +// cancel tells the loopmgr that a loop has finished. +func (m *loopmgr) finish() { + m.group.Done() + m.count.Add(-1) +} + +func (m *loopmgr) cancel() { + m.cancelfn() + m.group.Wait() +} + +// mktimeout returns a channel that will receive the current time after the specified duration. If the duration is 0, +// the channel will never receive any message. +func mktimeout(duration time.Duration) <-chan time.Time { + if duration == 0 { + return make(<-chan time.Time) + } + return time.After(duration) +} + +// mkstrategies prepares the retry strategies to be used for the task. If no defaults and no overrides are provided, a +// single execution attempt retry strategy is used. This is because the retry package would retry indefinitely on +// failure if no strategy is provided. +func mkstrategies(defaults []strategy.Strategy, overrides []strategy.Strategy) []strategy.Strategy { + result := make([]strategy.Strategy, 0, max(len(defaults), len(overrides), 1)) + + if len(overrides) > 0 { + result = append(result, overrides...) + } else { + result = append(result, defaults...) + } + + // If no retry strategies are provided we default to a single execution attempt + if len(result) == 0 { + result = append(result, strategy.Limit(1)) + } + + return result +} diff --git a/examples/slog/readme.md b/examples/slog/readme.md new file mode 100644 index 0000000..ad646aa --- /dev/null +++ b/examples/slog/readme.md @@ -0,0 +1,9 @@ +# log/slog observer + +This example executable shows how to use the built-in `observer.Slog` to log messages using the default `log/slog` logger. To see it in action, clone the repository and run: + +```sh +go run examples/slog/slog.go +``` + +You can quit the program by pressing `Ctrl+C`. diff --git a/samples/slog/slog.go b/examples/slog/slog.go similarity index 100% rename from samples/slog/slog.go rename to examples/slog/slog.go diff --git a/internal.go b/internal.go deleted file mode 100644 index d8a1c18..0000000 --- a/internal.go +++ /dev/null @@ -1,90 +0,0 @@ -package background - -import ( - "context" - "sync" - "sync/atomic" - "time" - - "github.com/kamilsk/retry/v5/strategy" -) - -// taskmgr is used internally for task tracking and synchronization. -type taskmgr struct { - group sync.WaitGroup - count atomic.Int32 -} - -// start tells the taskmgr that a new task has started. -func (m *taskmgr) start() { - m.group.Add(1) - m.count.Add(1) -} - -// finish tells the taskmgr that a task has finished. -func (m *taskmgr) finish() { - m.group.Done() - m.count.Add(-1) -} - -// loopmgr is used internally for loop tracking and synchronization and cancellation of the loops. -type loopmgr struct { - group sync.WaitGroup - count atomic.Int32 - ctx context.Context - cancelfn context.CancelFunc -} - -func mkloopmgr() loopmgr { - ctx, cancelfn := context.WithCancel(context.Background()) - return loopmgr{ - ctx: ctx, - cancelfn: cancelfn, - } -} - -// start tells the loopmgr that a new loop has started. -func (m *loopmgr) start() { - m.group.Add(1) - m.count.Add(1) -} - -// cancel tells the loopmgr that a loop has finished. -func (m *loopmgr) finish() { - m.group.Done() - m.count.Add(-1) -} - -func (m *loopmgr) cancel() { - m.cancelfn() - m.group.Wait() -} - -// mktimeout returns a channel that will receive the current time after the specified duration. If the duration is 0, -// the channel will never receive any message. -func mktimeout(duration time.Duration) <-chan time.Time { - if duration == 0 { - return make(<-chan time.Time) - } - return time.After(duration) -} - -// mkstrategies prepares the retry strategies to be used for the task. If no defaults and no overrides are provided, a -// single execution attempt retry strategy is used. This is because the retry package would retry indefinitely on -// failure if no strategy is provided. -func mkstrategies(defaults []strategy.Strategy, overrides []strategy.Strategy) []strategy.Strategy { - result := make([]strategy.Strategy, 0, max(len(defaults), len(overrides), 1)) - - if len(overrides) > 0 { - result = append(result, overrides...) - } else { - result = append(result, defaults...) - } - - // If no retry strategies are provided we default to a single execution attempt - if len(result) == 0 { - result = append(result, strategy.Limit(1)) - } - - return result -} diff --git a/observer/multi.go b/observer/multi.go deleted file mode 100644 index cbea164..0000000 --- a/observer/multi.go +++ /dev/null @@ -1,34 +0,0 @@ -package observer - -import ( - "context" - - "go.strv.io/background/task" -) - -// Multi is an implementation of the Observer interface that calls multiple observers serially. -type Multi []Observer - -func (m Multi) OnTaskAdded(ctx context.Context, definition task.Task) { - for _, o := range m { - o.OnTaskAdded(ctx, definition) - } -} - -func (m Multi) OnTaskSucceeded(ctx context.Context, definition task.Task) { - for _, o := range m { - o.OnTaskSucceeded(ctx, definition) - } -} - -func (m Multi) OnTaskFailed(ctx context.Context, definition task.Task, err error) { - for _, o := range m { - o.OnTaskFailed(ctx, definition, err) - } -} - -func (m Multi) OnTaskStalled(ctx context.Context, definition task.Task) { - for _, o := range m { - o.OnTaskStalled(ctx, definition) - } -} diff --git a/observer/observer.go b/observer/observer.go index 1ae12d4..564cdc8 100644 --- a/observer/observer.go +++ b/observer/observer.go @@ -6,7 +6,7 @@ import ( "go.strv.io/background/task" ) -// Observer implements a set of methods that are called when certain events happen with the tasks that you schedule. +// Observer implements a set of methods that are called when certain events happen with the tasks that you execute. type Observer interface { // OnTaskAdded is called immediately after scheduling the Task for execution. OnTaskAdded(ctx context.Context, definition task.Task) diff --git a/readme.md b/readme.md index 89d30aa..616b1bc 100644 --- a/readme.md +++ b/readme.md @@ -20,7 +20,7 @@ go get go.strv.io/background ## Usage -There are two types of tasks you can schedule: +There are two types of tasks you can execute: - one-off: a task that runs once and then finishes - looping: a task that runs repeatedly until it is stopped @@ -73,7 +73,7 @@ maanger := background.NewManagerWithOptions(background.Options{ Observer: observer.Slog{}, }) -// Schedule a one-off task - the task will run only once (except if it fails and has a retry policy) +// Executes a one-off task - the task will run only once (except if it fails and has a retry policy) oneoff := task.Task{ Type: task.TypeOneOff, Fn: func(ctx context.Context) error { @@ -88,7 +88,7 @@ oneoff := task.Task{ manager.RunTask(context.Background(), oneoff) -// Schedule a looping task - the task will run repeatedly until it is stopped +// Execute a looping task - the task will run repeatedly until it is stopped looping := task.Task{ Type: task.TypeLoop, Fn: func(ctx context.Context) error { @@ -101,16 +101,16 @@ looping := task.Task{ } } -// Schedule the task to be continuously run in an infinite loop until manager.Close() is called +// Execute the task to be continuously run in an infinite loop until manager.Close() is called manager.RunTask(context.Background(), looping) ``` ## Examples -You can find a sample executable in the [samples](samples) folder. To run them, clone the repository and run: +You can find a sample executable in the [examples](examples) folder. To run them, clone the repository and run: ```sh -go run samples/slog/slog.go +go run examples/slog/slog.go ``` ## License diff --git a/samples/slog/readme.md b/samples/slog/readme.md deleted file mode 100644 index b31a9de..0000000 --- a/samples/slog/readme.md +++ /dev/null @@ -1,9 +0,0 @@ -# log/slog observer - -This sample executable shows how to use the built-in `observer.Slog` to log messages using the default `log/slog` logger. To see it in action, clone the repository and run: - -```sh -go run samples/slog/slog.go -``` - -You can quit the program by pressing `Ctrl+C`. diff --git a/task/task.go b/task/task.go index fd2cbd5..3b7b618 100644 --- a/task/task.go +++ b/task/task.go @@ -2,7 +2,6 @@ package task import ( "context" - "errors" "github.com/kamilsk/retry/v5/strategy" ) @@ -18,11 +17,6 @@ const ( TypeLoop ) -var ( - // ErrUnknownType is returned when the task type is not a valid value of Type. - ErrUnknownType = errors.New("unknown task type") -) - // Task describes how a unit of work (a function) should be executed. type Task struct { // Fn is the function to be executed in a goroutine. @@ -38,6 +32,14 @@ type Task struct { Retry Retry } +// New creates a new task of the specified type and the provided function to be executed. +func New(t Type, fn Fn) Task { + return Task{ + Type: t, + Fn: fn, + } +} + // Fn is the function to be executed in a goroutine. type Fn func(ctx context.Context) error diff --git a/task/task_test.go b/task/task_test.go new file mode 100644 index 0000000..2d39414 --- /dev/null +++ b/task/task_test.go @@ -0,0 +1,26 @@ +package task_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "go.strv.io/background/task" +) + +func Test_New(t *testing.T) { + called := false + def := task.New(task.TypeOneOff, func(ctx context.Context) error { + called = true + return nil + }) + + err := def.Fn(context.Background()) + + assert.NoError(t, err) + assert.True(t, called) + assert.Equal(t, task.TypeOneOff, def.Type) + + assert.Empty(t, def.Meta) + assert.Empty(t, def.Retry) +}